mirror of https://gitee.com/anolis/sysom.git
139 lines
4.6 KiB
Python
139 lines
4.6 KiB
Python
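# -*- coding: utf-8 -*- #
"""
Time                2023/3/17 19:57
Author:             mingfeng (SunnyQjm)
Email               mfeng@linux.alibaba.com
File                service_discovery.py
Description:        Abstract ServiceDiscovery interface of Common Microservice
                    Governance (CMG), the registry of protocol
                    implementations, and a URL-based factory function.
"""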
|
|
import importlib
|
|
from threading import Lock
|
|
from abc import ABCMeta, abstractmethod
|
|
from clogger import logger
|
|
from typing import List
|
|
from .service_instance import ServiceInstance
|
|
from .exceptions import CmgProtoAlreadyExistsException, \
|
|
CmgProtoNotExistsException
|
|
from .url import CmgUrl
|
|
from .load_balancing_strategy import LoadBalancingStrategy
|
|
|
|
|
|
class ServiceDiscovery(metaclass=ABCMeta):
    """Abstract interface of Common Microservice Governance (CMG) service
    discovery.

    Concrete implementations subclass this class and provide the abstract
    query methods below. The class additionally holds a class-level registry
    that maps a protocol identifier to the class implementing it.
    """
    # Lock taken by dispatch_service_discovery() while it checks and fills
    # the registry below.
    proto_lock = Lock()
    # Registry: protocol identifier -> implementation class.
    proto_dict = {}

    def __init__(self, fetch_interval: int):
        # Interval between pulls of the instance list of a specific service
        # (unit is not specified here -- TODO confirm with implementations).
        self.fetch_interval = fetch_interval

    @abstractmethod
    def get_services(self) -> List[str]:
        """Return the services known to the register center."""

    @abstractmethod
    def get_instance_count(self, service_name: str) -> int:
        """Return how many instances the given service has."""

    @abstractmethod
    def get_instances(self, service_id: str, force: bool = False) \
            -> List[ServiceInstance]:
        """Return the instances of the given service from the register center.

        NOTE(review): the meaning of ``force`` is left to implementations;
        presumably it bypasses locally cached results -- confirm.
        """

    @abstractmethod
    def get_instance(self, service_name: str,
                     strategy: LoadBalancingStrategy,
                     force: bool = False) -> ServiceInstance:
        """Return one instance of the given service, chosen from the register
        center according to the passed load balancing strategy.
        """

    @staticmethod
    def protocol_register(proto, sub_class):
        """Bind a protocol identifier to a ServiceDiscovery implementation

        An execution module calls this to make its own ServiceDiscovery
        implementation available under ``proto``. Developers normally do not
        need to call it by hand: when a module follows the naming
        specification, the abstraction layer imports it dynamically and
        registers it.

        This method does not acquire ``proto_lock`` itself.

        Args:
            proto(str): Protocol identification
            sub_class: Implementation class of ServiceDiscovery

        Returns:
            None

        Raises:
            CmgProtoAlreadyExistsException: ``proto`` is already registered.

        Examples:
            >>> ServiceDiscovery.protocol_register('redis',
            ...                                    RedisServiceDiscovery)

        """
        registry = ServiceDiscovery.proto_dict
        if proto not in registry:
            registry[proto] = sub_class
            logger.info(
                f"Cmg-base-ServiceDiscovery register proto '{proto}' success"
            )
            return

        # Duplicate registration: log the error, then raise that same object.
        duplicate = CmgProtoAlreadyExistsException(
            f"Proto '{proto}' already exists in Cmg-base-ServiceDiscovery."
        )
        logger.error(duplicate)
        raise duplicate
|
|
|
|
|
|
def dispatch_service_discovery(url: str, **kwargs) -> ServiceDiscovery:
    """Construct one ServiceDiscovery instance according to the url

    The protocol of the parsed URL selects the implementation class. When
    that protocol is not registered yet, the module
    ``cmg_<proto>.<proto>_service_discovery`` is imported and its
    ``<Proto>ServiceDiscovery`` class is registered automatically. All
    keyword arguments are forwarded to the implementation's constructor.

    Args:
        url(str): CmgUrl

    Keyword Args:
        fetch_interval(int): The interval between pulling the instance list
                             of a specific service

    Returns:
        ServiceDiscovery: one ServiceDiscovery instance

    Raises:
        CmgProtoNotExistsException: The implementation module cannot be
            imported, or it does not provide the expected class.

    Examples:
        >>> service_discovery = dispatch_service_discovery(
        ...     "redis://localhost:6379?password=123456")
    """
    cmg_url = CmgUrl.parse(url)
    proto = cmg_url.proto
    with ServiceDiscovery.proto_lock:
        if proto not in ServiceDiscovery.proto_dict:
            # Check if dynamic import is possible
            target_module = f"cmg_{proto}.{proto}_service_discovery"
            target_class = f"{proto.capitalize()}ServiceDiscovery"
            try:
                module = importlib.import_module(target_module)
            except ModuleNotFoundError as exc:
                logger.error(
                    f"Try to auto import module {target_module} failed."
                )
                raise CmgProtoNotExistsException(
                    f"Proto '{proto}' not exists in Cmg-base"
                    "-ServiceDiscovery."
                ) from exc
            # The imported module may already have registered itself while
            # being imported; registering again would raise
            # CmgProtoAlreadyExistsException, so re-check first.
            if proto not in ServiceDiscovery.proto_dict:
                sub_class = getattr(module, target_class, None)
                if sub_class is None:
                    # Module exists but does not follow the naming
                    # specification: report it as an unknown protocol rather
                    # than leaking a raw AttributeError to the caller.
                    logger.error(
                        f"Module {target_module} does not define "
                        f"{target_class}."
                    )
                    raise CmgProtoNotExistsException(
                        f"Proto '{proto}' not exists in Cmg-base"
                        "-ServiceDiscovery."
                    )
                ServiceDiscovery.protocol_register(proto, sub_class)
    # Only the registry access needs the lock; build the instance outside it.
    service_discovery_instance = ServiceDiscovery.proto_dict[proto](
        cmg_url, **kwargs)
    logger.info(
        f"Cmg-base-ServiceDiscovery dispatch one ServiceDiscovery instance "
        f"success. proto={cmg_url.proto}, url={url}"
    )
    return service_discovery_instance
|