mirror of https://gitee.com/anolis/sysom.git
117 lines
3.4 KiB
Python
117 lines
3.4 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2022/7/29 13:33
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File common.py
|
|
Description:
|
|
"""
|
|
from typing import Optional
|
|
from clogger import logger
|
|
from redis import Redis
|
|
from cmg_base.url import CmgUrl
|
|
from .utils import do_connect_by_cmg_url
|
|
|
|
|
|
class StaticConst:
    """Static constants and key-naming helpers for the cmg-redis module.

    Besides the constant key prefixes, this class knows how to:

    * extract the "specialization parameters" (options consumed by this
      module itself) from a URL parameter dict, and
    * convert service names / instance ids to and from their prefixed
      ("inner") form.
    """

    CMG_REDIS_PREFIX = "CMG-REDIS:"
    CMG_REDIS_SERVICES = f"{CMG_REDIS_PREFIX}SERVICES"
    CMG_REDIS_SERVICE_PREFIX = f"{CMG_REDIS_PREFIX}SERVICE:"
    CMG_REDIS_INSTANCE_PREFIX = f"{CMG_REDIS_PREFIX}INSTANCE:"

    CMG_REDIS_SERVICES_LOCKER = f"{CMG_REDIS_PREFIX}LOCKER:SERVICES"

    # List of specialization parameters
    REDIS_SPECIAL_PARM_CMG_ENABLE_SELF_HEALTH_CHECK = \
        "cmg_enable_auto_health_check"

    _redis_special_parameter_list = [
        REDIS_SPECIAL_PARM_CMG_ENABLE_SELF_HEALTH_CHECK
    ]

    # key -> (type used to coerce the raw value, default when key is absent)
    _redis_special_parameters_default_value = {
        REDIS_SPECIAL_PARM_CMG_ENABLE_SELF_HEALTH_CHECK: (bool, True)
    }

    # Textual spellings recognised when a boolean parameter arrives as a
    # string. A plain ``bool("false")`` would be True, so strings need
    # explicit handling.
    _BOOL_TRUE_LITERALS = frozenset({"1", "true", "yes", "on", "y", "t"})
    _BOOL_FALSE_LITERALS = frozenset({"0", "false", "no", "off", "n", "f"})

    @staticmethod
    def _coerce_special_value(_type, value):
        """Convert a raw parameter value to ``_type``.

        String values for boolean parameters are matched
        case-insensitively against the known true/false spellings; any
        other value falls back to ``_type(value)``.
        """
        if _type is bool and isinstance(value, str):
            text = value.strip().lower()
            if text in StaticConst._BOOL_TRUE_LITERALS:
                return True
            if text in StaticConst._BOOL_FALSE_LITERALS:
                return False
        return _type(value)

    @staticmethod
    def _strip_leading_prefix(text: str, prefix: str) -> str:
        """Remove ``prefix`` from the start of ``text`` if it is there."""
        if text.startswith(prefix):
            return text[len(prefix):]
        return text

    @staticmethod
    def parse_special_parameter(params: dict) -> dict:
        """Parse specialization parameters

        Parse the specialization parameters and remove the specialization
        parameters from the parameter list

        Args:
            params(dict): CmgUrl.params; every recognised specialization
                          key is popped from this dict (it is modified
                          in place)

        Returns:
            dict: one entry per known specialization parameter, holding
                  the coerced value, or the default when the key was not
                  present in ``params``
        """
        res = {}
        for key in StaticConst._redis_special_parameter_list:
            _type, default = \
                StaticConst._redis_special_parameters_default_value[key]
            res[key] = StaticConst._coerce_special_value(
                _type, params.pop(key, default)
            )
        return res

    @classmethod
    def get_inner_service_name(cls, service_name: str):
        """Return ``service_name`` prefixed with the service key prefix."""
        return f"{cls.CMG_REDIS_SERVICE_PREFIX}{service_name}"

    @classmethod
    def get_origin_service_name(cls, inner_service_name: str):
        """Return the service name with the leading service prefix removed.

        Only a leading prefix is stripped, so a service name that itself
        contains the prefix text is returned intact.
        """
        return cls._strip_leading_prefix(
            inner_service_name, cls.CMG_REDIS_SERVICE_PREFIX
        )

    @classmethod
    def get_inner_service_id(cls, service_id: str):
        """Return ``service_id`` prefixed with the instance key prefix."""
        return f"{cls.CMG_REDIS_INSTANCE_PREFIX}{service_id}"

    @classmethod
    def get_origin_service_id(cls, inner_service_id: str):
        """Return the instance id with the leading instance prefix removed.

        Only a leading prefix is stripped, so an id that itself contains
        the prefix text is returned intact.
        """
        return cls._strip_leading_prefix(
            inner_service_id, cls.CMG_REDIS_INSTANCE_PREFIX
        )
|
|
|
|
|
|
class ClientBase:
    """
    Base class for the cmg-redis clients.

    It holds the pieces every client needs: the specialization parameters
    extracted from the URL, the Redis connection and the URL that
    connection was opened with.
    """

    def __init__(self, url: CmgUrl):
        self.current_url = ""
        self.redis_client: Optional[Redis] = None
        self._redis_version = None
        # The specialization parameters are popped from url.params before
        # the URL is handed to the connection helper.
        self._special_params = StaticConst.parse_special_parameter(url.params)
        self.connect_by_cmg_url(url)

    def get_special_param(self, key: str, default=''):
        """Look up one specialization parameter

        Args:
            key(str): name of the specialization parameter
            default(Any): value handed back when ``key`` is not present

        Returns:
            The stored value for ``key``, or ``default`` if there is none
        """
        try:
            return self._special_params[key]
        except KeyError:
            return default

    def connect_by_cmg_url(self, url: CmgUrl):
        """Open the Redis connection described by a CmgUrl

        The new connection is stored in ``redis_client`` and the string
        form of ``url`` in ``current_url``.

        Args:
            url(CmgUrl): URL describing the Redis server to connect to

        Returns:
            This client itself
        """
        logger.debug(f"{self} try to connect to '{url}'.")
        client = do_connect_by_cmg_url(url)
        self.redis_client = client
        self.current_url = str(url)
        logger.info(f"{self} connect to '{url}' successfully.")
        return self
|