mirror of https://gitee.com/anolis/sysom.git
113 lines
4.4 KiB
Python
113 lines
4.4 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/3/17 20:27
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File redis_service_registry.py
|
|
Description:
|
|
"""
|
|
import functools
|
|
import json
|
|
|
|
from clogger import logger
|
|
from cmg_base import ServiceRegistry, ServiceInstance, CmgUrl, CmgException, \
|
|
ServiceCheck, HealthState
|
|
from redis_lua import XRedisHashTable
|
|
from .common import ClientBase, StaticConst
|
|
from .utils import RedisLocker
|
|
|
|
|
|
class RedisServiceRegistry(ServiceRegistry, ClientBase):
|
|
"""A redis-based execution module implement of ServiceRegistry
|
|
"""
|
|
|
|
def __init__(self, url: CmgUrl):
|
|
ServiceRegistry.__init__(self)
|
|
ClientBase.__init__(self, url)
|
|
|
|
# Handles Redis implementation of event-centric specialization
|
|
# parameters
|
|
self._self_health_check = self.get_special_param(
|
|
StaticConst.REDIS_SPECIAL_PARM_CMG_ENABLE_SELF_HEALTH_CHECK, True
|
|
)
|
|
|
|
# 1. Connect to the Redis server
|
|
self._current_state = HealthState.OFFLINE
|
|
|
|
self._x_redis_hash_table = XRedisHashTable(self.redis_client)
|
|
|
|
def register(self, service_instance: ServiceInstance):
|
|
"""Register one service to redis-based register center"""
|
|
|
|
def _on_health_check(service_id: str, expire_time: int, state: HealthState):
|
|
if state == HealthState.ONLINE:
|
|
self._x_redis_hash_table.hexpire(inner_service_name,
|
|
service_id, expire=expire_time)
|
|
self._x_redis_hash_table.hexpire(
|
|
StaticConst.CMG_REDIS_SERVICES,
|
|
inner_service_name, expire=expire_time)
|
|
|
|
def _do_unregister(inner_service_name: str, inner_service_id: str):
|
|
try:
|
|
self.unregister(inner_service_name, inner_service_id)
|
|
except Exception as exc:
|
|
print(exc)
|
|
|
|
inner_service_name = StaticConst.get_inner_service_name(
|
|
service_instance.service_name
|
|
)
|
|
inner_service_id = StaticConst.get_inner_service_id(
|
|
service_instance.service_id
|
|
)
|
|
with RedisLocker(self.redis_client,
|
|
StaticConst.CMG_REDIS_SERVICES_LOCKER) as res:
|
|
if not res:
|
|
raise CmgException("Get locker failed")
|
|
expire_time = int(
|
|
service_instance.check.get("deregister", 20) * 1.2)
|
|
self._x_redis_hash_table.hset(
|
|
inner_service_name, inner_service_id,
|
|
json.dumps(service_instance.to_redis_mapping()),
|
|
expire=expire_time
|
|
)
|
|
self._x_redis_hash_table.hset(
|
|
StaticConst.CMG_REDIS_SERVICES, inner_service_name, "",
|
|
expire=expire_time
|
|
)
|
|
|
|
# 2. Deal health check
|
|
if self._self_health_check:
|
|
ServiceCheck.create_health_check_instance(
|
|
service_instance.check,
|
|
on_check=functools.partial(_on_health_check,
|
|
inner_service_id, expire_time),
|
|
do_unregister=functools.partial(
|
|
_do_unregister, inner_service_name, inner_service_id
|
|
)
|
|
).start()
|
|
|
|
def unregister(self, service_name: str, service_id: str):
|
|
"""Unregister one service from redis-based register center"""
|
|
# 1. Get service instance
|
|
inner_service_id = StaticConst.get_inner_service_id(
|
|
service_id
|
|
)
|
|
inner_service_name = StaticConst.get_inner_service_name(
|
|
service_name
|
|
)
|
|
service_instance_str = self._x_redis_hash_table.hget(
|
|
inner_service_name, inner_service_id)
|
|
if service_instance_str is None:
|
|
raise CmgException(
|
|
f"Service(service_id={service_id}) not exists")
|
|
with RedisLocker(self.redis_client,
|
|
StaticConst.CMG_REDIS_SERVICES_LOCKER) as res:
|
|
if not res:
|
|
raise CmgException("Get locker failed")
|
|
|
|
# 3. Delete service instance
|
|
self._x_redis_hash_table.hdel(inner_service_name, inner_service_id)
|
|
if self._x_redis_hash_table.hlen(inner_service_name) <= 0:
|
|
self._x_redis_hash_table.hdel(StaticConst.CMG_REDIS_SERVICES,
|
|
inner_service_name)
|