sysom1/environment/1_sdk/sysom_utils/framework_plugins.py

85 lines
3.0 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/04/06 13:13
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File framework_plugins.py
Description:
"""
from typing import Optional
from urllib.parse import urljoin
from channel_job import ChannelJobExecutor
from cmg_base import dispatch_service_registry, ServiceRegistry, \
ServiceInstance
from gclient_base import dispatch_g_client
from .config_parser import ConfigParser
from .event_executor import PluginEventExecutor
from .framework_plug_mag import FrameworkPluginBase
class NodeDispatcherPlugin(FrameworkPluginBase):
"""Node Dispatcher Plugin
NDP automatically implements the function of node file down and
initialization by reading the configuration
Args:
FrameworkPluginBase (_type_): _description_
"""
def __init__(self, config: ConfigParser) -> None:
self._config = config
self._channel_job_executor = ChannelJobExecutor()
self._channel_job_executor.init_config(
self._config.get_local_channel_job_url(),
dispatch_g_client(self._config.get_gclient_url("sysom_channel"))
)
self._plugin_event_executor = PluginEventExecutor(
config, self._channel_job_executor
)
def start(self):
self._channel_job_executor.start()
self._plugin_event_executor.start()
def stop(self):
self._channel_job_executor.stop()
self._plugin_event_executor.stop()
def join(self, timeout: Optional[float] = None):
self._plugin_event_executor.join(timeout)
class CmgPlugin(FrameworkPluginBase):
def __init__(self, config: ConfigParser) -> None:
self._config = config
self._registry: ServiceRegistry = \
dispatch_service_registry(self._config.get_cmg_url())
self._service_instance: Optional[ServiceInstance] = None
service_config = self._config.get_service_config()
cmg_plugin_config = service_config.framework.cmg
if cmg_plugin_config.check.type == "http" and \
(not cmg_plugin_config.check.url or
not cmg_plugin_config.check.url.startswith("http")
):
cmg_plugin_config.check.url = urljoin(
f"{service_config.protocol}://{service_config.host}:{service_config.port}",
cmg_plugin_config.check.url
)
cmg_plugin_config.service_name = service_config.service_name
cmg_plugin_config.host = service_config.host
cmg_plugin_config.port = service_config.port
if cmg_plugin_config:
self._service_instance = ServiceInstance(**dict(cmg_plugin_config))
def start(self):
if self._service_instance:
self._registry.register(self._service_instance)
def stop(self):
if self._service_instance:
self._registry.unregister(self._service_instance.service_id)
def join(self, timeout: Optional[float] = None):
return super().join(timeout)