# Source: sysom1/sysom_server/sysom_alarm/app/executor.py (119 lines, 4.3 KiB, Python)

# -*- coding: utf-8 -*- #
"""
Time 2022/10/11 16:13
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File executor.py
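Description: CEC listener that persists SAD-format alerts and applies alarm
             actions (add annotation / add opt / merge) to stored alert data.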
"""
from cec_base.event import Event
from cec_base.consumer import Consumer
from cec_base.producer import Producer
from cec_base.cec_client import MultiConsumer, CecAsyncConsumeTask
from clogger import logger
from conf.settings import *
from app.database import SessionLocal
from app.crud import update_or_create_alert_data, append_alert_annotations, merge_alert_data
from app.schemas import AlertDataCreate
from sysom_utils import CecTarget, SysomFramework
class AlarmListener(MultiConsumer):
    """CEC listener that stores alerts and applies alarm actions.

    Subscribes, as consumer group ``sysom_alarm``, to two topics:

    * ``CEC_TOPIC_SYSOM_SAD_ALERT``: the event value is saved through
      ``update_or_create_alert_data``.
    * ``CEC_TOPIC_SYSOM_ALARM_ACTION``: the event value carries an
      ``action`` name and its ``data``; see ``_deal_alarm_action``.

    Every received event is acknowledged, even when handling it fails
    (the failure is logged instead of being re-raised).
    """

    # Actions accepted by ``_deal_alarm_action``.
    _SUPPORTED_ACTIONS = ("ADD_ANNOTATION", "ADD_OPT", "MERGE")

    def __init__(self) -> None:
        # NOTE(review): the consumer is created from the PRODUCER cec url;
        # confirm this is intended rather than a consumer-specific target.
        super().__init__(
            YAML_CONFIG.get_cec_url(CecTarget.PRODUCER),
            custom_callback=self.on_receive_event,
        )
        # Both topics share one consumer group; each subscription gets its
        # own freshly generated consumer id.
        for topic in (CEC_TOPIC_SYSOM_SAD_ALERT, CEC_TOPIC_SYSOM_ALARM_ACTION):
            self.append_group_consume_task(
                topic,
                "sysom_alarm",
                Consumer.generate_consumer_id(),
                ensure_topic_exist=True,
            )
        self._producer: Producer = SysomFramework.cec_producer()

    def _delivery(self, topic: str, value: dict):
        """Produce ``value`` to ``topic`` and flush the producer.

        Args:
            topic (str): Target topic name.
            value (dict): Event payload to produce.
        """
        self._producer.produce(topic, value)
        self._producer.flush()

    def _deal_sad_alert(self, alert_data: dict):
        """Save SAD format alert data to db.

        Args:
            alert_data (dict): Keyword arguments used to build an
                ``AlertDataCreate``.
        """
        with SessionLocal() as db:
            update_or_create_alert_data(db, AlertDataCreate(**alert_data))

    def _deal_alarm_action(self, action: str, action_data: dict):
        """Perform an alarm action against stored alert data.

        Args:
            action (str): One of ``ADD_ANNOTATION``, ``ADD_OPT`` or ``MERGE``.
            action_data (dict): Parameters of the action:

                * ``ADD_ANNOTATION``: ``alert_id`` and ``annotations``.
                * ``ADD_OPT``: ``alert_id`` and ``opt``; ``opt["key"]`` is
                  removed from the stored value and used to build the
                  annotation name ``SYSOM_ALARM:OPT:<key>``.
                * ``MERGE``: ``merge_list`` and ``new_data``.

        Raises:
            ValueError: If ``action`` is not supported.
            KeyError: If a key required by the action is missing.
        """
        # Reject unknown actions before opening a database session.
        if action not in self._SUPPORTED_ACTIONS:
            raise ValueError(f"Not support alarm action: {action}")

        with SessionLocal() as db:
            if action == "ADD_ANNOTATION":
                # Add annotations to specific alert data
                append_alert_annotations(
                    db, action_data["alert_id"], action_data["annotations"]
                )
            elif action == "ADD_OPT":
                # Add opt to specific alert data. Work on a copy so the
                # received event payload is not mutated by ``pop``.
                alert_id = action_data["alert_id"]
                opt = dict(action_data["opt"])
                opt_key = opt.pop("key")
                append_alert_annotations(
                    db, alert_id, {f"SYSOM_ALARM:OPT:{opt_key}": opt}
                )
            else:
                # Only "MERGE" remains after the membership check above.
                merge_alert_data(
                    db, action_data["merge_list"], action_data["new_data"]
                )

    def on_receive_event(self, event: Event, task: CecAsyncConsumeTask):
        """Handle one consumed event, then acknowledge it.

        Args:
            event (Event): The received event; its ``value`` must be a dict.
            task (CecAsyncConsumeTask): The consume task that delivered the
                event; its ``topic_name`` selects the handler.
        """
        try:
            event_value = event.value
            # Log as a single pre-formatted string: passing the type object
            # as the message with the value as a format argument is not a
            # valid logging call.
            logger.warning(f"{type(event_value)} {event_value}")
            # Explicit check instead of ``assert``, which is stripped when
            # Python runs with -O.
            if not isinstance(event_value, dict):
                raise TypeError(
                    f"Expect event value to be a dict, got {type(event_value)}"
                )
            if task.topic_name == CEC_TOPIC_SYSOM_SAD_ALERT:
                self._deal_sad_alert(event_value)
            elif task.topic_name == CEC_TOPIC_SYSOM_ALARM_ACTION:
                self._deal_alarm_action(
                    event_value["action"], event_value["data"]
                )
            else:
                logger.warning(
                    f"Received not expect topic data, topic = {task.topic_name}"
                )
        except Exception as e:
            # Top-level consumer boundary: log and continue so one bad event
            # cannot break the listener.
            logger.exception(e)
        finally:
            # Acknowledge the message whether or not handling succeeded.
            task.ack(event)