sysom1/environment/1_sdk/sysom_utils/framework.py

284 lines
8.7 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/05/08 14:13
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File framework.py
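Description: Class-level facade that initializes the SysOM SDK (logger, plugin manager, kill-signal handling) and hands out GCache, GClient and CEC instances, plus helpers for dispatching alarms.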
"""
import uuid
import time
from typing import Optional, Dict, Type
from gcache_base import dispatch_g_cache, GCache
from channel_job.job import default_channel_job_executor
from gclient_base import dispatch_g_client, GClient
from cec_base import (
dispatch_producer,
Producer,
dispatch_consumer,
Consumer,
dispatch_admin,
Admin,
)
from clogger import logger
from .config_parser import ConfigParser, CecTarget
from .framework_plug_mag import FrameworkPlugMag, FrameworkPluginBase
class SysomFrameworkException(Exception):
    """Error raised by SysomFramework itself.

    In this module it signals that a SysomFramework entry point was used
    before ``SysomFramework.init`` had been called.
    """
# Message carried by SysomFrameworkException when the framework is used
# before SysomFramework.init() has been called.
NOT_INIT_ERROR_TIP = (
    "SysomFramework not init, please call SysomFramework.init first"
)
class SysomFramework:
    """Class-level facade over the SysOM SDK building blocks.

    All state is stored on the class and every entry point is a classmethod,
    so the framework is used without being instantiated. ``init`` must be
    called first; until then the factory and plugin methods raise
    ``SysomFrameworkException``.
    """

    # Parsed configuration; stays None until init() is called.
    _config: Optional[ConfigParser] = None
    # GCache instances already created by gcache(), keyed by cache name.
    _gcache_map: Dict[str, GCache] = {}
    # Plugin manager created by init().
    _framework_plug_mag: Optional[FrameworkPlugMag] = None
    # Producer shared by the alarm helpers, created on first use.
    _alarm_producer: Optional[Producer] = None

    @classmethod
    def init(cls, config: ConfigParser):
        """Initialize the framework

        Stores the config, creates the plugin manager, configures the logger
        and installs the kill-signal handlers.

        Args:
            config (ConfigParser): Yaml config object

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        cls._config = config
        cls._framework_plug_mag = FrameworkPlugMag(config)
        cls.init_logger(config)
        cls.catch_kill_sig_then_kill_all_child()
        return cls

    @classmethod
    def catch_kill_sig_then_kill_all_child(cls):
        """Catch kill signal and kill all child process

        Registers one handler for SIGINT, SIGTERM, SIGHUP, SIGQUIT and
        SIGABRT. The handler kills every descendant of the current process
        and then terminates the current process with exit status 1.
        """
        import os
        import signal

        import psutil

        pid = os.getpid()

        def signal_handler(signum, frame):
            parent = psutil.Process(pid)
            # Walk every descendant process, not only direct children.
            for child in parent.children(recursive=True):
                try:
                    child.kill()
                    child.wait()
                except psutil.NoSuchProcess:
                    # The child already exited between the snapshot above and
                    # the kill (it may have received the same signal); keep
                    # going so the remaining children are still cleaned up.
                    continue
            # Exit the current process proactively.
            os._exit(1)

        for sig in (
            signal.SIGINT,
            signal.SIGTERM,
            signal.SIGHUP,
            signal.SIGQUIT,
            signal.SIGABRT,
        ):
            signal.signal(sig, signal_handler)

    @classmethod
    def init_logger(cls, config: ConfigParser):
        """Init clogger

        Applies the format and level found under the server config's
        ``logger`` section to the shared ``clogger`` logger.

        Args:
            config (ConfigParser): Yaml config object
        """
        log_format = config.get_server_config().logger.format
        log_level = config.get_server_config().logger.level
        logger.set_format(log_format)
        logger.set_level(log_level)

    @classmethod
    def gcache(cls, cache_name: str) -> GCache:
        """Create one GCache instance

        Instances are memoized per cache name, so repeated calls with the
        same name return the same object.

        Args:
            cache_name (str): Cache name

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            GCache: GCache instance
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        if cache_name in cls._gcache_map:
            return cls._gcache_map[cache_name]
        new_gcache = dispatch_g_cache(cache_name, cls._config.get_gcache_url())
        cls._gcache_map[cache_name] = new_gcache
        return new_gcache

    @classmethod
    def gclient(cls, service_name: str = "") -> GClient:
        """Create one GClient instance

        Args:
            service_name (str, optional): Service name want to communicate

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            GClient: GClient instance
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        return dispatch_g_client(cls._config.get_gclient_url(service_name))

    @classmethod
    def cec_producer(cls) -> Producer:
        """Create one CEC Producer instance

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            Producer: CEC Producer instance
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        return dispatch_producer(cls._config.get_cec_url(CecTarget.PRODUCER))

    @classmethod
    def cec_consumer(
        cls,
        topic_name: str,
        consumer_id: str = "",
        group_id: str = "",
        start_from_now: bool = True,
        **kwargs,
    ) -> Consumer:
        """Create one CEC Consumer instance

        All arguments are forwarded unchanged to ``dispatch_consumer``
        together with the consumer URL taken from the config.

        Args: @see See cec_base.Consumer

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            Consumer: CEC Consumer instance
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        return dispatch_consumer(
            cls._config.get_cec_url(CecTarget.CONSUMER),
            topic_name=topic_name,
            consumer_id=consumer_id,
            group_id=group_id,
            start_from_now=start_from_now,
            **kwargs,
        )

    @classmethod
    def cec_admin(cls) -> Admin:
        """Create one CEC Admin instance

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            Admin: CEC Admin instance
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        return dispatch_admin(cls._config.get_cec_url(CecTarget.ADMIN))

    @classmethod
    def load_plugin(cls, plugin: FrameworkPluginBase):
        """Register one plugin instance with the plugin manager

        Args:
            plugin (FrameworkPluginBase): Plugin instance to load

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        if cls._framework_plug_mag is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        cls._framework_plug_mag.load_plugin(plugin)
        return cls

    @classmethod
    def load_plugin_cls(cls, plugin_cls: Type[FrameworkPluginBase]):
        """Register one plugin class with the plugin manager

        Args:
            plugin_cls (Type[FrameworkPluginBase]): Plugin class to load

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        if cls._framework_plug_mag is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        cls._framework_plug_mag.load_plugin_cls(plugin_cls)
        return cls

    @classmethod
    def enable_channel_job(cls):
        """Configure and start the default channel job executor

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        if cls._config is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        # NOTE(review): the returned URL is discarded; presumably the call is
        # kept for a side effect or validation inside ConfigParser - confirm.
        cls._config.get_cmg_url()
        default_channel_job_executor.init_config(
            cls._config.get_local_channel_job_url(), cls.gclient("sysom_channel")
        )
        default_channel_job_executor.start()
        return cls

    ################################################################################
    # Alarm
    ################################################################################

    @classmethod
    def _get_alarm_producer(cls):
        """Return the shared alarm producer, creating it on first use."""
        if cls._alarm_producer is None:
            cls._alarm_producer = cls.cec_producer()
        return cls._alarm_producer

    @classmethod
    def alarm(cls, alert_data):
        """Dispatch one SAD alert data to event center

        Args:
            alert_data: Payload produced to the "SYSOM_SAD_ALERT" topic
        """
        cls._get_alarm_producer().produce("SYSOM_SAD_ALERT", alert_data)

    @classmethod
    def alarm_application(
        cls,
        summary_message: str,
        annotations: Optional[dict] = None,
        labels: Optional[dict] = None,
        alert_item: Optional[str] = None,
        level: str = "WARNING",
    ):
        """Build and dispatch one APPLICATION alert for the current service

        Args:
            summary_message (str): Stored under annotations["summary"]
            annotations (dict, optional): Extra annotations merged after the
                summary; omitted or None means no extra annotations
            labels (dict, optional): Extra labels merged after the "service"
                label; omitted or None means no extra labels
            alert_item (str, optional): Alert item name; defaults to
                "<service_name>_alert"
            level (str): One of "WARNING", "ERROR", "CRITICAL"

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized
            Exception: Raise if level is not one of the supported values
        """
        if cls._config is None:
            # Same exception type as the other entry points (it is still an
            # Exception subclass); the message text is unchanged.
            raise SysomFrameworkException("Init SysomFramework first!")
        if level not in ["WARNING", "ERROR", "CRITICAL"]:
            raise Exception(
                f'Alarm level not suppport => {level}, required in ["WARNING", "ERROR", "CRITICAL"]'
            )
        service_config = cls._config.get_service_config()
        if alert_item is None:
            alert_item = f"{service_config.service_name}_alert"
        cls.alarm(
            {
                "alert_id": str(uuid.uuid4()),
                "instance": service_config.host,
                "alert_item": alert_item,
                "alert_category": "APPLICATION",
                "alert_source_type": "sysom",
                "alert_level": level,
                # Current wall-clock time in milliseconds.
                "alert_time": int(round(time.time() * 1000)),
                "status": "FIRING",
                "labels": {
                    "service": service_config.service_name,
                    **(labels or {}),
                },
                "annotations": {"summary": summary_message, **(annotations or {})},
            }
        )

    @classmethod
    def alarm_action(cls, action: str, action_data: dict):
        """Invoke one alarm action

        Produces {"action": action, "data": action_data} to the
        "SYSOM_ALARM_ACTION" topic.

        Args:
            action (str): One of "ADD_ANNOTATION", "ADD_OPT", "MERGE"
            action_data (dict): Payload sent along with the action

        Raises:
            Exception: Raise if action is not one of the supported values
        """
        if action not in ["ADD_ANNOTATION", "ADD_OPT", "MERGE"]:
            raise Exception(f"Not support alarm action: {action}")
        cls._get_alarm_producer().produce(
            "SYSOM_ALARM_ACTION", {"action": action, "data": action_data}
        )

    @classmethod
    def start(cls):
        """Start the plugin manager

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        if cls._framework_plug_mag is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        cls._framework_plug_mag.start()
        return cls

    @classmethod
    def join(cls, timeout: Optional[float] = None):
        """Join the plugin manager

        Args:
            timeout (float, optional): Currently not forwarded to the plugin
                manager

        Raises:
            SysomFrameworkException: Raise if SysomFramework is not initialized

        Returns:
            The SysomFramework class itself, so calls can be chained.
        """
        if cls._framework_plug_mag is None:
            raise SysomFrameworkException(NOT_INIT_ERROR_TIP)
        # NOTE(review): timeout is accepted but never passed on; confirm
        # whether FrameworkPlugMag.join takes a timeout before forwarding it.
        cls._framework_plug_mag.join()
        return cls