mirror of https://gitee.com/anolis/sysom.git
85 lines
3.0 KiB
Python
85 lines
3.0 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/04/06 13:13
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File framework_plugins.py
|
|
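Description: Framework plugins: NodeDispatcherPlugin (channel job and plugin event executors) and CmgPlugin (service registry registration).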
|
|
"""
|
|
from typing import Optional
|
|
from urllib.parse import urljoin
|
|
from channel_job import ChannelJobExecutor
|
|
from cmg_base import dispatch_service_registry, ServiceRegistry, \
|
|
ServiceInstance
|
|
from gclient_base import dispatch_g_client
|
|
from .config_parser import ConfigParser
|
|
from .event_executor import PluginEventExecutor
|
|
from .framework_plug_mag import FrameworkPluginBase
|
|
|
|
|
|
class NodeDispatcherPlugin(FrameworkPluginBase):
    """Framework plugin that owns the node-dispatch executors.

    On construction the plugin creates a ``ChannelJobExecutor``, configures
    it with the local channel-job URL and a g-client for ``"sysom_channel"``
    (both read from ``config``), and passes that executor to a
    ``PluginEventExecutor``. ``start`` / ``stop`` forward to both executors;
    ``join`` waits on the plugin event executor only.

    Args:
        config (ConfigParser): Configuration object that supplies the
            channel-job URL and the g-client URL.
    """

    def __init__(self, config: ConfigParser) -> None:
        self._config = config

        # Build the channel job executor first; the event executor below
        # receives this same instance.
        job_executor = ChannelJobExecutor()
        self._channel_job_executor = job_executor

        local_job_url = config.get_local_channel_job_url()
        channel_client = dispatch_g_client(
            config.get_gclient_url("sysom_channel")
        )
        job_executor.init_config(local_job_url, channel_client)

        self._plugin_event_executor = PluginEventExecutor(config, job_executor)

    def start(self):
        """Start the channel job executor, then the plugin event executor."""
        for worker in (self._channel_job_executor, self._plugin_event_executor):
            worker.start()

    def stop(self):
        """Stop the channel job executor, then the plugin event executor."""
        for worker in (self._channel_job_executor, self._plugin_event_executor):
            worker.stop()

    def join(self, timeout: Optional[float] = None):
        """Join the plugin event executor.

        Args:
            timeout: Value forwarded unchanged to the executor's ``join``.
        """
        event_executor = self._plugin_event_executor
        event_executor.join(timeout)
|
|
|
|
|
|
class CmgPlugin(FrameworkPluginBase):
    """Framework plugin that registers this service with a service registry.

    The registry is obtained from ``dispatch_service_registry`` using the
    CMG URL in ``config``. The ``framework.cmg`` section of the service
    config is completed with the service's name, host and port, and is then
    used as keyword arguments for a ``ServiceInstance``. ``start`` registers
    that instance and ``stop`` unregisters it by its ``service_id``.

    Args:
        config (ConfigParser): Configuration object that supplies the CMG
            URL and the service config.
    """

    def __init__(self, config: ConfigParser) -> None:
        self._config = config
        self._registry: ServiceRegistry = dispatch_service_registry(
            self._config.get_cmg_url()
        )
        self._service_instance: Optional[ServiceInstance] = None

        service_config = self._config.get_service_config()
        cmg_plugin_config = service_config.framework.cmg

        if cmg_plugin_config.check.type == "http":
            check_url = cmg_plugin_config.check.url
            # A missing check URL, or one that does not start with "http",
            # is resolved against the service's own base address.
            if not (check_url and check_url.startswith("http")):
                base_url = (
                    f"{service_config.protocol}://"
                    f"{service_config.host}:{service_config.port}"
                )
                cmg_plugin_config.check.url = urljoin(base_url, check_url)

        # Copy the service identity into the CMG section so that it can be
        # expanded directly into ServiceInstance keyword arguments.
        cmg_plugin_config.service_name = service_config.service_name
        cmg_plugin_config.host = service_config.host
        cmg_plugin_config.port = service_config.port

        # NOTE(review): this truthiness check runs after the config has
        # already been dereferenced and written to above - confirm whether
        # it is still expected to guard anything.
        if cmg_plugin_config:
            instance_kwargs = dict(cmg_plugin_config)
            self._service_instance = ServiceInstance(**instance_kwargs)

    def start(self):
        """Register the service instance, if one was built."""
        if not self._service_instance:
            return
        self._registry.register(self._service_instance)

    def stop(self):
        """Unregister the service instance, if one was built."""
        if not self._service_instance:
            return
        self._registry.unregister(self._service_instance.service_id)

    def join(self, timeout: Optional[float] = None):
        """Delegate to the base class ``join`` and return its result.

        Args:
            timeout: Value forwarded unchanged to the base implementation.
        """
        return super().join(timeout)
|