sysom1/sysom_server/sysom_alert_pusher/lib/targets/webhook.py

20 lines
715 B
Python

import aiohttp
from .base import PushTargetBase
class PushTargetWebhook(PushTargetBase):
def __init__(self, config: dict) -> None:
self.url = config.get("url", "")
self.method = config.get("method", "POST")
self.headers = config.get("headers", {})
self.rules = config.get("rules", [])
async def push(self, data: dict):
async with aiohttp.ClientSession() as session:
async with session.request(
self.method, self.url, headers=self.headers, json=data
) as resp:
if resp.status != 200:
raise Exception(f"Webhook send failed, status = {resp.status}")
return await resp.json()