112 lines
3.8 KiB
Python
112 lines
3.8 KiB
Python
from __future__ import annotations
|
|
|
|
import datetime
|
|
from typing import Any, Dict, List
|
|
|
|
import aiohttp
|
|
import pytimeparse
|
|
from aiohttp import ClientError
|
|
from aiotools import AsyncContextManager
|
|
from diskcache import Cache
|
|
|
|
from matrix_alertbot.errors import (
|
|
AlertmanagerError,
|
|
AlertNotFoundError,
|
|
SilenceNotFoundError,
|
|
)
|
|
|
|
|
|
class AlertmanagerClient(AsyncContextManager):
    """Asynchronous client for the Alertmanager HTTP API (v2).

    The client owns an ``aiohttp.ClientSession`` and can be used as an
    async context manager so the session is closed on exit::

        async with AlertmanagerClient(url, cache) as client:
            alerts = await client.get_alerts()
    """

    def __init__(self, url: str, cache: Cache) -> None:
        """Create a client bound to the Alertmanager instance at *url*.

        Args:
            url: Base URL of Alertmanager; ``/api/v2`` is appended to it.
            cache: Cache object stored on the instance as ``self.cache``.
        """
        self.api_url = f"{url}/api/v2"
        self.cache = cache
        self.session = aiohttp.ClientSession()

    async def __aenter__(self) -> AlertmanagerClient:
        """Enter the async context and return this client."""
        # The client itself is the managed resource; it is not awaitable,
        # so it must be returned directly rather than awaited.
        return self

    async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
        """Leave the async context, always closing the HTTP session."""
        try:
            await super().__aexit__(*args, **kwargs)
        finally:
            # Close the session even if the parent hook raises, so the
            # underlying connections are never leaked.
            await self.close()

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        await self.session.close()

    async def get_alerts(self) -> List[Dict]:
        """Fetch the alerts currently known to Alertmanager.

        Returns:
            The decoded JSON body of the ``/alerts`` endpoint.

        Raises:
            AlertmanagerError: If the HTTP request fails or returns an
                error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/alerts") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerError("Cannot fetch alerts from Alertmanager") from e

    async def get_alert(self, fingerprint: str) -> Dict:
        """Return the alert whose fingerprint equals *fingerprint*.

        Raises:
            AlertNotFoundError: If no fetched alert has that fingerprint.
            AlertmanagerError: If the alerts cannot be fetched.
        """
        alerts = await self.get_alerts()
        return self._find_alert(fingerprint, alerts)

    async def get_alert_labels(self, fingerprint: str) -> Dict[str, str]:
        """Return the ``labels`` mapping of the alert with *fingerprint*.

        Raises:
            AlertNotFoundError: If no fetched alert has that fingerprint.
            AlertmanagerError: If the alerts cannot be fetched.
        """
        alert = await self.get_alert(fingerprint)
        return alert["labels"]

    async def create_silence(self, fingerprint: str, duration: str, user: str) -> str:
        """Create a silence matching every label of the given alert.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            duration: Duration string handed to ``pytimeparse.parse``.
            user: Value sent as the silence's ``createdBy`` field.

        Returns:
            The ``silenceID`` field of the Alertmanager response.

        Raises:
            AlertNotFoundError: If no fetched alert has that fingerprint.
            AlertmanagerError: If fetching alerts or creating the silence
                fails.
        """
        labels = await self.get_alert_labels(fingerprint)
        # One exact (non-regex) matcher per label of the alert.
        matchers = [
            {"name": label_name, "value": label_value, "isRegex": False}
            for label_name, label_value in labels.items()
        ]

        # Use timezone-aware UTC so the serialized timestamps carry an
        # explicit offset and cannot be misread as another timezone.
        start_time = datetime.datetime.now(datetime.timezone.utc)
        # NOTE(review): pytimeparse presumably returns None for input it
        # cannot parse, which would make timedelta() raise TypeError --
        # confirm that callers validate `duration` beforehand.
        duration_seconds = pytimeparse.parse(duration)
        end_time = start_time + datetime.timedelta(seconds=duration_seconds)

        silence = {
            "matchers": matchers,
            "startsAt": start_time.isoformat(),
            "endsAt": end_time.isoformat(),
            "createdBy": user,
            "comment": "Acknowledge alert from Matrix",
        }

        try:
            async with self.session.post(
                f"{self.api_url}/silences", json=silence
            ) as response:
                response.raise_for_status()
                data = await response.json()
        except ClientError as e:
            raise AlertmanagerError(
                f"Cannot create silence for alert fingerprint {fingerprint}"
            ) from e

        return data["silenceID"]

    async def delete_silence(self, fingerprint: str) -> None:
        """Delete every silence currently suppressing the given alert.

        Raises:
            AlertNotFoundError: If no fetched alert has that fingerprint.
            SilenceNotFoundError: If the alert's state is not
                ``"suppressed"``.
            AlertmanagerError: If fetching alerts or deleting a silence
                fails.
        """
        alert = await self.get_alert(fingerprint)

        alert_state = alert["status"]["state"]
        if alert_state != "suppressed":
            raise SilenceNotFoundError(
                f"Cannot find silences for alert fingerprint {fingerprint} in state {alert_state}"
            )

        # `silencedBy` holds the IDs of the silences applied to the alert.
        silences = alert["status"]["silencedBy"]
        for silence in silences:
            await self._delete_silence_by_id(silence)

    async def _delete_silence_by_id(self, silence: str) -> None:
        """Delete the silence with ID *silence*.

        Raises:
            AlertmanagerError: If the HTTP request fails or returns an
                error status.
        """
        try:
            async with self.session.delete(
                f"{self.api_url}/silence/{silence}"
            ) as response:
                response.raise_for_status()
        except ClientError as e:
            raise AlertmanagerError(f"Cannot delete silence with ID {silence}") from e

    @staticmethod
    def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict:
        """Return the first alert in *alerts* with the given fingerprint.

        Raises:
            AlertNotFoundError: If no alert in *alerts* matches.
        """
        for alert in alerts:
            if alert["fingerprint"] == fingerprint:
                return alert
        raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")
|