sysom1/sysom_server/sysom_ad_proxy/app/routers/webhook.py

36 lines
1.2 KiB
Python

# -*- coding: utf-8 -*- #
"""
Time 2023/09/18 10:32
Author: mingfeng (SunnyQjm)
Email mfeng@linux.alibaba.com
File health.py
Description:
"""
from importlib import import_module
from clogger import logger
from fastapi import APIRouter, Request
from sysom_utils import SysomFramework
from lib.source_convert.base import SourceConverterBase
router = APIRouter()
@router.post("/{source_type}")
async def alert_webhook(
source_type: str, request: Request
):
def _get_converter(source_type) -> SourceConverterBase:
"""
根据告警源的类型,动态引入一个 SourceConverter 用于将原始告警数据转换成 SysOM 标准的告警格式
"""
try:
return import_module(f"lib.source_convert.{source_type}").SourceConverter()
except Exception as e:
raise Exception(f"No SourceConverter available => {str(e)}")
data = await request.json()
converter = _get_converter(source_type)
sysom_alert_datas = converter.convert(source_type, data)
for item in sysom_alert_datas:
SysomFramework.alarm(item.dict())
return {"code": 0, "err_msg": "", "data": ""}