from __future__ import annotations import datetime from typing import Any, Dict, List import aiohttp import pytimeparse from aiohttp import ClientError from aiotools import AsyncContextManager from diskcache import Cache from matrix_alertbot.errors import ( AlertmanagerError, AlertNotFoundError, SilenceNotFoundError, ) class AlertmanagerClient(AsyncContextManager): def __init__(self, url: str, cache: Cache) -> None: self.api_url = f"{url}/api/v2" self.cache = cache self.session = aiohttp.ClientSession() async def __aenter__(self) -> AlertmanagerClient: return self async def __aexit__(self, *args: Any, **kwargs: Any) -> None: await super().__aexit__(*args, **kwargs) await self.close() async def close(self) -> None: await self.session.close() async def get_alerts(self) -> List[Dict]: try: async with self.session.get(f"{self.api_url}") as response: response.raise_for_status() return await response.json() except ClientError as e: raise AlertmanagerError(f"Cannot fetch alerts from Alertmanager") from e async def get_alert(self, fingerprint: str) -> Dict: alerts = await self.get_alerts() return self._find_alert(fingerprint, alerts) async def get_alert_labels(self, fingerprint: str) -> Dict[str, str]: alert = await self.get_alert(fingerprint) return alert["labels"] async def create_silence(self, fingerprint: str, duration: str, user: str) -> str: labels = await self.get_alert_labels(fingerprint) matchers = [] for label_name, label_value in labels.items(): matchers.append( {"name": label_name, "value": label_value, "isRegex": False} ) start_time = datetime.datetime.now() duration_seconds = pytimeparse.parse(duration) duration_delta = datetime.timedelta(seconds=duration_seconds) end_time = start_time + duration_delta silence = { "matchers": matchers, "startsAt": start_time.isoformat(), "endsAt": end_time.isoformat(), "createdBy": user, "comment": "Acknowledge alert from Matrix", } try: async with self.session.post( f"{self.api_url}/silences", json=silence ) as response: response.raise_for_status() data = await response.json() except ClientError as e: raise AlertmanagerError( f"Cannot create silence for alert fingerprint {fingerprint}" ) from e return data["silenceID"] async def delete_silence(self, fingerprint: str) -> None: alert = await self.get_alert(fingerprint) alert_state = alert["status"]["state"] if alert_state != "suppressed": raise SilenceNotFoundError( f"Cannot find silences for alert fingerprint {fingerprint} in state {alert_state}" ) silences = alert["status"]["silencedBy"] for silence in silences: await self._delete_silence_by_id(silence) async def _delete_silence_by_id(self, silence: str) -> None: try: async with self.session.delete( f"{self.api_url}/silence/{silence}" ) as response: response.raise_for_status() except ClientError as e: raise AlertmanagerError(f"Cannot delete silence with ID {silence}") from e @staticmethod def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict: for alert in alerts: if alert["fingerprint"] == fingerprint: return alert raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")