mirror of https://gitee.com/anolis/sysom.git
86 lines
3.2 KiB
Python
86 lines
3.2 KiB
Python
# -*- coding: utf-8 -*- #
"""
Time                2023/3/19 20:45
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                redis_service_discovery.py
Description:        Redis-based implementation of the ServiceDiscovery
                    interface (see RedisServiceDiscovery).
"""
|
|
import json
|
|
import time
|
|
from typing import List, Dict
|
|
from cmg_base import ServiceDiscovery, ServiceInstance, CmgUrl, \
|
|
LoadBalancingStrategy, create_load_balancing_strategy, \
|
|
LoadBalancingStrategyBase
|
|
from redis_lua import XRedisHashTable
|
|
from .common import ClientBase, StaticConst
|
|
|
|
|
|
class RedisServiceDiscovery(ServiceDiscovery, ClientBase):
    """A Redis-based implementation of ServiceDiscovery.

    Service instances are read from Redis hash tables through
    ``XRedisHashTable`` and cached per service name, so Redis is queried
    at most once every ``fetch_interval`` seconds for a given service
    unless a refresh is forced.
    """

    def __init__(self, url: CmgUrl, fetch_interval: int = 5):
        """Initialise both base classes and the local caches.

        Args:
            url: Connection URL, handed to ``ClientBase.__init__``.
            fetch_interval: Minimum age, in seconds, a cached instance
                list must reach before it is fetched from Redis again.
        """
        ServiceDiscovery.__init__(self, fetch_interval)
        ClientBase.__init__(self, url)

        # NOTE(review): ``redis_client`` is presumably set up by
        # ClientBase.__init__ -- confirm against .common.ClientBase.
        self._x_redis_hash_table = XRedisHashTable(self.redis_client)

        # Handles Redis implementation of event-centric specialization

        # Not used anywhere in this class; kept because code outside
        # this file may rely on the attribute existing.
        self._auto_fetch_thread = None
        # service name -> most recently fetched instance list
        self._services: Dict[str, List[ServiceInstance]] = {}
        # service name -> load-balancing strategy used by get_instance()
        self._strategy_map: Dict[str, LoadBalancingStrategyBase] = {}
        # service name -> time.time() value recorded after the last fetch
        self._last_fetch_time_map: Dict[str, float] = {}

    def get_services(self) -> List[str]:
        """Return the names of all services listed in the services hash.

        Every key of the ``StaticConst.CMG_REDIS_SERVICES`` hash is
        converted with ``StaticConst.get_origin_service_name``.
        """
        inner_names = self._x_redis_hash_table.hkeys(
            StaticConst.CMG_REDIS_SERVICES)
        return [
            StaticConst.get_origin_service_name(inner_name)
            for inner_name in inner_names
        ]

    def get_instance_count(self, service_name: str) -> int:
        """Return the number of fields in the hash of *service_name*.

        This always queries Redis; the local cache is not consulted.
        """
        return self._x_redis_hash_table.hlen(
            StaticConst.get_inner_service_name(service_name))

    def _get_instances(self, service_name: str) -> List[ServiceInstance]:
        """Fetch all instances of *service_name* from Redis, uncached.

        Each value of the service's hash is a JSON document that is
        decoded and passed to ``ServiceInstance.from_redis_mapping``.
        """
        inner_service_name = StaticConst.get_inner_service_name(service_name)
        raw_instances = self._x_redis_hash_table.hgetall(inner_service_name)
        return [
            ServiceInstance.from_redis_mapping(json.loads(raw))
            for raw in raw_instances.values()
        ]

    def get_instances(self, service_name: str, force: bool = False) \
            -> List[ServiceInstance]:
        """Return the instances of *service_name*, refreshing if needed.

        A fetch from Redis is performed when:

        1. *force* is true, or
        2. the service has never been fetched before, or
        3. the last fetch is more than ``fetch_interval`` seconds old.

        Whenever a fetch happens and a load-balancing strategy is
        registered for the service, that strategy is updated with the
        new instance list.

        Args:
            service_name: Name of the service to look up.
            force: Bypass the cache and always query Redis.

        Returns:
            The (possibly cached) list of service instances.
        """
        last_fetch = self._last_fetch_time_map.get(service_name)
        needs_fetch = (
            force
            or service_name not in self._services
            or last_fetch is None
            or last_fetch + self.fetch_interval < time.time()
        )
        if needs_fetch:
            instances = self._get_instances(service_name)
            self._services[service_name] = instances
            self._last_fetch_time_map[service_name] = time.time()
            if service_name in self._strategy_map:
                self._strategy_map[service_name].update(instances)
        return self._services[service_name]

    def get_instance(self, service_name: str, strategy: LoadBalancingStrategy,
                     force: bool = False) \
            -> ServiceInstance:
        """Select one instance of *service_name* using *strategy*.

        A strategy object is created the first time a service is seen
        and re-created whenever the requested strategy type differs
        from the one currently registered.

        Args:
            service_name: Name of the service to look up.
            strategy: Load-balancing strategy type to select with.
            force: Bypass the instance cache and always query Redis.

        Returns:
            The instance chosen by the strategy's ``select()``.
        """
        current = self._strategy_map.get(service_name)
        recreated = current is None or current.type() != strategy
        if recreated:
            self._strategy_map[service_name] = create_load_balancing_strategy(
                strategy
            )
        # get_instances() only feeds a strategy when it really fetches.
        # A strategy created just now has not been given any instances,
        # so a still-fresh cache would leave it empty; force a refresh
        # in that case so select() always works on a populated strategy.
        self.get_instances(service_name, force or recreated)
        return self._strategy_map[service_name].select()
|