mirror of https://gitee.com/anolis/sysom.git
83 lines
2.6 KiB
Python
83 lines
2.6 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
@Author: wb-msm241621
|
|
@Date: 2023-07-14 10:56:24
|
|
@LastEditTime: 2023-07-14 10:56:25
|
|
@Description:
|
|
"""
|
|
import json
|
|
from clogger import logger
|
|
from pydantic import ValidationError
|
|
from cec_base.cec_client import CecAsyncConsumeTask
|
|
from cec_base.event import Event
|
|
from cec_base import Consumer
|
|
from sysom_utils import AsyncEventExecutor, CecAsyncConsumeTask
|
|
from conf.settings import *
|
|
from app.crud import create_node_log, create_audit_log
|
|
from app.database import SessionLocal
|
|
from app.schemas import NodeLogCreate, AuditLogCreate
|
|
|
|
|
|
class LogEventListener(AsyncEventExecutor):
|
|
"""async log evenvt listener class
|
|
listener topic
|
|
node_event
|
|
AUDIT_EVENT
|
|
|
|
"""
|
|
|
|
def __init__(self):
|
|
super().__init__(SYSOM_CEC_URL, callback=self.process_event)
|
|
self.append_group_consume_task(
|
|
SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_TOPIC,
|
|
SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_GROUP,
|
|
consumer_id=Consumer.generate_consumer_id(),
|
|
ensure_topic_exist=True,
|
|
)
|
|
self.append_group_consume_task(
|
|
SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_TOPIC,
|
|
SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_GROUP,
|
|
consumer_id=Consumer.generate_consumer_id(),
|
|
ensure_topic_exist=True,
|
|
)
|
|
|
|
async def process_event(self, event: Event, task: CecAsyncConsumeTask):
|
|
try:
|
|
print(event.value)
|
|
|
|
if task.topic_name == SYSOM_CEC_LOG_DISPATCH_NODE_EVENT_TOPIC:
|
|
await self._node_log_event_handler(event)
|
|
elif task.topic_name == SYSOM_CEC_LOG_DISPATCH_AUDIT_EVENT_TOPIC:
|
|
await self._audit_log_event_handler(event=event)
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
finally:
|
|
task.ack(event)
|
|
|
|
async def _node_log_event_handler(self, event: Event):
|
|
"""
|
|
{
|
|
"ts": 1689917326,
|
|
"instance": "ece-aloxmfna",
|
|
"event_type": "xxxx",
|
|
"description": "xxxx",
|
|
"extra": {}
|
|
}
|
|
"""
|
|
assert isinstance(event.value, dict)
|
|
try:
|
|
node_log = NodeLogCreate(**event.value)
|
|
with SessionLocal() as db:
|
|
create_node_log(db, node_log)
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
|
|
async def _audit_log_event_handler(self, event: Event):
|
|
assert isinstance(event.value, dict)
|
|
try:
|
|
audit_log = AuditLogCreate(**event.value)
|
|
with SessionLocal() as db:
|
|
create_audit_log(db, audit_log)
|
|
except Exception as e:
|
|
logger.exception(e)
|