from __future__ import annotations import logging from typing import Any from aiohttp import web, web_request from aiotools import AsyncContextManager from diskcache import Cache from nio import AsyncClient, SendRetryError from matrix_alertbot.chat_functions import send_text_to_room from matrix_alertbot.config import Config from matrix_alertbot.storage import Alert logger = logging.getLogger(__name__) routes = web.RouteTableDef() @routes.post("/alert") async def create_alert(request: web_request.Request) -> web.Response: data = await request.json() logger.info(f"Received alert: {data}") client: AsyncClient = request.app["client"] config: Config = request.app["config"] cache: Cache = request.app["cache"] plaintext = "" html = "" for i, alert in enumerate(data["alerts"]): alert = Alert.from_dict(alert) if i != 0: plaintext += "\n" html += "
\n" plaintext += alert.plaintext() html += alert.html() try: event = await send_text_to_room( client, config.room_id, plaintext, html, notice=False ) except SendRetryError as e: logger.error(e) return web.Response(status=500) fingerprints = tuple(alert["fingerprint"] for alert in data["alerts"]) cache.set( event.event_id, fingerprints, expire=config.cache_expire_time, tag="event" ) return web.Response(status=200) class Webhook(AsyncContextManager): def __init__(self, client: AsyncClient, cache: Cache, config: Config) -> None: self.app = web.Application(logger=logger) self.app["client"] = client self.app["config"] = config self.app["cache"] = cache self.app.add_routes(routes) self.runner = web.AppRunner(self.app) self.config = config self.address = config.address self.port = config.port self.socket = config.socket def __aenter__(self) -> Webhook: return self async def __aexit__(self, *args: Any, **kwargs: Any) -> None: await super().__aexit__(*args, **kwargs) await self.close() async def start(self) -> None: await self.runner.setup() site: web.BaseSite if self.address and self.port: site = web.TCPSite(self.runner, self.address, self.port) logger.info(f"Listenning on {self.address}:{self.port}") elif self.socket: site = web.UnixSite(self.runner, self.socket) logger.info(f"Listenning on unix://{self.socket}") await site.start() async def close(self) -> None: await self.runner.cleanup()