mirror of https://gitee.com/anolis/sysom.git
20 lines
715 B
Python
20 lines
715 B
Python
import aiohttp
|
|
from .base import PushTargetBase
|
|
|
|
|
|
class PushTargetWebhook(PushTargetBase):
    """Push target that delivers a payload to an HTTP webhook endpoint."""

    def __init__(self, config: dict) -> None:
        """Read the webhook settings from *config*.

        All keys are optional:

        - ``url``: endpoint the request is sent to (default ``""``).
        - ``method``: HTTP method used for the request (default ``"POST"``).
        - ``headers``: headers passed along with the request (default ``{}``).
        - ``rules``: stored on the instance as-is (default ``[]``); it is not
          read anywhere else in this class.
        """
        # NOTE(review): PushTargetBase.__init__ is not invoked here -- confirm
        # that the base class needs no initialisation of its own.
        self.url = config.get("url", "")
        self.method = config.get("method", "POST")
        self.headers = config.get("headers", {})
        self.rules = config.get("rules", [])

    async def push(self, data: dict):
        """Send *data* as a JSON request body to the configured webhook.

        A fresh ``aiohttp.ClientSession`` is opened for every call and closed
        again before returning.

        Returns:
            The decoded JSON response body, ``None`` when the response body
            is empty, or the raw response text when the body is not JSON.

        Raises:
            Exception: If the endpoint answers with a non-2xx status code.
        """
        async with aiohttp.ClientSession() as session:
            async with session.request(
                self.method, self.url, headers=self.headers, json=data
            ) as resp:
                # Any 2xx status means the webhook accepted the payload;
                # endpoints commonly answer 201/202/204 rather than 200.
                if not 200 <= resp.status < 300:
                    raise Exception(f"Webhook send failed, status = {resp.status}")
                try:
                    # content_type=None disables aiohttp's strict
                    # "application/json" check so that a successful push is
                    # not reported as an error just because of the reply's
                    # Content-Type header.
                    return await resp.json(content_type=None)
                except ValueError:
                    # Body is not JSON (e.g. a plain "ok"); the delivery
                    # itself succeeded, so hand back the text instead.
                    return await resp.text()
|