mirror of https://gitee.com/anolis/sysom.git
284 lines
8.7 KiB
Python
284 lines
8.7 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/05/08 14:13
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File framework.py
|
|
Description:
|
|
"""
|
|
import uuid
|
|
import time
|
|
from typing import Optional, Dict, Type
|
|
from gcache_base import dispatch_g_cache, GCache
|
|
from channel_job.job import default_channel_job_executor
|
|
from gclient_base import dispatch_g_client, GClient
|
|
from cec_base import (
|
|
dispatch_producer,
|
|
Producer,
|
|
dispatch_consumer,
|
|
Consumer,
|
|
dispatch_admin,
|
|
Admin,
|
|
)
|
|
from clogger import logger
|
|
from .config_parser import ConfigParser, CecTarget
|
|
from .framework_plug_mag import FrameworkPlugMag, FrameworkPluginBase
|
|
|
|
|
|
class SysomFrameworkException(Exception):
|
|
pass
|
|
|
|
|
|
NOT_INIT_ERROR_TIP = "SysomFramework not init, please call SysomFramework.init first"
|
|
|
|
|
|
class SysomFramework:
|
|
_config: Optional[ConfigParser] = None
|
|
_gcache_map: Dict[str, GCache] = {}
|
|
_framework_plug_mag: Optional[FrameworkPlugMag] = None
|
|
_alarm_producer: Optional[Producer] = None
|
|
|
|
@classmethod
|
|
def init(cls, config: ConfigParser):
|
|
cls._config = config
|
|
cls._framework_plug_mag = FrameworkPlugMag(config)
|
|
cls.init_logger(config)
|
|
cls.catch_kill_sig_then_kill_all_child()
|
|
return cls
|
|
|
|
@classmethod
|
|
def catch_kill_sig_then_kill_all_child(cls):
|
|
"""Catch kill signal and kill all child process
|
|
"""
|
|
def signal_handler(signum, frame):
|
|
parent = psutil.Process(pid)
|
|
for child in parent.children(recursive=True): # 获取所有子进程
|
|
child.kill()
|
|
child.wait()
|
|
# 主动退出当前进程
|
|
os._exit(1)
|
|
import signal
|
|
import os
|
|
import psutil
|
|
|
|
pid = os.getpid()
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGHUP, signal_handler)
|
|
signal.signal(signal.SIGQUIT, signal_handler)
|
|
signal.signal(signal.SIGABRT, signal_handler)
|
|
|
|
@classmethod
|
|
def init_logger(cls, config: ConfigParser):
|
|
"""Init clogger
|
|
|
|
Args:
|
|
config (ConfigParser): Yaml config object
|
|
"""
|
|
log_format = config.get_server_config().logger.format
|
|
log_level = config.get_server_config().logger.level
|
|
logger.set_format(log_format)
|
|
logger.set_level(log_level)
|
|
|
|
@classmethod
|
|
def gcache(cls, cache_name: str) -> GCache:
|
|
"""Create one GCache instance
|
|
|
|
Args:
|
|
cache_name (str): Cache name
|
|
|
|
Raises:
|
|
SysomFrameworkException: Raise if SysomFramework is not initialized
|
|
|
|
Returns:
|
|
GCache: GCache instance
|
|
"""
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
if cache_name in cls._gcache_map:
|
|
return cls._gcache_map[cache_name]
|
|
else:
|
|
new_gcache = dispatch_g_cache(cache_name, cls._config.get_gcache_url())
|
|
cls._gcache_map[cache_name] = new_gcache
|
|
return new_gcache
|
|
|
|
@classmethod
|
|
def gclient(cls, service_name: str = "") -> GClient:
|
|
"""Create one GClient instance
|
|
|
|
Args:
|
|
service_name (str, optional): Service name want to communicate
|
|
|
|
Raises:
|
|
SysomFrameworkException: Raise if SysomFramework is not initialized
|
|
|
|
Returns:
|
|
GClient: GClient instance
|
|
"""
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
return dispatch_g_client(cls._config.get_gclient_url(service_name))
|
|
|
|
@classmethod
|
|
def cec_producer(cls) -> Producer:
|
|
"""Create one CEC Producer instance
|
|
|
|
Raises:
|
|
SysomFrameworkException: Raise if SysomFramework is not initialized
|
|
|
|
Returns:
|
|
Producer: CEC Producer instance
|
|
"""
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
return dispatch_producer(cls._config.get_cec_url(CecTarget.PRODUCER))
|
|
|
|
@classmethod
|
|
def cec_consumer(
|
|
cls,
|
|
topic_name: str,
|
|
consumer_id: str = "",
|
|
group_id: str = "",
|
|
start_from_now: bool = True,
|
|
**kwargs,
|
|
) -> Consumer:
|
|
"""Create one CEC Consumer instance
|
|
|
|
Args: @see See cec_base.Consumer
|
|
|
|
Raises:
|
|
SysomFrameworkException: Raise if SysomFramework is not initialized
|
|
|
|
Returns:
|
|
Consumer: CEC Consumer instance
|
|
"""
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
return dispatch_consumer(
|
|
cls._config.get_cec_url(CecTarget.CONSUMER),
|
|
topic_name=topic_name,
|
|
consumer_id=consumer_id,
|
|
group_id=group_id,
|
|
start_from_now=start_from_now,
|
|
**kwargs,
|
|
)
|
|
|
|
@classmethod
|
|
def cec_admin(cls) -> Admin:
|
|
"""Create one CEC Admin instance
|
|
|
|
Raises:
|
|
SysomFrameworkException: Raise if SysomFramework is not initialized
|
|
|
|
Returns:
|
|
Admin: CEC Admin instance
|
|
"""
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
return dispatch_admin(cls._config.get_cec_url(CecTarget.ADMIN))
|
|
|
|
@classmethod
|
|
def load_plugin(cls, plugin: FrameworkPluginBase):
|
|
if cls._framework_plug_mag is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
cls._framework_plug_mag.load_plugin(plugin)
|
|
return cls
|
|
|
|
@classmethod
|
|
def load_plugin_cls(cls, plugin_cls: Type[FrameworkPluginBase]):
|
|
if cls._framework_plug_mag is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
cls._framework_plug_mag.load_plugin_cls(plugin_cls)
|
|
return cls
|
|
|
|
@classmethod
|
|
def enable_channel_job(cls):
|
|
if cls._config is None:
|
|
raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
|
|
cls._config.get_cmg_url()
|
|
default_channel_job_executor.init_config(
|
|
cls._config.get_local_channel_job_url(), cls.gclient("sysom_channel")
|
|
)
|
|
default_channel_job_executor.start()
|
|
return cls
|
|
|
|
################################################################################
|
|
# Alarm
|
|
################################################################################
|
|
@classmethod
|
|
def _get_alarm_producer(cls):
|
|
if cls._alarm_producer is None:
|
|
cls._alarm_producer = cls.cec_producer()
|
|
return cls._alarm_producer
|
|
|
|
@classmethod
|
|
def alarm(cls, alert_data):
|
|
"""Dispatch one SAD alert data to event center
|
|
|
|
Args:
|
|
alert_data (_type_): _description_
|
|
"""
|
|
cls._get_alarm_producer().produce("SYSOM_SAD_ALERT", alert_data)
|
|
|
|
@classmethod
|
|
def alarm_application(
|
|
cls,
|
|
summary_message: str,
|
|
annotations: dict = {},
|
|
labels: dict = {},
|
|
alert_item: Optional[str] = None,
|
|
level: str = "WARNING",
|
|
):
|
|
if cls._config is None:
|
|
raise (Exception("Init SysomFramework first!"))
|
|
if level not in ["WARNING", "ERROR", "CRITICAL"]:
|
|
raise Exception(
|
|
f'Alarm level not suppport => {level}, required in ["WARNING", "ERROR", "CRITICAL"]'
|
|
)
|
|
if alert_item is None:
|
|
alert_item = f"{cls._config.get_service_config().service_name}_alert"
|
|
cls.alarm(
|
|
{
|
|
"alert_id": str(uuid.uuid4()),
|
|
"instance": cls._config.get_service_config().host,
|
|
"alert_item": alert_item,
|
|
"alert_category": "APPLICATION",
|
|
"alert_source_type": "sysom",
|
|
"alert_level": level,
|
|
"alert_time": int(round(time.time() * 1000)),
|
|
"status": "FIRING",
|
|
"labels": {
|
|
"service": cls._config.get_service_config().service_name,
|
|
**labels,
|
|
},
|
|
"annotations": {"summary": summary_message, **annotations},
|
|
}
|
|
)
|
|
|
|
@classmethod
|
|
def alarm_action(cls, action: str, action_data: dict):
|
|
"""Invoke one alarm action
|
|
|
|
Args:
|
|
action (str): _description_
|
|
action_data (dict): _description_
|
|
|
|
Returns:
|
|
_type_: _description_
|
|
"""
|
|
if action not in ["ADD_ANNOTATION", "ADD_OPT", "MERGE"]:
|
|
raise Exception(f"Not support alarm action: {action}")
|
|
cls._get_alarm_producer().produce(
|
|
"SYSOM_ALARM_ACTION", {"action": action, "data": action_data}
|
|
)
|
|
|
|
@classmethod
|
|
def start(cls):
|
|
cls._framework_plug_mag.start()
|
|
return cls
|
|
|
|
def join(cls, timeout: Optional[float] = None):
|
|
cls._framework_plug_mag.join()
|
|
return cls
|