from __future__ import annotations import json import unittest from datetime import datetime, timedelta from typing import Any from unittest.mock import MagicMock, Mock import aiohttp import aiohttp.test_utils import aiotools from aiohttp import web, web_request from diskcache import Cache from freezegun import freeze_time from matrix_alertbot.alertmanager import AlertmanagerClient from matrix_alertbot.errors import ( AlertmanagerServerError, AlertNotFoundError, InvalidDurationError, SilenceExpiredError, SilenceNotFoundError, ) class FakeTimeDelta: def __init__(self, seconds: int) -> None: self.seconds = seconds def __radd__(self, other: Any) -> datetime: return datetime.utcfromtimestamp(self.seconds) class AbstractFakeAlertmanagerServer: def __init__(self) -> None: self.app = web.Application() self.app.router.add_routes( [ web.get("/api/v2/alerts", self.get_alerts), web.get("/api/v2/silences", self.get_silences), web.post("/api/v2/silences", self.create_silence), web.delete("/api/v2/silence/{silence}", self.delete_silence), ] ) self.app["silences"] = [ {"id": "silence1", "status": {"state": "active"}}, {"id": "silence2", "status": {"state": "expired"}}, ] self.runner = web.AppRunner(self.app) async def __aenter__(self) -> AbstractFakeAlertmanagerServer: await self.start() return self async def __aexit__(self, *args: Any) -> None: await self.stop() async def start(self) -> None: self.port = aiohttp.test_utils.unused_port() await self.runner.setup() site = web.TCPSite(self.runner, "localhost", self.port) await site.start() async def stop(self) -> None: await self.runner.cleanup() async def get_alerts(self, request: web_request.Request) -> web.Response: raise NotImplementedError async def get_silences(self, request: web_request.Request) -> web.Response: raise NotImplementedError async def create_silence(self, request: web_request.Request) -> web.Response: raise NotImplementedError async def delete_silence(self, request: web_request.Request) -> web.Response: raise NotImplementedError class FakeAlertmanagerServer(AbstractFakeAlertmanagerServer): async def get_alerts(self, request: web_request.Request) -> web.Response: return web.Response( body=json.dumps( [ { "fingerprint": "fingerprint1", "labels": {"alertname": "alert1"}, "status": {"state": "active"}, }, { "fingerprint": "fingerprint2", "labels": {"alertname": "alert2"}, "status": { "state": "suppressed", "silencedBy": ["silence1", "silence2"], }, }, ] ), content_type="application/json", ) async def get_silences(self, request: web_request.Request) -> web.Response: return web.Response( body=json.dumps(self.app["silences"]), content_type="application/json" ) async def create_silence(self, request: web_request.Request) -> web.Response: silences = self.app["silences"] silence = await request.json() if silence["id"] is None: silence["id"] = "silence1" silence["status"] = {"state": "active"} silences.append(silence) return web.Response( body=json.dumps({"silenceID": silence["id"]}), content_type="application/json", ) async def delete_silence(self, request: web_request.Request) -> web.Response: silence_id = request.match_info["silence"] for i, silence in enumerate(self.app["silences"]): if silence["id"] == silence_id: del self.app["silences"][i] break return web.Response(status=200, content_type="application/json") class FakeAlertmanagerServerWithoutAlert(FakeAlertmanagerServer): async def get_alerts(self, request: web_request.Request) -> web.Response: return web.Response(body=json.dumps([]), content_type="application/json") class FakeAlertmanagerServerWithErrorAlerts(FakeAlertmanagerServer): async def get_alerts(self, request: web_request.Request) -> web.Response: return web.Response(status=500) class FakeAlertmanagerServerWithoutSilence(FakeAlertmanagerServer): def __init__(self) -> None: super().__init__() self.app["silences"] = [] class FakeAlertmanagerServerWithErrorSilences(FakeAlertmanagerServer): async def get_silences(self, request: web_request.Request) -> web.Response: return web.Response(status=500) class FakeAlertmanagerServerWithErrorCreateSilence(FakeAlertmanagerServer): async def create_silence(self, request: web_request.Request) -> web.Response: return web.Response(status=500) class FakeAlertmanagerServerWithErrorDeleteSilence(FakeAlertmanagerServer): async def delete_silence(self, request: web_request.Request) -> web.Response: return web.Response(status=500) class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self) -> None: self.fake_fingerprints = Mock(return_value=["fingerprint1", "fingerprint2"]) self.fake_cache = MagicMock(spec=Cache) self.fake_cache.__getitem__ = self.fake_fingerprints async def test_get_alerts_happy(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): alerts = await alertmanager.get_alerts() self.assertEqual( [ { "fingerprint": "fingerprint1", "labels": {"alertname": "alert1"}, "status": {"state": "active"}, }, { "fingerprint": "fingerprint2", "labels": {"alertname": "alert2"}, "status": { "state": "suppressed", "silencedBy": ["silence1", "silence2"], }, }, ], alerts, ) async def test_get_alerts_empty(self) -> None: async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): alerts = await alertmanager.get_alerts() self.assertEqual([], alerts) async def test_get_alerts_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorAlerts() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertmanagerServerError): await alertmanager.get_alerts() async def test_get_silences_happy(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silences = await alertmanager.get_silences() self.assertEqual( [ {"id": "silence1", "status": {"state": "active"}}, {"id": "silence2", "status": {"state": "expired"}}, ], silences, ) async def test_get_silences_empty(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silences = await alertmanager.get_silences() self.assertEqual([], silences) async def test_get_silences_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorSilences() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertmanagerServerError): await alertmanager.get_silences() async def test_get_alert_happy(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): alert = await alertmanager.get_alert("fingerprint1") self.assertEqual( { "fingerprint": "fingerprint1", "labels": {"alertname": "alert1"}, "status": {"state": "active"}, }, alert, ) async def test_get_alert_raise_alert_not_found(self) -> None: async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertNotFoundError): await alertmanager.get_alert("fingerprint1") async def test_get_alert_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorAlerts() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertmanagerServerError): await alertmanager.get_alert("fingerprint1") async def test_get_silence_happy(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silence1 = await alertmanager.get_silence("silence1") silence2 = await alertmanager.get_silence("silence2") self.assertEqual( {"id": "silence1", "status": {"state": "active"}}, silence1, ) self.assertEqual( {"id": "silence2", "status": {"state": "expired"}}, silence2, ) async def test_get_silence_raise_silence_not_found(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(SilenceNotFoundError): await alertmanager.get_silence("silence1") async def test_get_silence_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorSilences() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertmanagerServerError): await alertmanager.get_silence("silence1") @freeze_time(datetime.utcfromtimestamp(0)) async def test_create_silence(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silence_id = await alertmanager.create_silence( "fingerprint1", "user", 86400 ) silence = await alertmanager.get_silence("silence1") self.assertEqual("silence1", silence_id) self.assertEqual( { "id": "silence1", "status": {"state": "active"}, "matchers": [ { "name": "alertname", "value": "alert1", "isRegex": False, "isEqual": True, } ], "createdBy": "user", "startsAt": "1970-01-01T00:00:00", "endsAt": "1970-01-02T00:00:00", "comment": "Acknowledge alert from Matrix", }, silence, ) @freeze_time(datetime.utcfromtimestamp(0)) async def test_create_silence_with_id(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silence_id = await alertmanager.create_silence( "fingerprint1", "user", 86400, "silence2" ) silence = await alertmanager.get_silence("silence2") self.assertEqual("silence2", silence_id) self.assertEqual( { "id": "silence2", "status": {"state": "active"}, "matchers": [ { "name": "alertname", "value": "alert1", "isRegex": False, "isEqual": True, } ], "createdBy": "user", "startsAt": "1970-01-01T00:00:00", "endsAt": "1970-01-02T00:00:00", "comment": "Acknowledge alert from Matrix", }, silence, ) @freeze_time(datetime.utcfromtimestamp(0)) async def test_create_silence_with_indefinite_duration(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silence_id = await alertmanager.create_silence("fingerprint1", "user") silence = await alertmanager.get_silence("silence1") self.assertEqual("silence1", silence_id) self.assertEqual( { "id": "silence1", "status": {"state": "active"}, "matchers": [ { "name": "alertname", "value": "alert1", "isRegex": False, "isEqual": True, } ], "createdBy": "user", "startsAt": "1970-01-01T00:00:00", "endsAt": "1980-01-01T00:00:00", "comment": "Acknowledge alert from Matrix", }, silence, ) @freeze_time(datetime.utcfromtimestamp(0)) async def test_create_silence_with_max_duration(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): silence_id = await alertmanager.create_silence( "fingerprint1", "user", int(timedelta.max.total_seconds()) ) silence = await alertmanager.get_silence("silence1") self.assertEqual("silence1", silence_id) self.assertEqual( { "id": "silence1", "status": {"state": "active"}, "matchers": [ { "name": "alertname", "value": "alert1", "isRegex": False, "isEqual": True, } ], "createdBy": "user", "startsAt": "1970-01-01T00:00:00", "endsAt": "1980-01-01T00:00:00", "comment": "Acknowledge alert from Matrix", }, silence, ) @freeze_time(datetime.utcfromtimestamp(0)) async def test_create_silence_raise_duration_error(self) -> None: async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(InvalidDurationError): await alertmanager.create_silence("fingerprint1", "user", -1) async def test_create_silence_raise_alert_not_found(self) -> None: async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(AlertNotFoundError): await alertmanager.create_silence("fingerprint1", "user") async def test_create_silence_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorCreateSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): await alertmanager.get_alert("fingerprint1") with self.assertRaises(AlertmanagerServerError): await alertmanager.create_silence("fingerprint1", "user") async def test_delete_silence(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): await alertmanager.delete_silence("silence1") silences = await alertmanager.get_silences() self.assertEqual([{"id": "silence2", "status": {"state": "expired"}}], silences) async def test_delete_silence_raise_silence_expired(self) -> None: async with FakeAlertmanagerServer() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): with self.assertRaises(SilenceExpiredError): await alertmanager.delete_silence("silence2") silences = await alertmanager.get_silences() self.assertEqual( [ {"id": "silence1", "status": {"state": "active"}}, {"id": "silence2", "status": {"state": "expired"}}, ], silences, ) async def test_delete_silence_raise_alertmanager_error(self) -> None: async with FakeAlertmanagerServerWithErrorDeleteSilence() as fake_alertmanager_server: port = fake_alertmanager_server.port alertmanager = AlertmanagerClient( f"http://localhost:{port}", self.fake_cache ) async with aiotools.closing_async(alertmanager): await alertmanager.get_alert("fingerprint1") with self.assertRaises(AlertmanagerServerError): await alertmanager.delete_silence("silence1") async def test_find_alert_happy(self) -> None: alertmanager = AlertmanagerClient("http://localhost", self.fake_cache) alert = alertmanager._find_alert( "fingerprint1", [{"fingerprint": "fingerprint1"}] ) self.assertEqual({"fingerprint": "fingerprint1"}, alert) async def test_find_alert_raise_alert_not_found(self) -> None: alertmanager = AlertmanagerClient("http://localhost", self.fake_cache) with self.assertRaises(AlertNotFoundError): alertmanager._find_alert("fingerprint1", []) with self.assertRaises(AlertNotFoundError): alertmanager._find_alert("fingerprint2", [{"fingerprint": "fingerprint1"}]) async def test_find_silence_happy(self) -> None: alertmanager = AlertmanagerClient("http://localhost", self.fake_cache) silence = alertmanager._find_silence("silence1", [{"id": "silence1"}]) self.assertEqual({"id": "silence1"}, silence) async def test_find_silence_raise_silence_not_found(self) -> None: alertmanager = AlertmanagerClient("http://localhost", self.fake_cache) with self.assertRaises(SilenceNotFoundError): alertmanager._find_silence("silence1", []) with self.assertRaises(SilenceNotFoundError): alertmanager._find_silence("silence2", [{"id": "silence1"}]) if __name__ == "__main__": unittest.main()