sysom1/environment/1_sdk/cmg_base/service_discovery.py

139 lines
4.6 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/3/17 19:57
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File service_discovery.py
Description: CMG ServiceDiscovery interface and its URL-based dispatcher.
"""
import importlib
from threading import Lock
from abc import ABCMeta, abstractmethod
from clogger import logger
from typing import List
from .service_instance import ServiceInstance
from .exceptions import CmgProtoAlreadyExistsException, \
CmgProtoNotExistsException
from .url import CmgUrl
from .load_balancing_strategy import LoadBalancingStrategy
class ServiceDiscovery(metaclass=ABCMeta):
    """Abstract interface of the CMG (Common Microservice Governance)
    service discovery component.

    Protocol-specific implementations subclass this interface and are
    recorded in ``proto_dict`` through ``protocol_register``.
    """

    # Registry: protocol identifier -> implementation class.
    proto_dict = {}
    # Class-level lock. It is not acquired anywhere inside this class;
    # presumably callers hold it while reading/updating ``proto_dict``.
    proto_lock = Lock()

    def __init__(self, fetch_interval: int):
        """Keep the configured fetch interval on the instance.

        Args:
            fetch_interval(int): Interval value stored as
                ``self.fetch_interval``
        """
        self.fetch_interval = fetch_interval

    @abstractmethod
    def get_services(self) -> List[str]:
        """Return the services known to the register center."""

    @abstractmethod
    def get_instance_count(self, service_name: str) -> int:
        """Return how many instances the given service has."""

    @abstractmethod
    def get_instances(self, service_id: str, force: bool = False) \
            -> List[ServiceInstance]:
        """Return the instances of the given service from the register
        center.
        """

    @abstractmethod
    def get_instance(self, service_name: str,
                     strategy: LoadBalancingStrategy,
                     force: bool = False) -> ServiceInstance:
        """Return a single instance of the given service, chosen according
        to the supplied load balancing strategy.
        """

    @staticmethod
    def protocol_register(proto, sub_class):
        """Bind a protocol identifier to a ServiceDiscovery implementation.

        The binding is stored in ``ServiceDiscovery.proto_dict``. An
        identifier can be bound only once; a second attempt is logged and
        rejected.

        Args:
            proto(str): Protocol identification
            sub_class: Implementation class of ServiceDiscovery for that
                protocol

        Raises:
            CmgProtoAlreadyExistsException: ``proto`` is already registered

        Examples:
            >>> ServiceDiscovery.protocol_register('redis',
            ...                                    RedisServiceDiscovery)
        """
        registry = ServiceDiscovery.proto_dict
        if proto not in registry:
            registry[proto] = sub_class
            logger.info(
                f"Cmg-base-ServiceDiscovery register proto '{proto}' success"
            )
            return

        # Duplicate registration: log the error first, then raise it.
        duplicate = CmgProtoAlreadyExistsException(
            f"Proto '{proto}' already exists in Cmg-base-ServiceDiscovery."
        )
        logger.error(duplicate)
        raise duplicate
def dispatch_service_discovery(url: str, **kwargs) -> ServiceDiscovery:
    """Construct one ServiceDiscovery instance according to the url

    The protocol part of the URL selects the implementation class. If the
    protocol has not been registered yet, the module
    ``cmg_<proto>.<proto>_service_discovery`` is imported dynamically and its
    ``<Proto>ServiceDiscovery`` class is registered before instantiation.

    Args:
        url(str): CmgUrl

    Keyword Args:
        fetch_interval(int): The interval between pulling the instance list
            of a specific service

    Returns:
        ServiceDiscovery: one ServiceDiscovery instance

    Raises:
        CmgProtoNotExistsException: The protocol is not registered and its
            implementation module cannot be imported, or the module does not
            define the expected implementation class.

    Examples:
        >>> service_discovery = dispatch_service_discovery(
        ...     "redis://localhost:6379?password=123456")
    """
    cmg_url = CmgUrl.parse(url)
    proto = cmg_url.proto
    # Hold the lock across check + import + register so that concurrent
    # dispatches of the same protocol do not both try to register it.
    with ServiceDiscovery.proto_lock:
        if proto not in ServiceDiscovery.proto_dict:
            # Check if dynamic import is possible
            target_module = f"cmg_{proto}.{proto}_service_discovery"
            try:
                module = importlib.import_module(target_module)
            except ModuleNotFoundError as exc:
                logger.error(
                    f"Try to auto import module {target_module} failed."
                )
                raise CmgProtoNotExistsException(
                    f"Proto '{proto}' not exists in Cmg-base"
                    "-ServiceDiscovery."
                ) from exc

            target_class = f"{proto.capitalize()}ServiceDiscovery"
            try:
                implementation = getattr(module, target_class)
            except AttributeError as exc:
                # The module exists but does not follow the naming
                # convention; report it as a missing protocol instead of
                # leaking a raw AttributeError to the caller.
                logger.error(
                    f"Module {target_module} does not define {target_class}."
                )
                raise CmgProtoNotExistsException(
                    f"Proto '{proto}' not exists in Cmg-base"
                    "-ServiceDiscovery."
                ) from exc
            ServiceDiscovery.protocol_register(proto, implementation)

    # Instantiation happens outside the lock: only the registry needs it.
    service_discovery_instance = ServiceDiscovery.proto_dict[proto](
        cmg_url, **kwargs)
    # NOTE(review): the url is logged verbatim and may embed credentials
    # (e.g. "?password=..."); consider masking before logging.
    logger.info(
        f"Cmg-base-ServiceDiscovery dispatch one ServiceDiscovery instance "
        f"success. proto={cmg_url.proto}, url={url}"
    )
    return service_discovery_instance