mirror of https://gitee.com/anolis/sysom.git
119 lines
4.3 KiB
Python
119 lines
4.3 KiB
Python
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2022/10/11 16:13
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File executor.py
|
|
Description: CEC listener that stores SAD alerts and applies alarm actions
|
|
"""
|
|
from cec_base.event import Event
|
|
from cec_base.consumer import Consumer
|
|
from cec_base.producer import Producer
|
|
from cec_base.cec_client import MultiConsumer, CecAsyncConsumeTask
|
|
from clogger import logger
|
|
from conf.settings import *
|
|
from app.database import SessionLocal
|
|
from app.crud import update_or_create_alert_data, append_alert_annotations, merge_alert_data
|
|
from app.schemas import AlertDataCreate
|
|
from sysom_utils import CecTarget, SysomFramework
|
|
|
|
|
|
class AlarmListener(MultiConsumer):
    """A cec-based alarm listener.

    Consumes events from two cec topics and applies them to the alert
    database:

    * ``CEC_TOPIC_SYSOM_SAD_ALERT``: the event value is treated as SAD format
      alert data and saved through ``update_or_create_alert_data``.
    * ``CEC_TOPIC_SYSOM_ALARM_ACTION``: the event value carries an ``action``
      name and a ``data`` dict describing an operation on existing alert data
      (add annotations, add an opt, or merge).

    Every received event is acknowledged, whether or not handling succeeded.
    """

    # Actions accepted by ``_deal_alarm_action``.
    _SUPPORTED_ACTIONS = ("ADD_ANNOTATION", "ADD_OPT", "MERGE")

    def __init__(self) -> None:
        """Subscribe to the alert and alarm-action topics and create a producer."""
        # NOTE(review): the consumer is created with the PRODUCER cec url;
        # confirm this is intended rather than a consumer-specific url.
        super().__init__(
            YAML_CONFIG.get_cec_url(CecTarget.PRODUCER),
            custom_callback=self.on_receive_event,
        )
        # Both topics are consumed under the same "sysom_alarm" group, each
        # with its own freshly generated consumer id.
        self.append_group_consume_task(
            CEC_TOPIC_SYSOM_SAD_ALERT,
            "sysom_alarm",
            Consumer.generate_consumer_id(),
            ensure_topic_exist=True,
        )
        self.append_group_consume_task(
            CEC_TOPIC_SYSOM_ALARM_ACTION,
            "sysom_alarm",
            Consumer.generate_consumer_id(),
            ensure_topic_exist=True,
        )
        self._producer: Producer = SysomFramework.cec_producer()

    def _delivery(self, topic: str, value: dict) -> None:
        """Produce ``value`` to ``topic`` and flush the producer.

        Args:
            topic (str): Name of the target topic.
            value (dict): Event payload to produce.
        """
        self._producer.produce(topic, value)
        self._producer.flush()

    def _deal_sad_alert(self, alert_data: dict) -> None:
        """Save SAD format alert data to db.

        Args:
            alert_data (dict): Keyword fields used to build an
                ``AlertDataCreate`` instance.
        """
        with SessionLocal() as db:
            update_or_create_alert_data(db, AlertDataCreate(**alert_data))

    def _deal_alarm_action(self, action: str, action_data: dict) -> None:
        """Perform an alarm action on existing alert data.

        Args:
            action (str): One of ``ADD_ANNOTATION``, ``ADD_OPT`` or ``MERGE``.
            action_data (dict): Parameters of the action:
                ``ADD_ANNOTATION`` reads ``alert_id`` and ``annotations``;
                ``ADD_OPT`` reads ``alert_id`` and ``opt`` (which must hold a
                ``key`` entry); ``MERGE`` reads ``merge_list`` and
                ``new_data``.

        Raises:
            Exception: If ``action`` is not supported.
            KeyError: If ``action_data`` lacks a key required by the action.
        """
        # Reject unknown actions before opening a database session.
        if action not in self._SUPPORTED_ACTIONS:
            raise Exception(f"Not support alarm action: {action}")
        with SessionLocal() as db:
            if action == "ADD_ANNOTATION":
                # Add annotations to specific alert data
                alert_id = action_data["alert_id"]
                annotations = action_data["annotations"]
                append_alert_annotations(db, alert_id, annotations)
            elif action == "ADD_OPT":
                # Add opt to specific alert data
                alert_id = action_data["alert_id"]
                # Work on a shallow copy so that removing "key" does not
                # mutate the event payload owned by the caller.
                opt = dict(action_data["opt"])
                opt_key = opt.pop("key")
                extra_annotations = {f"SYSOM_ALARM:OPT:{opt_key}": opt}
                append_alert_annotations(db, alert_id, extra_annotations)
            elif action == "MERGE":
                # Perform merge
                merge_list = action_data["merge_list"]
                new_data = action_data["new_data"]
                merge_alert_data(db, merge_list, new_data)
            else:
                # Defensive: only reachable if _SUPPORTED_ACTIONS gains an
                # entry without a matching branch above.
                raise Exception("Should never invoke here")

    def on_receive_event(self, event: Event, task: CecAsyncConsumeTask) -> None:
        """Handle a single received event and acknowledge it.

        Args:
            event (Event): The received event; ``event.value`` must be a dict.
            task (CecAsyncConsumeTask): The consume task that delivered the
                event; its ``topic_name`` selects the handler.
        """
        event_value = event.value
        try:
            # Logging happens inside the try block so that a logging failure
            # can never prevent the acknowledgement below.
            logger.warning(f"{type(event_value)} {event_value}")
            # Explicit check instead of ``assert``, which is stripped when
            # Python runs with -O.
            if not isinstance(event_value, dict):
                raise TypeError(
                    f"Event value must be a dict, got {type(event_value)}"
                )
            if task.topic_name == CEC_TOPIC_SYSOM_SAD_ALERT:
                self._deal_sad_alert(event_value)
            elif task.topic_name == CEC_TOPIC_SYSOM_ALARM_ACTION:
                action = event_value["action"]
                data = event_value["data"]
                self._deal_alarm_action(action, data)
            else:
                logger.warning(
                    f"Received not expect topic data, topic = {task.topic_name}"
                )
        except Exception as e:
            logger.exception(e)
        finally:
            # Acknowledge the message whether or not handling succeeded.
            task.ack(event)
|