mirror of https://gitee.com/anolis/sysom.git
36 lines
1.2 KiB
Python
36 lines
1.2 KiB
Python
|
|
# -*- coding: utf-8 -*- #
|
|
"""
|
|
Time 2023/09/18 10:32
|
|
Author: mingfeng (SunnyQjm)
|
|
Email mfeng@linux.alibaba.com
|
|
File health.py
|
|
Description:
|
|
"""
|
|
from importlib import import_module
|
|
from clogger import logger
|
|
from fastapi import APIRouter, Request
|
|
from sysom_utils import SysomFramework
|
|
from lib.source_convert.base import SourceConverterBase
|
|
|
|
|
|
router = APIRouter()
|
|
@router.post("/{source_type}")
|
|
async def alert_webhook(
|
|
source_type: str, request: Request
|
|
):
|
|
def _get_converter(source_type) -> SourceConverterBase:
|
|
"""
|
|
根据告警源的类型,动态引入一个 SourceConverter 用于将原始告警数据转换成 SysOM 标准的告警格式
|
|
"""
|
|
try:
|
|
return import_module(f"lib.source_convert.{source_type}").SourceConverter()
|
|
except Exception as e:
|
|
raise Exception(f"No SourceConverter available => {str(e)}")
|
|
|
|
data = await request.json()
|
|
converter = _get_converter(source_type)
|
|
sysom_alert_datas = converter.convert(source_type, data)
|
|
for item in sysom_alert_datas:
|
|
SysomFramework.alarm(item.dict())
|
|
return {"code": 0, "err_msg": "", "data": ""} |