sysom1/sysom_server/sysom_log/app/executor.py

83 lines
2.6 KiB
Python

# -*- coding: utf-8 -*-
"""
@Author: wb-msm241621
@Date: 2023-07-14 10:56:24
@LastEditTime: 2023-07-14 10:56:25
@Description:
"""
import json
from clogger import logger
from pydantic import ValidationError
from cec_base.cec_client import CecAsyncConsumeTask
from cec_base.event import Event
from cec_base import Consumer
from sysom_utils import AsyncEventExecutor, CecAsyncConsumeTask
from conf.settings import *
from app.crud import create_node_log, create_audit_log
from app.database import SessionLocal
from app.schemas import NodeLogCreate, AuditLogCreate
class LogEventListener(AsyncEventExecutor):
"""async log evenvt listener class
listener topic
node_event
AUDIT_EVENT
"""
def __init__(self):
super().__init__(SYSOM_CEC_URL, callback=self.process_event)
self.append_group_consume_task(
SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_TOPIC,
SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_GROUP,
consumer_id=Consumer.generate_consumer_id(),
ensure_topic_exist=True,
)
self.append_group_consume_task(
SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_TOPIC,
SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_GROUP,
consumer_id=Consumer.generate_consumer_id(),
ensure_topic_exist=True,
)
async def process_event(self, event: Event, task: CecAsyncConsumeTask):
try:
print(event.value)
if task.topic_name == SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_TOPIC:
await self._node_log_event_handler(event)
elif task.topic_name == SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_TOPIC:
await self._audit_log_event_handler(event=event)
except Exception as e:
logger.exception(e)
finally:
task.ack(event)
async def _node_log_event_handler(self, event: Event):
"""
{
"ts": 1689917326,
"instance": "ece-aloxmfna",
"event_type": "xxxx",
"description": "xxxx",
"extra": {}
}
"""
assert isinstance(event.value, dict)
try:
node_log = NodeLogCreate(**event.value)
with SessionLocal() as db:
create_node_log(db, node_log)
except Exception as e:
logger.exception(e)
async def _audit_log_event_handler(self, event: Event):
assert isinstance(event.value, dict)
try:
audit_log = AuditLogCreate(**event.value)
with SessionLocal() as db:
create_audit_log(db, audit_log)
except Exception as e:
logger.exception(e)