578 lines
22 KiB
Python
578 lines
22 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import unittest
|
|
from datetime import datetime, timedelta
|
|
from typing import Any
|
|
from unittest.mock import MagicMock, Mock
|
|
|
|
import aiohttp
|
|
import aiohttp.test_utils
|
|
import aiotools
|
|
from aiohttp import web, web_request
|
|
from diskcache import Cache
|
|
from freezegun import freeze_time
|
|
|
|
from matrix_alertbot.alertmanager import AlertmanagerClient
|
|
from matrix_alertbot.errors import (
|
|
AlertmanagerServerError,
|
|
AlertNotFoundError,
|
|
InvalidDurationError,
|
|
SilenceExpiredError,
|
|
SilenceNotFoundError,
|
|
)
|
|
|
|
|
|
class FakeTimeDelta:
|
|
def __init__(self, seconds: int) -> None:
|
|
self.seconds = seconds
|
|
|
|
def __radd__(self, other: Any) -> datetime:
|
|
return datetime.utcfromtimestamp(self.seconds)
|
|
|
|
|
|
class AbstractFakeAlertmanagerServer:
|
|
def __init__(self) -> None:
|
|
self.app = web.Application()
|
|
self.app.router.add_routes(
|
|
[
|
|
web.get("/api/v2/alerts", self.get_alerts),
|
|
web.get("/api/v2/silences", self.get_silences),
|
|
web.post("/api/v2/silences", self.create_silence),
|
|
web.delete("/api/v2/silence/{silence}", self.delete_silence),
|
|
]
|
|
)
|
|
self.app["silences"] = [
|
|
{"id": "silence1", "state": "active"},
|
|
{"id": "silence2", "state": "expired"},
|
|
]
|
|
|
|
self.runner = web.AppRunner(self.app)
|
|
|
|
async def __aenter__(self) -> AbstractFakeAlertmanagerServer:
|
|
await self.start()
|
|
return self
|
|
|
|
async def __aexit__(self, *args: Any) -> None:
|
|
await self.stop()
|
|
|
|
async def start(self) -> None:
|
|
self.port = aiohttp.test_utils.unused_port()
|
|
|
|
await self.runner.setup()
|
|
|
|
site = web.TCPSite(self.runner, "localhost", self.port)
|
|
await site.start()
|
|
|
|
async def stop(self) -> None:
|
|
await self.runner.cleanup()
|
|
|
|
async def get_alerts(self, request: web_request.Request) -> web.Response:
|
|
raise NotImplementedError
|
|
|
|
async def get_silences(self, request: web_request.Request) -> web.Response:
|
|
raise NotImplementedError
|
|
|
|
async def create_silence(self, request: web_request.Request) -> web.Response:
|
|
raise NotImplementedError
|
|
|
|
async def delete_silence(self, request: web_request.Request) -> web.Response:
|
|
raise NotImplementedError
|
|
|
|
|
|
class FakeAlertmanagerServer(AbstractFakeAlertmanagerServer):
|
|
async def get_alerts(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(
|
|
body=json.dumps(
|
|
[
|
|
{
|
|
"fingerprint": "fingerprint1",
|
|
"labels": {"alertname": "alert1"},
|
|
"status": {"state": "active"},
|
|
},
|
|
{
|
|
"fingerprint": "fingerprint2",
|
|
"labels": {"alertname": "alert2"},
|
|
"status": {
|
|
"state": "suppressed",
|
|
"silencedBy": ["silence1", "silence2"],
|
|
},
|
|
},
|
|
]
|
|
),
|
|
content_type="application/json",
|
|
)
|
|
|
|
async def get_silences(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(
|
|
body=json.dumps(self.app["silences"]), content_type="application/json"
|
|
)
|
|
|
|
async def create_silence(self, request: web_request.Request) -> web.Response:
|
|
silences = self.app["silences"]
|
|
|
|
silence = await request.json()
|
|
if silence["id"] is None:
|
|
silence["id"] = "silence1"
|
|
silence["state"] = "active"
|
|
silences.append(silence)
|
|
|
|
return web.Response(
|
|
body=json.dumps({"silenceID": silence["id"]}),
|
|
content_type="application/json",
|
|
)
|
|
|
|
async def delete_silence(self, request: web_request.Request) -> web.Response:
|
|
silence_id = request.match_info["silence"]
|
|
for i, silence in enumerate(self.app["silences"]):
|
|
if silence["id"] == silence_id:
|
|
del self.app["silences"][i]
|
|
break
|
|
|
|
return web.Response(status=200, content_type="application/json")
|
|
|
|
|
|
class FakeAlertmanagerServerWithoutAlert(FakeAlertmanagerServer):
|
|
async def get_alerts(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(body=json.dumps([]), content_type="application/json")
|
|
|
|
|
|
class FakeAlertmanagerServerWithErrorAlerts(FakeAlertmanagerServer):
|
|
async def get_alerts(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(status=500)
|
|
|
|
|
|
class FakeAlertmanagerServerWithoutSilence(FakeAlertmanagerServer):
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self.app["silences"] = []
|
|
|
|
|
|
class FakeAlertmanagerServerWithErrorSilences(FakeAlertmanagerServer):
|
|
async def get_silences(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(status=500)
|
|
|
|
|
|
class FakeAlertmanagerServerWithErrorCreateSilence(FakeAlertmanagerServer):
|
|
async def create_silence(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(status=500)
|
|
|
|
|
|
class FakeAlertmanagerServerWithErrorDeleteSilence(FakeAlertmanagerServer):
|
|
async def delete_silence(self, request: web_request.Request) -> web.Response:
|
|
return web.Response(status=500)
|
|
|
|
|
|
class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
|
|
async def asyncSetUp(self) -> None:
|
|
self.fake_fingerprints = Mock(return_value=["fingerprint1", "fingerprint2"])
|
|
self.fake_cache = MagicMock(spec=Cache)
|
|
self.fake_cache.__getitem__ = self.fake_fingerprints
|
|
|
|
async def test_get_alerts_happy(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
alerts = await alertmanager.get_alerts()
|
|
|
|
self.assertEqual(
|
|
[
|
|
{
|
|
"fingerprint": "fingerprint1",
|
|
"labels": {"alertname": "alert1"},
|
|
"status": {"state": "active"},
|
|
},
|
|
{
|
|
"fingerprint": "fingerprint2",
|
|
"labels": {"alertname": "alert2"},
|
|
"status": {
|
|
"state": "suppressed",
|
|
"silencedBy": ["silence1", "silence2"],
|
|
},
|
|
},
|
|
],
|
|
alerts,
|
|
)
|
|
|
|
async def test_get_alerts_empty(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
alerts = await alertmanager.get_alerts()
|
|
|
|
self.assertEqual([], alerts)
|
|
|
|
async def test_get_alerts_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorAlerts() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.get_alerts()
|
|
|
|
async def test_get_silences_happy(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silences = await alertmanager.get_silences()
|
|
|
|
self.assertEqual(
|
|
[
|
|
{"id": "silence1", "state": "active"},
|
|
{"id": "silence2", "state": "expired"},
|
|
],
|
|
silences,
|
|
)
|
|
|
|
async def test_get_silences_empty(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silences = await alertmanager.get_silences()
|
|
|
|
self.assertEqual([], silences)
|
|
|
|
async def test_get_silences_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorSilences() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.get_silences()
|
|
|
|
async def test_get_alert_happy(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
alert = await alertmanager.get_alert("fingerprint1")
|
|
|
|
self.assertEqual(
|
|
{
|
|
"fingerprint": "fingerprint1",
|
|
"labels": {"alertname": "alert1"},
|
|
"status": {"state": "active"},
|
|
},
|
|
alert,
|
|
)
|
|
|
|
async def test_get_alert_raise_alert_not_found(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertNotFoundError):
|
|
await alertmanager.get_alert("fingerprint1")
|
|
|
|
async def test_get_alert_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorAlerts() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.get_alert("fingerprint1")
|
|
|
|
async def test_get_silence_happy(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silence1 = await alertmanager.get_silence("silence1")
|
|
silence2 = await alertmanager.get_silence("silence2")
|
|
|
|
self.assertEqual(
|
|
{"id": "silence1", "state": "active"},
|
|
silence1,
|
|
)
|
|
self.assertEqual(
|
|
{"id": "silence2", "state": "expired"},
|
|
silence2,
|
|
)
|
|
|
|
async def test_get_silence_raise_silence_not_found(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(SilenceNotFoundError):
|
|
await alertmanager.get_silence("silence1")
|
|
|
|
async def test_get_silence_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorSilences() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.get_silence("silence1")
|
|
|
|
@freeze_time(datetime.utcfromtimestamp(0))
|
|
async def test_create_silence(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silence_id = await alertmanager.create_silence(
|
|
"fingerprint1", "user", 86400
|
|
)
|
|
silence = await alertmanager.get_silence("silence1")
|
|
|
|
self.assertEqual("silence1", silence_id)
|
|
self.assertEqual(
|
|
{
|
|
"id": "silence1",
|
|
"state": "active",
|
|
"matchers": [
|
|
{
|
|
"name": "alertname",
|
|
"value": "alert1",
|
|
"isRegex": False,
|
|
"isEqual": True,
|
|
}
|
|
],
|
|
"createdBy": "user",
|
|
"startsAt": "1970-01-01T00:00:00",
|
|
"endsAt": "1970-01-02T00:00:00",
|
|
"comment": "Acknowledge alert from Matrix",
|
|
},
|
|
silence,
|
|
)
|
|
|
|
@freeze_time(datetime.utcfromtimestamp(0))
|
|
async def test_create_silence_with_id(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silence_id = await alertmanager.create_silence(
|
|
"fingerprint1", "user", 86400, "silence2"
|
|
)
|
|
silence = await alertmanager.get_silence("silence2")
|
|
|
|
self.assertEqual("silence2", silence_id)
|
|
self.assertEqual(
|
|
{
|
|
"id": "silence2",
|
|
"state": "active",
|
|
"matchers": [
|
|
{
|
|
"name": "alertname",
|
|
"value": "alert1",
|
|
"isRegex": False,
|
|
"isEqual": True,
|
|
}
|
|
],
|
|
"createdBy": "user",
|
|
"startsAt": "1970-01-01T00:00:00",
|
|
"endsAt": "1970-01-02T00:00:00",
|
|
"comment": "Acknowledge alert from Matrix",
|
|
},
|
|
silence,
|
|
)
|
|
|
|
@freeze_time(datetime.utcfromtimestamp(0))
|
|
async def test_create_silence_with_indefinite_duration(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silence_id = await alertmanager.create_silence("fingerprint1", "user")
|
|
silence = await alertmanager.get_silence("silence1")
|
|
|
|
self.assertEqual("silence1", silence_id)
|
|
self.assertEqual(
|
|
{
|
|
"id": "silence1",
|
|
"state": "active",
|
|
"matchers": [
|
|
{
|
|
"name": "alertname",
|
|
"value": "alert1",
|
|
"isRegex": False,
|
|
"isEqual": True,
|
|
}
|
|
],
|
|
"createdBy": "user",
|
|
"startsAt": "1970-01-01T00:00:00",
|
|
"endsAt": "1980-01-01T00:00:00",
|
|
"comment": "Acknowledge alert from Matrix",
|
|
},
|
|
silence,
|
|
)
|
|
|
|
@freeze_time(datetime.utcfromtimestamp(0))
|
|
async def test_create_silence_with_max_duration(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
silence_id = await alertmanager.create_silence(
|
|
"fingerprint1", "user", int(timedelta.max.total_seconds())
|
|
)
|
|
silence = await alertmanager.get_silence("silence1")
|
|
|
|
self.assertEqual("silence1", silence_id)
|
|
self.assertEqual(
|
|
{
|
|
"id": "silence1",
|
|
"state": "active",
|
|
"matchers": [
|
|
{
|
|
"name": "alertname",
|
|
"value": "alert1",
|
|
"isRegex": False,
|
|
"isEqual": True,
|
|
}
|
|
],
|
|
"createdBy": "user",
|
|
"startsAt": "1970-01-01T00:00:00",
|
|
"endsAt": "1980-01-01T00:00:00",
|
|
"comment": "Acknowledge alert from Matrix",
|
|
},
|
|
silence,
|
|
)
|
|
|
|
@freeze_time(datetime.utcfromtimestamp(0))
|
|
async def test_create_silence_raise_duration_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(InvalidDurationError):
|
|
await alertmanager.create_silence("fingerprint1", "user", -1)
|
|
|
|
async def test_create_silence_raise_alert_not_found(self) -> None:
|
|
async with FakeAlertmanagerServerWithoutAlert() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(AlertNotFoundError):
|
|
await alertmanager.create_silence("fingerprint1", "user")
|
|
|
|
async def test_create_silence_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorCreateSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
await alertmanager.get_alert("fingerprint1")
|
|
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.create_silence("fingerprint1", "user")
|
|
|
|
async def test_delete_silence(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
await alertmanager.delete_silence("silence1")
|
|
silences = await alertmanager.get_silences()
|
|
|
|
self.assertEqual([{"id": "silence2", "state": "expired"}], silences)
|
|
|
|
async def test_delete_silence_raise_silence_expired(self) -> None:
|
|
async with FakeAlertmanagerServer() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
with self.assertRaises(SilenceExpiredError):
|
|
await alertmanager.delete_silence("silence2")
|
|
silences = await alertmanager.get_silences()
|
|
|
|
self.assertEqual(
|
|
[
|
|
{"id": "silence1", "state": "active"},
|
|
{"id": "silence2", "state": "expired"},
|
|
],
|
|
silences,
|
|
)
|
|
|
|
async def test_delete_silence_raise_alertmanager_error(self) -> None:
|
|
async with FakeAlertmanagerServerWithErrorDeleteSilence() as fake_alertmanager_server:
|
|
port = fake_alertmanager_server.port
|
|
alertmanager = AlertmanagerClient(
|
|
f"http://localhost:{port}", self.fake_cache
|
|
)
|
|
async with aiotools.closing_async(alertmanager):
|
|
await alertmanager.get_alert("fingerprint1")
|
|
|
|
with self.assertRaises(AlertmanagerServerError):
|
|
await alertmanager.delete_silence("silence1")
|
|
|
|
async def test_find_alert_happy(self) -> None:
|
|
alertmanager = AlertmanagerClient("http://localhost", self.fake_cache)
|
|
alert = alertmanager._find_alert(
|
|
"fingerprint1", [{"fingerprint": "fingerprint1"}]
|
|
)
|
|
self.assertEqual({"fingerprint": "fingerprint1"}, alert)
|
|
|
|
async def test_find_alert_raise_alert_not_found(self) -> None:
|
|
alertmanager = AlertmanagerClient("http://localhost", self.fake_cache)
|
|
|
|
with self.assertRaises(AlertNotFoundError):
|
|
alertmanager._find_alert("fingerprint1", [])
|
|
|
|
with self.assertRaises(AlertNotFoundError):
|
|
alertmanager._find_alert("fingerprint2", [{"fingerprint": "fingerprint1"}])
|
|
|
|
async def test_find_silence_happy(self) -> None:
|
|
alertmanager = AlertmanagerClient("http://localhost", self.fake_cache)
|
|
silence = alertmanager._find_silence("silence1", [{"id": "silence1"}])
|
|
self.assertEqual({"id": "silence1"}, silence)
|
|
|
|
async def test_find_silence_raise_silence_not_found(self) -> None:
|
|
alertmanager = AlertmanagerClient("http://localhost", self.fake_cache)
|
|
|
|
with self.assertRaises(SilenceNotFoundError):
|
|
alertmanager._find_silence("silence1", [])
|
|
|
|
with self.assertRaises(SilenceNotFoundError):
|
|
alertmanager._find_silence("silence2", [{"id": "silence1"}])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|