matrix-alertbot/matrix_alertbot/alertmanager.py

59 lines
2 KiB
Python
Raw Normal View History

import datetime
from typing import Dict, List
import pytimeparse
import requests
from matrix_alertbot.cache import Cache
from matrix_alertbot.errors import AlertNotFoundError
class AlertmanagerClient:
    """Small client for the Alertmanager v2 HTTP API.

    Alert labels looked up by fingerprint are stored in the supplied cache so
    that repeated lookups for the same alert do not hit the API again.
    """

    def __init__(self, url: str, cache: Cache) -> None:
        """Store the API base URL and the cache used for alert labels.

        Args:
            url: Base URL of the Alertmanager instance; ``/api/v2`` is
                appended to it. A trailing slash is tolerated.
            cache: Mapping-like object supporting ``in``, item read and item
                assignment, keyed by alert fingerprint.
        """
        # Strip a trailing slash so that "http://host/" does not produce
        # "http://host//api/v2".
        self.api_url = f"{url.rstrip('/')}/api/v2"
        self.cache = cache

    def get_alerts(self) -> List[Dict]:
        """Return the alerts reported by Alertmanager as decoded JSON.

        Raises:
            requests.HTTPError: if the server answers with an error status.
        """
        # The v2 API exposes the alert collection at "/alerts" (plural).
        response = requests.get(f"{self.api_url}/alerts")
        response.raise_for_status()
        return response.json()

    def get_alert_labels(self, fingerprint: str) -> Dict[str, str]:
        """Return the labels of the alert identified by ``fingerprint``.

        The cache is consulted first; on a miss the alerts are fetched from
        the API, and the labels of the matching alert are cached and returned.

        Raises:
            AlertNotFoundError: if no fetched alert has this fingerprint.
            requests.HTTPError: if fetching the alerts fails.
        """
        if fingerprint in self.cache:
            return self.cache[fingerprint]

        alert = self._find_alert(self.get_alerts(), fingerprint)
        labels = alert["labels"]
        self.cache[fingerprint] = labels
        # Return the value we just fetched rather than reading it back from
        # the cache, so the result does not depend on the cache retaining it.
        return labels

    def create_silence(self, fingerprint: str, duration: str, user: str) -> str:
        """Create a silence matching every label of the given alert.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            duration: Human-readable duration handed to ``pytimeparse.parse``.
            user: Value recorded in the silence's ``createdBy`` field.

        Returns:
            The ``silenceID`` field of the API response.

        Raises:
            AlertNotFoundError: if the alert cannot be found.
            requests.HTTPError: if an API call answers with an error status.
            TypeError: if ``duration`` could not be parsed.
        """
        labels = self.get_alert_labels(fingerprint)

        # NOTE(review): pytimeparse.parse is documented to return None for
        # input it cannot parse; the timedelta construction then raises
        # TypeError. Behavior kept as-is; consider validating upstream.
        duration_seconds = pytimeparse.parse(duration)

        # Use an aware UTC timestamp so the serialized value carries an
        # explicit offset instead of an ambiguous local wall-clock time.
        start_time = datetime.datetime.now(datetime.timezone.utc)
        silence = self._build_silence(labels, start_time, duration_seconds, user)

        response = requests.post(f"{self.api_url}/silences", json=silence)
        response.raise_for_status()
        data = response.json()
        return data["silenceID"]

    @staticmethod
    def _build_silence(
        labels: Dict[str, str],
        start_time: datetime.datetime,
        duration_seconds: float,
        user: str,
    ) -> Dict:
        """Build the JSON-serializable payload describing a silence.

        One exact-match (non-regex) matcher is produced per label. Timestamps
        are rendered with ``isoformat()`` because ``datetime`` objects cannot
        be encoded by the JSON serializer.
        """
        end_time = start_time + datetime.timedelta(seconds=duration_seconds)
        matchers = [
            {"name": label_name, "value": label_value, "isRegex": False}
            for label_name, label_value in labels.items()
        ]
        return {
            "matchers": matchers,
            "startsAt": start_time.isoformat(),
            "endsAt": end_time.isoformat(),
            "createdBy": user,
            "comment": "Acknowledge alert from Matrix",
        }

    @staticmethod
    def _find_alert(alerts: List[Dict], fingerprint: str) -> Dict:
        """Return the first alert whose ``fingerprint`` equals the given one.

        Raises:
            AlertNotFoundError: if no alert in ``alerts`` matches.
        """
        for alert in alerts:
            if alert["fingerprint"] == fingerprint:
                return alert
        raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")