matrix-alertbot/matrix_alertbot/alertmanager.py

136 lines
4.5 KiB
Python

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Tuple

import aiohttp
from aiohttp import ClientError
from aiohttp_prometheus_exporter.trace import PrometheusTraceConfig
from diskcache import Cache

from matrix_alertbot.errors import (
    AlertmanagerServerError,
    AlertNotFoundError,
    InvalidDurationError,
    SilenceExpiredError,
    SilenceNotFoundError,
)
class AlertmanagerClient:
    """Asynchronous client for the Alertmanager HTTP API (``<url>/api/v2``).

    The client owns one ``aiohttp.ClientSession`` for its whole lifetime;
    call :meth:`close` when the client is no longer needed so the session is
    released.

    HTTP-level failures (any ``aiohttp.ClientError``, which includes the error
    raised by ``raise_for_status()``) are re-raised as
    ``AlertmanagerServerError`` with the original exception chained as the
    cause.
    """

    def __init__(self, url: str, cache: Cache) -> None:
        """Create a client for the Alertmanager instance at *url*.

        Args:
            url: Base URL of Alertmanager. ``/api/v2`` is appended verbatim,
                so a trailing slash in *url* is not stripped.
            cache: Stored on the instance as ``self.cache``. No method of
                this class reads it.
        """
        self.api_url = f"{url}/api/v2"
        self.cache = cache
        # The session is created with a PrometheusTraceConfig trace config
        # (presumably to export request metrics -- confirm against
        # aiohttp_prometheus_exporter).
        self.session = aiohttp.ClientSession(trace_configs=[PrometheusTraceConfig()])

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        await self.session.close()

    async def get_alerts(self) -> List[Dict]:
        """Return the decoded JSON body of ``GET /alerts``.

        Raises:
            AlertmanagerServerError: If the request fails or the response has
                an error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/alerts") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                "Cannot fetch alerts from Alertmanager"
            ) from e

    async def get_alert(self, fingerprint: str) -> Dict:
        """Return the alert whose ``"fingerprint"`` equals *fingerprint*.

        All alerts are fetched and searched client-side.

        Raises:
            AlertNotFoundError: If no fetched alert has that fingerprint.
            AlertmanagerServerError: If the alerts cannot be fetched.
        """
        alerts = await self.get_alerts()
        return self._find_alert(fingerprint, alerts)

    async def get_silences(self) -> List[Dict]:
        """Return the decoded JSON body of ``GET /silences``.

        Raises:
            AlertmanagerServerError: If the request fails or the response has
                an error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/silences") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                "Cannot fetch silences from Alertmanager"
            ) from e

    async def get_silence(self, silence_id: str) -> Dict:
        """Return the silence whose ``"id"`` equals *silence_id*.

        All silences are fetched and searched client-side.

        Raises:
            SilenceNotFoundError: If no fetched silence has that ID.
            AlertmanagerServerError: If the silences cannot be fetched.
        """
        silences = await self.get_silences()
        return self._find_silence(silence_id, silences)

    async def create_silence(
        self,
        fingerprint: str,
        user: str,
        duration_seconds: Optional[int] = None,
        silence_id: Optional[str] = None,
    ) -> str:
        """Post a silence matching every label of an alert.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            user: Value sent as the silence's ``createdBy`` field.
            duration_seconds: Length of the silence in seconds. ``None``
                means the silence ends at the latest representable datetime.
                A duration too large to represent is capped the same way.
            silence_id: Sent as the ``id`` field of the payload. ``None`` is
                sent as JSON ``null``.

        Returns:
            The ``"silenceID"`` value from the response body.

        Raises:
            AlertNotFoundError: If no alert has the given fingerprint.
            InvalidDurationError: If *duration_seconds* is zero or negative.
            AlertmanagerServerError: If fetching the alert or posting the
                silence fails.
        """
        # The alert is looked up before the duration is validated, so a
        # missing alert is reported ahead of an invalid duration.
        alert = await self.get_alert(fingerprint)

        # One exact-match (non-regex, equality) matcher per alert label.
        silence_matchers = [
            {"name": label, "value": value, "isRegex": False, "isEqual": True}
            for label, value in alert["labels"].items()
        ]

        start_time, end_time = self._silence_period(duration_seconds)

        silence = {
            "id": silence_id,
            "matchers": silence_matchers,
            "startsAt": start_time.isoformat(),
            "endsAt": end_time.isoformat(),
            "createdBy": user,
            "comment": "Acknowledge alert from Matrix",
        }

        try:
            async with self.session.post(
                f"{self.api_url}/silences", json=silence
            ) as response:
                response.raise_for_status()
                data = await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot create silence for alert fingerprint {fingerprint}"
            ) from e

        return data["silenceID"]

    async def delete_silence(self, silence_id: str) -> None:
        """Delete the silence with the given ID.

        The silence is fetched first, and one whose ``"state"`` is
        ``"expired"`` is refused without sending a delete request.

        Raises:
            SilenceNotFoundError: If no silence has the given ID.
            SilenceExpiredError: If the silence is already expired.
            AlertmanagerServerError: If fetching or deleting the silence
                fails.
        """
        silence = await self.get_silence(silence_id)

        silence_state = silence["state"]
        if silence_state == "expired":
            raise SilenceExpiredError(
                f"Cannot delete already expired silence with ID {silence_id}"
            )

        try:
            async with self.session.delete(
                f"{self.api_url}/silence/{silence_id}"
            ) as response:
                response.raise_for_status()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot delete silence with ID {silence_id}"
            ) from e

    @staticmethod
    def _silence_period(
        duration_seconds: Optional[int],
    ) -> Tuple[datetime, datetime]:
        """Return the ``(start, end)`` datetimes for a new silence.

        Both values are timezone-aware and in UTC, so ``isoformat()`` emits
        an explicit ``+00:00`` offset. A naive local timestamp serialises
        without any offset, which leaves the instant ambiguous to the server
        whenever the host timezone is not UTC.

        Raises:
            InvalidDurationError: If *duration_seconds* is zero or negative.
        """
        start_time = datetime.now(timezone.utc)
        # Latest representable instant, used for "no expiry" and as the cap
        # when the requested duration overflows.
        latest = datetime.max.replace(tzinfo=timezone.utc)

        if duration_seconds is None:
            end_time = latest
        elif duration_seconds > 0:
            try:
                # Either building the timedelta or adding it can overflow.
                end_time = start_time + timedelta(seconds=duration_seconds)
            except OverflowError:
                end_time = latest
        else:
            raise InvalidDurationError(f"Duration must be positive: {duration_seconds}")

        return start_time, end_time

    @staticmethod
    def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict:
        """Return the first alert in *alerts* with the given fingerprint.

        Raises:
            AlertNotFoundError: If no alert matches.
        """
        for alert in alerts:
            if alert["fingerprint"] == fingerprint:
                return alert
        raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")

    @staticmethod
    def _find_silence(silence_id: str, silences: List[Dict]) -> Dict:
        """Return the first silence in *silences* with the given ID.

        Raises:
            SilenceNotFoundError: If no silence matches.
        """
        for silence in silences:
            if silence["id"] == silence_id:
                return silence
        raise SilenceNotFoundError(f"Cannot find silence with ID {silence_id}")