mirror of https://gitee.com/anolis/sysom.git
177 lines
5.2 KiB
Python
177 lines
5.2 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/4/28 14:47
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File gcache.py
|
|
Description:
|
|
"""
|
|
import importlib
|
|
from abc import ABCMeta, abstractmethod
|
|
from typing import Union, Optional, Dict
|
|
from threading import Lock
|
|
from clogger import logger
|
|
from .exceptions import GCacheProtoAlreadyExistsException, \
|
|
GCacheProtoNotExistsException, GCacheException
|
|
from .url import GCacheUrl
|
|
|
|
|
|
class GCache(metaclass=ABCMeta):
|
|
"""
|
|
A generic cache used to caching data
|
|
"""
|
|
|
|
proto_dict = {}
|
|
|
|
proto_lock = Lock()
|
|
|
|
@abstractmethod
|
|
def store(self, key: str, value: Union[int, float, dict, str],
|
|
expire: int = -1) -> bool:
|
|
"""Store something to gcache
|
|
|
|
Args:
|
|
key: key
|
|
value: Value, can be one of [int, float, dict, str]
|
|
expire: Expire time, the stored data will expire after A seconds
|
|
- -1 means never expires
|
|
Returns:
|
|
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def load(self, key: str) -> Union[None, int, float, dict, str]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def load_all(self) -> Dict[str, Union[int, float, dict, str]]:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def delete(self, key: str) -> bool:
|
|
"""
|
|
Delete specific key
|
|
Args:
|
|
key:
|
|
|
|
Returns:
|
|
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def clean(self):
|
|
"""
|
|
Clean all cache
|
|
Returns:
|
|
|
|
"""
|
|
pass
|
|
|
|
def load_int(self, key: str) -> Optional[int]:
|
|
res = self.load(key)
|
|
if res is None or isinstance(res, int):
|
|
return res
|
|
else:
|
|
raise GCacheException(f"GCache: expect int, get {type(res)}")
|
|
|
|
def load_float(self, key: str) -> Optional[float]:
|
|
res = self.load(key)
|
|
if res is None or isinstance(res, float):
|
|
return res
|
|
else:
|
|
raise GCacheException(f"GCache: expect float, get {type(res)}")
|
|
|
|
def load_dict(self, key: str) -> Optional[dict]:
|
|
res = self.load(key)
|
|
if res is None or isinstance(res, dict):
|
|
return res
|
|
else:
|
|
raise GCacheException(f"GCache: expect dict, get {type(res)}")
|
|
|
|
def load_str(self, key: str) -> Optional[str]:
|
|
res = self.load(key)
|
|
if res is None or isinstance(res, str):
|
|
return res
|
|
else:
|
|
raise GCacheException(f"GCache: expect str, get {type(res)}")
|
|
|
|
@staticmethod
|
|
def protocol_register(proto, sub_class):
|
|
"""Register one new protocol => indicate one execution module
|
|
|
|
Register a new protocol => This function is called by the executing
|
|
module to register its own implementation of Producer for the executing
|
|
module to take effect.
|
|
(Usually when the execution module is implemented according to the
|
|
specification, there is no need for the developer to call this method
|
|
manually, the abstraction layer will dynamically import)
|
|
|
|
Args:
|
|
proto(str): Protocol identification
|
|
sub_class: Implementation class of Producer
|
|
|
|
Returns:
|
|
|
|
Examples:
|
|
>>> GCache.protocol_register('redis', RedisGCache)
|
|
|
|
"""
|
|
if proto in GCache.proto_dict:
|
|
err = GCacheProtoAlreadyExistsException(
|
|
f"Proto '{proto}' already exists in Cmg-base-GCache."
|
|
)
|
|
logger.error(err)
|
|
raise err
|
|
GCache.proto_dict[proto] = sub_class
|
|
logger.info(
|
|
f"Gcache-base-GCache register proto '{proto}' success"
|
|
)
|
|
|
|
|
|
def dispatch_g_cache(cache_name: str, url: str, **kwargs) -> GCache:
|
|
"""Construct one GCache instance according the url
|
|
|
|
Construct a GCache instance of the corresponding type based on
|
|
the URL passed in.
|
|
|
|
Args:
|
|
cache_name:
|
|
url(str): GCacheUrl
|
|
|
|
Returns:
|
|
GCache: one GCache instance
|
|
|
|
Examples:
|
|
>>> g_cache = dispatch_g_cache(
|
|
"cache_name",
|
|
..."redis://localhost:6379?password=123456")
|
|
"""
|
|
cmg_url = GCacheUrl.parse(url)
|
|
with GCache.proto_lock:
|
|
if cmg_url.proto not in GCache.proto_dict:
|
|
# Check if dynamic import is possible
|
|
target_module = f"gcache_{cmg_url.proto}.{cmg_url.proto}_gcache"
|
|
try:
|
|
module = importlib.import_module(target_module)
|
|
GCache.protocol_register(
|
|
cmg_url.proto,
|
|
getattr(module, f'{cmg_url.proto.capitalize()}GCache')
|
|
)
|
|
except ModuleNotFoundError as exc:
|
|
logger.error(
|
|
f"Try to auto import module {target_module} failed."
|
|
)
|
|
raise GCacheProtoNotExistsException(
|
|
f"Proto '{cmg_url.proto}' not exists in GCache-base"
|
|
"-GCache."
|
|
) from exc
|
|
g_cache_instance = GCache.proto_dict[cmg_url.proto](cache_name,
|
|
cmg_url, **kwargs)
|
|
logger.info(
|
|
f"GCache-base-GCache dispatch one GCache instance "
|
|
f"success. proto={cmg_url.proto}, url={url}"
|
|
)
|
|
return g_cache_instance
|