import datetime from typing import Dict, List import pytimeparse import requests from matrix_alertbot.cache import Cache from matrix_alertbot.errors import AlertNotFoundError class AlertmanagerClient: def __init__(self, url: str, cache: Cache) -> None: self.api_url = f"{url}/api/v2" self.cache = cache def get_alerts(self) -> List[Dict]: response = requests.get(f"{self.api_url}/alerts") response.raise_for_status() return response.json() def get_alert_labels(self, fingerprint: str) -> Dict[str, str]: if fingerprint not in self.cache: alerts = self.get_alerts() alert = self._find_alert(alerts, fingerprint) self.cache[fingerprint] = alert["labels"] return self.cache[fingerprint] def create_silence(self, fingerprint: str, duration: str, user: str) -> str: labels = self.get_alert_labels(fingerprint) matchers = [] for label_name, label_value in labels.items(): matchers.append( {"name": label_name, "value": label_value, "isRegex": False} ) start_time = datetime.datetime.now() duration_seconds = pytimeparse.parse(duration) duration_delta = datetime.timedelta(seconds=duration_seconds) end_time = start_time + duration_delta silence = { "matchers": matchers, "startsAt": start_time.isoformat(), "endsAt": end_time.isoformat(), "createdBy": user, "comment": "Acknowledge alert from Matrix", } response = requests.post(f"{self.api_url}/silences", json=silence) response.raise_for_status() data = response.json() return data["silenceID"] @staticmethod def _find_alert(alerts: List[Dict], fingerprint: str) -> Dict: for alert in alerts: if alert["fingerprint"] == fingerprint: return alert raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")