# Source: matrix-alertbot/matrix_alertbot/alertmanager.py
# (listing metadata from the code viewer: 134 lines, 4.4 KiB, Python)

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Dict, List

import aiohttp
import pytimeparse2
from aiohttp import ClientError
from aiohttp_prometheus_exporter.trace import PrometheusTraceConfig
from diskcache import Cache

from matrix_alertbot.errors import (
    AlertmanagerServerError,
    AlertMismatchError,
    AlertNotFoundError,
    SilenceNotFoundError,
)
from matrix_alertbot.matcher import AlertMatcher


class AlertmanagerClient:
    """Asynchronous client for the Alertmanager ``/api/v2`` HTTP endpoints.

    The client can list alerts, look one up by fingerprint, create a silence
    covering an alert, and delete the silences currently attached to an
    alert. Transport-level failures (``aiohttp.ClientError``) are re-raised
    as ``AlertmanagerServerError``.
    """

    def __init__(self, url: str, cache: Cache) -> None:
        """Create the client and open its HTTP session.

        Args:
            url: Base URL of the Alertmanager server, without the API path.
            cache: Cache object kept on the instance as ``self.cache``; it is
                not used by any method of this class.
        """
        # Strip trailing slashes so "http://host/" does not become
        # "http://host//api/v2".
        self.api_url = f"{url.rstrip('/')}/api/v2"
        self.cache = cache
        self.session = aiohttp.ClientSession(trace_configs=[PrometheusTraceConfig()])

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        await self.session.close()

    async def get_alerts(self) -> List[Dict]:
        """Fetch all alerts from the ``/alerts`` endpoint.

        Returns:
            The decoded JSON body of the response.

        Raises:
            AlertmanagerServerError: If the request fails or the server
                answers with an error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/alerts") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                "Cannot fetch alerts from Alertmanager"
            ) from e

    async def get_alert(self, fingerprint: str) -> Dict:
        """Return the alert whose ``fingerprint`` field equals *fingerprint*.

        Raises:
            AlertmanagerServerError: If the alerts cannot be fetched.
            AlertNotFoundError: If no fetched alert has that fingerprint.
        """
        alerts = await self.get_alerts()
        return self._find_alert(fingerprint, alerts)

    async def create_silence(
        self,
        fingerprint: str,
        duration: str,
        user: str,
        matchers: List[AlertMatcher],
    ) -> str:
        """Create a silence matching every label of the given alert.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            duration: Human-readable duration, parsed with
                ``pytimeparse2.parse``.
            user: Value stored in the silence's ``createdBy`` field.
            matchers: Matchers the alert's labels must satisfy before the
                silence is created.

        Returns:
            The ``silenceID`` field of the server's response.

        Raises:
            AlertmanagerServerError: If fetching alerts or posting the
                silence fails.
            AlertNotFoundError: If no alert has the given fingerprint.
            AlertMismatchError: If the alert does not satisfy *matchers*.
            ValueError: If *duration* cannot be parsed.
        """
        alert = await self.get_alert(fingerprint)
        self._match_alert(alert, matchers)

        duration_seconds = pytimeparse2.parse(duration)
        if duration_seconds is None:
            # Without this guard the failure surfaces as an opaque TypeError
            # from timedelta(seconds=None).
            raise ValueError(f"Cannot parse silence duration {duration!r}")

        silence = self._build_silence(alert["labels"], user, duration_seconds)

        try:
            async with self.session.post(
                f"{self.api_url}/silences", json=silence
            ) as response:
                response.raise_for_status()
                data = await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot create silence for alert fingerprint {fingerprint}"
            ) from e

        return data["silenceID"]

    async def delete_silences(
        self, fingerprint: str, matchers: List[AlertMatcher]
    ) -> List[str]:
        """Delete every silence listed in the alert's ``status.silencedBy``.

        Args:
            fingerprint: Fingerprint of the alert whose silences are removed.
            matchers: Matchers the alert's labels must satisfy before any
                silence is deleted.

        Returns:
            The silence IDs that were deleted.

        Raises:
            AlertmanagerServerError: If fetching alerts or deleting a silence
                fails.
            AlertNotFoundError: If no alert has the given fingerprint.
            SilenceNotFoundError: If the alert's state is not ``suppressed``.
            AlertMismatchError: If the alert does not satisfy *matchers*.
        """
        alert = await self.get_alert(fingerprint)

        alert_state = alert["status"]["state"]
        if alert_state != "suppressed":
            raise SilenceNotFoundError(
                f"Cannot find silences for alert fingerprint {fingerprint} in state {alert_state}"
            )

        self._match_alert(alert, matchers)

        silences = alert["status"]["silencedBy"]
        # Deleted one at a time; the first failure aborts the remaining ones.
        for silence in silences:
            await self._delete_silence(silence)
        return silences

    async def _delete_silence(self, silence: str) -> None:
        """Delete the silence with ID *silence*.

        Raises:
            AlertmanagerServerError: If the request fails or the server
                answers with an error status.
        """
        try:
            async with self.session.delete(
                f"{self.api_url}/silence/{silence}"
            ) as response:
                response.raise_for_status()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot delete silence with ID {silence}"
            ) from e

    @staticmethod
    def _build_silence(labels: Dict, user: str, duration_seconds: float) -> Dict:
        """Build the silence payload starting now and lasting *duration_seconds*.

        One exact-match, non-regex matcher is emitted per alert label.
        """
        # Use an aware UTC timestamp: a naive datetime.now() serialises
        # without a UTC offset, which leaves the silence window ambiguous.
        start_time = datetime.now(timezone.utc)
        end_time = start_time + timedelta(seconds=duration_seconds)
        return {
            "matchers": [
                {"name": label, "value": value, "isRegex": False, "isEqual": True}
                for label, value in labels.items()
            ],
            "startsAt": start_time.isoformat(),
            "endsAt": end_time.isoformat(),
            "createdBy": user,
            "comment": "Acknowledge alert from Matrix",
        }

    @staticmethod
    def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict:
        """Return the first alert in *alerts* with the given fingerprint.

        Raises:
            AlertNotFoundError: If no alert matches.
        """
        for alert in alerts:
            if alert["fingerprint"] == fingerprint:
                return alert
        raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")

    @staticmethod
    def _match_alert(alert: Dict, matchers: List[AlertMatcher]) -> None:
        """Check that the alert's labels satisfy every matcher.

        Returns normally when all matchers pass; an empty *matchers* list
        always passes.

        Raises:
            AlertMismatchError: If a matcher's label is absent from the alert
                or the matcher rejects the alert's labels.
        """
        labels = alert["labels"]
        for matcher in matchers:
            if matcher.label not in labels:
                labels_text = ", ".join(labels)
                raise AlertMismatchError(
                    f"Cannot find label {matcher.label} in alert labels: {labels_text}"
                )

            if not matcher.match(labels):
                raise AlertMismatchError(
                    f"Alert with label {matcher.label}={labels[matcher.label]} does not match {matcher}"
                )