sysom1/environment/1_sdk/cmg_redis/redis_service_registry.py

113 lines
4.4 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/3/17 20:27
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File redis_service_registry.py
Description: Redis-based implementation of the cmg_base ServiceRegistry.
"""
import functools
import json
from clogger import logger
from cmg_base import ServiceRegistry, ServiceInstance, CmgUrl, CmgException, \
ServiceCheck, HealthState
from redis_lua import XRedisHashTable
from .common import ClientBase, StaticConst
from .utils import RedisLocker
class RedisServiceRegistry(ServiceRegistry, ClientBase):
    """A Redis-based implementation of ServiceRegistry.

    Every registered instance is written through ``XRedisHashTable`` under
    its inner service name (field = inner service id, value = JSON-encoded
    instance data) together with an ``expire`` value. The inner service name
    itself is recorded as a field of ``StaticConst.CMG_REDIS_SERVICES``.
    """

    def __init__(self, url: CmgUrl):
        """Initialise the registry.

        Args:
            url: CMG url forwarded to ``ClientBase.__init__``.
        """
        ServiceRegistry.__init__(self)
        ClientBase.__init__(self, url)

        # Redis-specific special parameter: when truthy (the default),
        # ``register`` also starts a health check for the instance.
        self._self_health_check = self.get_special_param(
            StaticConst.REDIS_SPECIAL_PARM_CMG_ENABLE_SELF_HEALTH_CHECK, True
        )
        self._current_state = HealthState.OFFLINE

        # Hash-table helper bound to the redis client
        # (``self.redis_client`` is presumably provided by ClientBase).
        self._x_redis_hash_table = XRedisHashTable(self.redis_client)

    def register(self, service_instance: ServiceInstance):
        """Register one service to the redis-based register center.

        Args:
            service_instance: Instance to publish. Its ``check`` mapping may
                carry a ``deregister`` value (default 20); the expire time
                used for the redis entries is 1.2 times that value.

        Raises:
            CmgException: If the services lock cannot be acquired.
        """
        inner_service_name = StaticConst.get_inner_service_name(
            service_instance.service_name
        )
        inner_service_id = StaticConst.get_inner_service_id(
            service_instance.service_id
        )
        expire_time = int(
            service_instance.check.get("deregister", 20) * 1.2)

        def _on_health_check(service_id: str, expire_time: int,
                             state: HealthState):
            # Only an ONLINE result refreshes the expire values; any other
            # state leaves the stored entries untouched.
            if state == HealthState.ONLINE:
                self._x_redis_hash_table.hexpire(
                    inner_service_name, service_id, expire=expire_time)
                self._x_redis_hash_table.hexpire(
                    StaticConst.CMG_REDIS_SERVICES,
                    inner_service_name, expire=expire_time)

        def _do_unregister(service_name: str, service_id: str):
            # ``unregister`` converts the public name/id to their inner
            # form itself, so it must be handed the public values here.
            try:
                self.unregister(service_name, service_id)
            except Exception as exc:
                # Best effort: a failed automatic deregistration is logged
                # rather than propagated into the health checker.
                logger.exception(exc)

        # 1. Store the instance and its service name under the lock
        with RedisLocker(self.redis_client,
                         StaticConst.CMG_REDIS_SERVICES_LOCKER) as res:
            if not res:
                raise CmgException("Get locker failed")
            self._x_redis_hash_table.hset(
                inner_service_name, inner_service_id,
                json.dumps(service_instance.to_redis_mapping()),
                expire=expire_time
            )
            self._x_redis_hash_table.hset(
                StaticConst.CMG_REDIS_SERVICES, inner_service_name, "",
                expire=expire_time
            )

        # 2. Deal health check (started once the lock has been released;
        #    ``_do_unregister`` takes the same lock again later on)
        if self._self_health_check:
            ServiceCheck.create_health_check_instance(
                service_instance.check,
                on_check=functools.partial(
                    _on_health_check, inner_service_id, expire_time
                ),
                do_unregister=functools.partial(
                    _do_unregister,
                    service_instance.service_name,
                    service_instance.service_id
                )
            ).start()

    def unregister(self, service_name: str, service_id: str):
        """Unregister one service from the redis-based register center.

        Args:
            service_name: Public service name; converted with
                ``StaticConst.get_inner_service_name`` before use.
            service_id: Public service id; converted with
                ``StaticConst.get_inner_service_id`` before use.

        Raises:
            CmgException: If no entry is stored for the instance, or if the
                services lock cannot be acquired.
        """
        inner_service_id = StaticConst.get_inner_service_id(
            service_id
        )
        inner_service_name = StaticConst.get_inner_service_name(
            service_name
        )

        # 1. Make sure the service instance exists
        service_instance_str = self._x_redis_hash_table.hget(
            inner_service_name, inner_service_id)
        if service_instance_str is None:
            raise CmgException(
                f"Service(service_id={service_id}) not exists")

        with RedisLocker(self.redis_client,
                         StaticConst.CMG_REDIS_SERVICES_LOCKER) as res:
            if not res:
                raise CmgException("Get locker failed")
            # 2. Delete the service instance
            self._x_redis_hash_table.hdel(inner_service_name, inner_service_id)
            # 3. Drop the service name once its last instance is gone
            if self._x_redis_hash_table.hlen(inner_service_name) <= 0:
                self._x_redis_hash_table.hdel(StaticConst.CMG_REDIS_SERVICES,
                                              inner_service_name)