matrix-alertbot/matrix_alertbot/alertmanager.py

238 lines
7.6 KiB
Python
Raw Normal View History

2022-07-08 22:37:09 +02:00
from __future__ import annotations
2022-08-08 01:59:09 +02:00
import logging
2022-07-10 02:40:04 +02:00
from datetime import datetime, timedelta
2024-01-22 11:35:13 +01:00
from typing import Any, Dict, List, Optional, Tuple, TypedDict, cast
import aiohttp
from aiohttp import ClientError
from aiohttp_prometheus_exporter.trace import PrometheusTraceConfig
from diskcache import Cache
2022-07-06 00:54:13 +02:00
from matrix_alertbot.errors import (
2022-07-09 10:38:40 +02:00
AlertmanagerServerError,
2022-07-06 00:54:13 +02:00
AlertNotFoundError,
SilenceExpiredError,
2022-08-08 00:28:36 +02:00
SilenceExtendError,
2022-07-06 00:54:13 +02:00
SilenceNotFoundError,
)
2022-08-08 00:28:36 +02:00
# Length of a silence when the caller does not pass ``duration_seconds``.
DEFAULT_DURATION = timedelta(hours=3)
# Upper bound applied to requested silence durations (3652 days, roughly ten
# years); longer requests are shortened to this value.
MAX_DURATION = timedelta(days=3652)
2022-07-27 21:11:11 +02:00
2022-08-08 01:59:09 +02:00
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
2024-01-22 11:35:13 +01:00
# Shape of one alert in the JSON list returned by ``get_alerts``. Only the
# fields this module reads are declared: the fingerprint used to look an alert
# up, and the labels that are turned into silence matchers.
AlertDict = TypedDict(
"AlertDict",
{
"fingerprint": str,
"labels": Dict[str, str],
},
)
# Shape of one silence in the JSON list returned by ``get_silences``. Only the
# fields this module reads are declared: the ID used for lookups, the matchers
# and author reused when a silence is updated, and the status whose "state"
# entry is checked before deletion.
SilenceDict = TypedDict(
"SilenceDict",
{
"id": str,
"matchers": List[Dict[str, Any]],
"createdBy": str,
"status": Dict[str, str],
},
)
2022-08-08 01:59:09 +02:00
2022-07-08 22:46:04 +02:00
class AlertmanagerClient:
    """Asynchronous client for the Alertmanager ``/api/v2`` HTTP endpoints.

    The client fetches alerts and silences, and creates, updates and deletes
    silences. The ID of the silence created for an alert is stored in
    ``cache`` under the alert's fingerprint so that it can be found again
    when the silence has to be updated.
    """

    def __init__(self, url: str, cache: Cache) -> None:
        """Create a client for the Alertmanager instance at ``url``.

        Args:
            url: Base URL of Alertmanager, without the ``/api/v2`` suffix.
            cache: Cache mapping alert fingerprints to silence IDs.
        """
        # Strip trailing slashes so endpoint URLs never contain "//api/v2".
        self.api_url = f"{url.rstrip('/')}/api/v2"
        self.cache = cache
        self.session = aiohttp.ClientSession(trace_configs=[PrometheusTraceConfig()])

    async def close(self) -> None:
        """Close the underlying HTTP session."""
        await self.session.close()

    async def get_alerts(self) -> List[AlertDict]:
        """Return all alerts reported by Alertmanager.

        Raises:
            AlertmanagerServerError: If the request fails or returns an
                error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/alerts") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                "Cannot fetch alerts from Alertmanager"
            ) from e

    async def get_alert(self, fingerprint: str) -> AlertDict:
        """Return the alert whose fingerprint is ``fingerprint``.

        Raises:
            AlertNotFoundError: If no alert has this fingerprint.
            AlertmanagerServerError: If the alerts cannot be fetched.
        """
        logger.debug(f"Fetching details for alert with fingerprint {fingerprint}")
        alerts = await self.get_alerts()
        return self._find_alert(fingerprint, alerts)

    async def get_silences(self) -> List[SilenceDict]:
        """Return all silences known to Alertmanager.

        Raises:
            AlertmanagerServerError: If the request fails or returns an
                error status.
        """
        try:
            async with self.session.get(f"{self.api_url}/silences") as response:
                response.raise_for_status()
                return await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                "Cannot fetch silences from Alertmanager"
            ) from e

    async def get_silence(self, silence_id: str) -> SilenceDict:
        """Return the silence whose ID is ``silence_id``.

        Raises:
            SilenceNotFoundError: If no silence has this ID.
            AlertmanagerServerError: If the silences cannot be fetched.
        """
        logger.debug(f"Fetching details for silence with ID {silence_id}")
        silences = await self.get_silences()
        return self._find_silence(silence_id, silences)

    async def create_silence(
        self,
        fingerprint: str,
        user: str,
        duration_seconds: Optional[int] = None,
    ) -> str:
        """Create a new silence matching every label of an alert.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            user: Author recorded on the silence.
            duration_seconds: Length of the silence in seconds;
                ``DEFAULT_DURATION`` is used when ``None``.

        Returns:
            The ID of the created silence.

        Raises:
            AlertNotFoundError: If no alert has this fingerprint.
            AlertmanagerServerError: If a request to Alertmanager fails.
        """
        alert = await self.get_alert(fingerprint)

        logger.debug(f"Creating silence for alert with fingerprint {fingerprint}")

        # One exact-match (non-regex, equality) matcher per alert label.
        silence_matchers = [
            {"name": label, "value": value, "isRegex": False, "isEqual": True}
            for label, value in alert["labels"].items()
        ]

        return await self._create_or_update_silence(
            fingerprint, silence_matchers, user, duration_seconds
        )

    async def update_silence(
        self,
        fingerprint: str,
        user: Optional[str] = None,
        duration_seconds: Optional[int] = None,
        *,
        force: bool = False,
    ) -> str:
        """Update the silence previously cached for an alert.

        Args:
            fingerprint: Fingerprint of the alert whose silence is updated.
            user: Author recorded on the silence; the existing author is
                kept when ``None``.
            duration_seconds: New length of the silence in seconds;
                ``DEFAULT_DURATION`` is used when ``None``.
            force: Skip the check that refuses to replace a silence cached
                with an expiry by one without an explicit duration.

        Returns:
            The silence ID returned by Alertmanager.

        Raises:
            SilenceNotFoundError: If no silence is cached for the alert, or
                the cached ID is unknown to Alertmanager.
            SilenceExtendError: If the cached silence has an expiry, no
                duration is given and ``force`` is false.
            AlertmanagerServerError: If a request to Alertmanager fails.
        """
        logger.debug(
            f"Reading silence for alert with fingerprint {fingerprint} from cache"
        )

        # With expire_time=True the cache lookup yields a
        # (value, expire_time) pair rather than the bare value.
        cache_result = cast(
            Optional[Tuple[str, int]], self.cache.get(fingerprint, expire_time=True)
        )
        if cache_result is not None:
            silence_id, expire_time = cache_result
        else:
            silence_id = None
            expire_time = None

        if silence_id is None:
            raise SilenceNotFoundError(
                f"Cannot find silence for alert with fingerprint {fingerprint} in cache."
            )

        logger.debug(
            f"Updating silence with ID {silence_id} for alert with fingerprint {fingerprint}"
        )

        # If silence in cache had a duration, and the new silence doesn't have a duration
        # then we cannot update this silence.
        if not force and duration_seconds is None and expire_time is not None:
            raise SilenceExtendError(
                f"Cannot extend silence ID {silence_id} with static duration."
            )

        silence = await self.get_silence(silence_id)
        if user is None:
            user = silence["createdBy"]
        silence_matchers = silence["matchers"]

        return await self._create_or_update_silence(
            fingerprint, silence_matchers, user, duration_seconds, silence_id
        )

    async def create_or_update_silence(
        self,
        fingerprint: str,
        user: str,
        duration_seconds: Optional[int] = None,
        *,
        force: bool = False,
    ) -> str:
        """Update the silence of an alert, creating one if none is found.

        Args:
            fingerprint: Fingerprint of the alert to silence.
            user: Author recorded on the silence.
            duration_seconds: Length of the silence in seconds;
                ``DEFAULT_DURATION`` is used when ``None``.
            force: Forwarded to ``update_silence``.

        Returns:
            The ID of the updated or newly created silence.

        Raises:
            SilenceExtendError: Propagated from ``update_silence``.
            AlertNotFoundError: If a silence must be created but no alert
                has this fingerprint.
            AlertmanagerServerError: If a request to Alertmanager fails.
        """
        try:
            silence_id = await self.update_silence(
                fingerprint, user, duration_seconds, force=force
            )
        except SilenceNotFoundError:
            # Nothing to update: fall back to creating a fresh silence.
            silence_id = await self.create_silence(fingerprint, user, duration_seconds)
        return silence_id

    async def _create_or_update_silence(
        self,
        fingerprint: str,
        silence_matchers: List[Dict[str, Any]],
        user: str,
        duration_seconds: Optional[int] = None,
        silence_id: Optional[str] = None,
    ) -> str:
        """Post a silence to Alertmanager and cache the resulting ID.

        Args:
            fingerprint: Fingerprint of the silenced alert (the cache key).
            silence_matchers: Matchers sent as the silence's ``matchers``.
            user: Author recorded on the silence.
            duration_seconds: Length of the silence in seconds. ``None``
                selects ``DEFAULT_DURATION``; values above ``MAX_DURATION``
                are shortened to ``MAX_DURATION``.
            silence_id: ID sent in the payload; ``None`` when no existing
                silence is being updated.

        Returns:
            The ``silenceID`` from Alertmanager's response.

        Raises:
            AlertmanagerServerError: If the request fails or returns an
                error status.
        """
        if duration_seconds is None:
            duration_delta = DEFAULT_DURATION
        elif duration_seconds > MAX_DURATION.total_seconds():
            duration_delta = MAX_DURATION
        else:
            duration_delta = timedelta(seconds=duration_seconds)

        # Use a timezone-aware "now": a naive timestamp serialises without a
        # UTC offset, leaving the receiver unable to tell which zone it was
        # taken in. astimezone() attaches the local offset to the value.
        start_time = datetime.now().astimezone()
        end_time = start_time + duration_delta

        silence = {
            "id": silence_id,
            "matchers": silence_matchers,
            "startsAt": start_time.isoformat(),
            "endsAt": end_time.isoformat(),
            "createdBy": user,
            "comment": "Acknowledge alert from Matrix",
        }

        try:
            async with self.session.post(
                f"{self.api_url}/silences", json=silence
            ) as response:
                response.raise_for_status()
                data = await response.json()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot create silence for alert fingerprint {fingerprint}"
            ) from e

        # The cache entry expires after the requested duration; without an
        # explicit duration it is stored with no expiry.
        self.cache.set(fingerprint, data["silenceID"], expire=duration_seconds)

        return data["silenceID"]

    async def delete_silence(self, silence_id: str) -> None:
        """Delete the silence whose ID is ``silence_id``.

        Raises:
            SilenceNotFoundError: If no silence has this ID.
            SilenceExpiredError: If the silence is already expired.
            AlertmanagerServerError: If a request to Alertmanager fails.
        """
        silence = await self.get_silence(silence_id)

        silence_state = silence["status"]["state"]
        if silence_state == "expired":
            raise SilenceExpiredError(
                f"Cannot delete already expired silence with ID {silence_id}"
            )

        try:
            async with self.session.delete(
                f"{self.api_url}/silence/{silence_id}"
            ) as response:
                response.raise_for_status()
        except ClientError as e:
            raise AlertmanagerServerError(
                f"Cannot delete silence with ID {silence_id}"
            ) from e

    @staticmethod
    def _find_alert(fingerprint: str, alerts: List[AlertDict]) -> AlertDict:
        """Return the first alert in ``alerts`` with the given fingerprint.

        Raises:
            AlertNotFoundError: If no alert matches.
        """
        for alert in alerts:
            if alert["fingerprint"] == fingerprint:
                return alert
        raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}")

    @staticmethod
    def _find_silence(silence_id: str, silences: List[SilenceDict]) -> SilenceDict:
        """Return the first silence in ``silences`` with the given ID.

        Raises:
            SilenceNotFoundError: If no silence matches.
        """
        for silence in silences:
            if silence["id"] == silence_id:
                return silence
        raise SilenceNotFoundError(f"Cannot find silence with ID {silence_id}")