matrix-alertbot/matrix_alertbot/webhook.py

106 lines
3.4 KiB
Python
Raw Normal View History

2022-07-08 22:37:09 +02:00
from __future__ import annotations
import logging
import prometheus_client
from aiohttp import ClientError, web, web_request
2022-07-09 12:43:18 +02:00
from aiohttp_prometheus_exporter.handler import metrics
from aiohttp_prometheus_exporter.middleware import prometheus_middleware_factory
2022-07-06 00:54:13 +02:00
from diskcache import Cache
from nio import AsyncClient, LocalProtocolError
from matrix_alertbot.alert import Alert
from matrix_alertbot.chat_functions import send_text_to_room
from matrix_alertbot.config import Config
logger = logging.getLogger(__name__)
routes = web.RouteTableDef()
2022-07-12 18:19:52 +02:00
@routes.get("/health")
async def get_health(request: web_request.Request) -> web.Response:
return web.Response(status=200)
2022-07-12 18:22:11 +02:00
@routes.post("/alerts")
async def create_alerts(request: web_request.Request) -> web.Response:
data = await request.json()
logger.info(f"Received alerts: {data}")
2022-07-06 00:54:13 +02:00
client: AsyncClient = request.app["client"]
config: Config = request.app["config"]
cache: Cache = request.app["cache"]
if "alerts" not in data:
return web.Response(status=400, body="Data must contain 'alerts' key.")
if not isinstance(data["alerts"], list):
return web.Response(status=400, body="Alerts must be a list.")
if len(data["alerts"]) == 0:
return web.Response(status=400, body="Alerts cannot be empty.")
for alert in data["alerts"]:
try:
alert = Alert.from_dict(alert)
except KeyError:
return web.Response(status=400, body=f"Invalid alert: {alert}.")
plaintext = alert.plaintext()
html = alert.html()
try:
event = await send_text_to_room(
client, config.room_id, plaintext, html, notice=False
)
except (LocalProtocolError, ClientError) as e:
logger.error(
f"Unable to send alert {alert.fingerprint} to Matrix room: {e}"
)
return web.Response(
status=500,
body=f"An error occured when sending alert with fingerprint '{alert.fingerprint}' to Matrix room.",
)
cache.set(event.event_id, alert.fingerprint, expire=config.cache_expire_time)
return web.Response(status=200)
2022-07-08 22:46:04 +02:00
class Webhook:
def __init__(self, client: AsyncClient, cache: Cache, config: Config) -> None:
self.app = web.Application(logger=logger)
self.app["client"] = client
2022-07-06 00:54:13 +02:00
self.app["config"] = config
self.app["cache"] = cache
self.app.add_routes(routes)
2022-07-09 12:43:18 +02:00
prometheus_registry = prometheus_client.CollectorRegistry(auto_describe=True)
self.app.middlewares.append(
prometheus_middleware_factory(registry=prometheus_registry)
)
2022-07-09 12:43:18 +02:00
self.app.router.add_get("/metrics", metrics())
self.runner = web.AppRunner(self.app)
self.config = config
self.address = config.address
self.port = config.port
self.socket = config.socket
async def start(self) -> None:
await self.runner.setup()
site: web.BaseSite
if self.address and self.port:
site = web.TCPSite(self.runner, self.address, self.port)
logger.info(f"Listenning on {self.address}:{self.port}")
elif self.socket:
site = web.UnixSite(self.runner, self.socket)
logger.info(f"Listenning on unix://{self.socket}")
await site.start()
async def close(self) -> None:
await self.runner.cleanup()