sysom1/environment/1_sdk/gcache_base/gcache.py

177 lines
5.2 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/4/28 14:47
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File gcache.py
Description:
"""
import importlib
from abc import ABCMeta, abstractmethod
from typing import Union, Optional, Dict
from threading import Lock
from clogger import logger
from .exceptions import GCacheProtoAlreadyExistsException, \
GCacheProtoNotExistsException, GCacheException
from .url import GCacheUrl
class GCache(metaclass=ABCMeta):
"""
A generic cache used to caching data
"""
proto_dict = {}
proto_lock = Lock()
@abstractmethod
def store(self, key: str, value: Union[int, float, dict, str],
expire: int = -1) -> bool:
"""Store something to gcache
Args:
key: key
value: Value, can be one of [int, float, dict, str]
expire: Expire time, the stored data will expire after A seconds
- -1 means never expires
Returns:
"""
pass
@abstractmethod
def load(self, key: str) -> Union[None, int, float, dict, str]:
pass
@abstractmethod
def load_all(self) -> Dict[str, Union[int, float, dict, str]]:
pass
@abstractmethod
def delete(self, key: str) -> bool:
"""
Delete specific key
Args:
key:
Returns:
"""
pass
@abstractmethod
def clean(self):
"""
Clean all cache
Returns:
"""
pass
def load_int(self, key: str) -> Optional[int]:
res = self.load(key)
if res is None or isinstance(res, int):
return res
else:
raise GCacheException(f"GCache: expect int, get {type(res)}")
def load_float(self, key: str) -> Optional[float]:
res = self.load(key)
if res is None or isinstance(res, float):
return res
else:
raise GCacheException(f"GCache: expect float, get {type(res)}")
def load_dict(self, key: str) -> Optional[dict]:
res = self.load(key)
if res is None or isinstance(res, dict):
return res
else:
raise GCacheException(f"GCache: expect dict, get {type(res)}")
def load_str(self, key: str) -> Optional[str]:
res = self.load(key)
if res is None or isinstance(res, str):
return res
else:
raise GCacheException(f"GCache: expect str, get {type(res)}")
@staticmethod
def protocol_register(proto, sub_class):
"""Register one new protocol => indicate one execution module
Register a new protocol => This function is called by the executing
module to register its own implementation of Producer for the executing
module to take effect.
(Usually when the execution module is implemented according to the
specification, there is no need for the developer to call this method
manually, the abstraction layer will dynamically import)
Args:
proto(str): Protocol identification
sub_class: Implementation class of Producer
Returns:
Examples:
>>> GCache.protocol_register('redis', RedisGCache)
"""
if proto in GCache.proto_dict:
err = GCacheProtoAlreadyExistsException(
f"Proto '{proto}' already exists in Cmg-base-GCache."
)
logger.error(err)
raise err
GCache.proto_dict[proto] = sub_class
logger.info(
f"Gcache-base-GCache register proto '{proto}' success"
)
def dispatch_g_cache(cache_name: str, url: str, **kwargs) -> GCache:
"""Construct one GCache instance according the url
Construct a GCache instance of the corresponding type based on
the URL passed in.
Args:
cache_name:
url(str): GCacheUrl
Returns:
GCache: one GCache instance
Examples:
>>> g_cache = dispatch_g_cache(
"cache_name",
..."redis://localhost:6379?password=123456")
"""
cmg_url = GCacheUrl.parse(url)
with GCache.proto_lock:
if cmg_url.proto not in GCache.proto_dict:
# Check if dynamic import is possible
target_module = f"gcache_{cmg_url.proto}.{cmg_url.proto}_gcache"
try:
module = importlib.import_module(target_module)
GCache.protocol_register(
cmg_url.proto,
getattr(module, f'{cmg_url.proto.capitalize()}GCache')
)
except ModuleNotFoundError as exc:
logger.error(
f"Try to auto import module {target_module} failed."
)
raise GCacheProtoNotExistsException(
f"Proto '{cmg_url.proto}' not exists in GCache-base"
"-GCache."
) from exc
g_cache_instance = GCache.proto_dict[cmg_url.proto](cache_name,
cmg_url, **kwargs)
logger.info(
f"GCache-base-GCache dispatch one GCache instance "
f"success. proto={cmg_url.proto}, url={url}"
)
return g_cache_instance