feature to send alerts in direct message

This commit is contained in:
HgO 2024-08-04 11:08:43 +02:00
parent d277d742e7
commit 13976ea254
19 changed files with 1211 additions and 136 deletions

View file

@ -39,6 +39,16 @@ matrix:
# Default is listed here.
allowed_reactions: [🤫, 😶, 🤐, 🙊, 🔇, 🔕]
dm:
filter_labels:
matrix: dm
select_label: uuid
room_title: Alerts for {user}
users:
- matrix_id: "@user:matrix.example.com"
user_id:
- ec76b3e6-b49c-46c3-bd35-a329eaeafc4c
webhook:
# Address and port for which the bot should listen to
address: 0.0.0.0

View file

@ -22,15 +22,15 @@
# We use an initial docker container to build all of the runtime dependencies,
# then transfer those dependencies to the container we're going to ship,
# before throwing this one away
ARG PYTHON_VERSION=3.10
FROM python:${PYTHON_VERSION}-alpine as builder
ARG PYTHON_VERSION=3.11
FROM python:${PYTHON_VERSION}-alpine AS builder
##
## Build libolm for matrix-nio e2e support
##
# Install libolm build dependencies
ARG LIBOLM_VERSION=3.2.10
ARG LIBOLM_VERSION=3.2.16
RUN apk add --no-cache \
make \
cmake \
@ -49,6 +49,9 @@ ENV PATH="$VIRTUAL_ENV/bin:$PATH"
RUN python -m venv "${VIRTUAL_ENV}"
WORKDIR "${PROJECT_DIR}"
RUN pip install setuptools
# Build libolm
#
# Also build the libolm python bindings and place them at /python-libs

View file

@ -25,3 +25,9 @@ scrape_configs:
static_configs:
- targets: ["localhost:9090"]
- targets: ['example.com']
labels:
uuid: "d8798985-a1d2-431a-9275-106b9cf63922"
- targets: ['matrix.org']
labels:
uuid: "08119079-ba91-4b23-b9a5-519fcb3b5fad"

View file

@ -6,6 +6,7 @@ groups:
expr: up == 1
labels:
severity: critical
matrix: dm
annotations:
description: 'Instance {{ $labels.instance }} is up'
summary: 'Instance is up'

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import logging
import re
from typing import Dict, Optional
from jinja2 import (
@ -25,6 +26,7 @@ class Alert:
labels: Dict[str, str],
annotations: Dict[str, str],
firing: bool = True,
user_id: Optional[str] = None,
):
self.fingerprint = fingerprint
self.url = url
@ -39,6 +41,8 @@ class Alert:
else:
self.status = "resolved"
self.user_id = user_id
@staticmethod
def from_dict(data: Dict) -> Alert:
return Alert(
@ -57,6 +61,17 @@ class Alert:
def color(self) -> str:
return self.COLORS[self.status]
def match_label(self, label_name: str, pattern: re.Pattern[str]) -> bool:
if label_name not in self.labels:
return False
return pattern.match(self.labels[label_name]) is not None
def match_all_labels(self, labels: Dict[str, re.Pattern[str]]) -> bool:
for label_name, pattern in labels.items():
if not self.match_label(label_name, pattern):
return False
return True
class AlertRenderer:
def __init__(self, template_dir: Optional[str] = None) -> None:

View file

@ -10,6 +10,7 @@ from aiohttp_prometheus_exporter.trace import PrometheusTraceConfig
from diskcache import Cache
from matrix_alertbot.errors import (
AlertmanagerClientError,
AlertmanagerServerError,
AlertNotFoundError,
SilenceExpiredError,
@ -46,12 +47,27 @@ class AlertmanagerClient:
def __init__(self, url: str, cache: Cache) -> None:
self.api_url = f"{url}/api/v2"
self.cache = cache
self.session = None
async def start(self) -> None:
self.session = aiohttp.ClientSession(trace_configs=[PrometheusTraceConfig()])
async def close(self) -> None:
await self.session.close()
if self.session is not None:
await self.session.close()
async def __aenter__(self) -> AlertmanagerClient:
if self.session is None:
await self.start()
return self
async def __aexit__(self, *args) -> None:
await self.close()
async def get_alerts(self) -> List[AlertDict]:
if self.session is None:
raise AlertmanagerClientError("Alertmanager client is not started")
try:
async with self.session.get(f"{self.api_url}/alerts") as response:
response.raise_for_status()
@ -67,6 +83,9 @@ class AlertmanagerClient:
return self._find_alert(fingerprint, alerts)
async def get_silences(self) -> List[SilenceDict]:
if self.session is None:
raise AlertmanagerClientError("Alertmanager client is not started")
try:
async with self.session.get(f"{self.api_url}/silences") as response:
response.raise_for_status()
@ -170,6 +189,9 @@ class AlertmanagerClient:
duration_seconds: Optional[int] = None,
silence_id: Optional[str] = None,
) -> str:
if self.session is None:
raise AlertmanagerClientError("Alertmanager client is not started")
if duration_seconds is None:
duration_delta = DEFAULT_DURATION
elif duration_seconds > MAX_DURATION.total_seconds():
@ -204,6 +226,9 @@ class AlertmanagerClient:
return data["silenceID"]
async def delete_silence(self, silence_id: str) -> None:
if self.session is None:
raise AlertmanagerClientError("Alertmanager client is not started")
silence = await self.get_silence(silence_id)
silence_state = silence["status"]["state"]

View file

@ -77,7 +77,10 @@ class Callbacks:
return
# Ignore messages from unauthorized room
if room.room_id not in self.config.allowed_rooms:
if (
room.room_id not in self.config.allowed_rooms
and event.sender not in self.config.dm_users.inverse
):
return
# Extract the message text
@ -167,7 +170,11 @@ class Callbacks:
event: The invite event.
"""
# Ignore invites from unauthorized room
if room.room_id not in self.config.allowed_rooms:
if (
room.room_id not in self.config.allowed_rooms
and event.sender not in self.config.user_ids
and event.sender not in self.config.dm_users.inverse
):
return
logger.debug(
@ -229,7 +236,10 @@ class Callbacks:
return
# Ignore reactions from unauthorized room
if room.room_id not in self.config.allowed_rooms:
if (
room.room_id not in self.config.allowed_rooms
and event.sender not in self.config.dm_users.inverse
):
return
# Ignore reactions from ourselves
@ -317,7 +327,10 @@ class Callbacks:
return
# Ignore events from unauthorized room
if room.room_id not in self.config.allowed_rooms:
if (
room.room_id not in self.config.allowed_rooms
and event.sender not in self.config.dm_users.inverse
):
return
# Ignore redactions from ourselves
@ -359,7 +372,10 @@ class Callbacks:
event: The encrypted event that we were unable to decrypt.
"""
# Ignore events from unauthorized room
if room.room_id not in self.config.allowed_rooms:
if (
room.room_id not in self.config.allowed_rooms
and event.sender not in self.config.dm_users.inverse
):
return
logger.error(

View file

@ -4,7 +4,7 @@ import logging
import os
import re
import sys
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, TypeVar
import pytimeparse2
import yaml
@ -53,6 +53,29 @@ INSULT_REACTIONS = {
"🖕",
}
K = TypeVar("K")
V = TypeVar("V")
class BiDict(dict[K, V]):
def __init__(self, *args, **kwargs):
super(BiDict, self).__init__(*args, **kwargs)
self.inverse = {}
for key, value in self.items():
self.inverse.setdefault(value, set()).add(key)
def __setitem__(self, key: K, value: V):
if key in self:
self.inverse[self[key]].remove(key)
super(BiDict, self).__setitem__(key, value)
self.inverse.setdefault(value, set()).add(key)
def __delitem__(self, key: K):
self.inverse.setdefault(self[key], set()).remove(key)
if self[key] in self.inverse and not self.inverse[self[key]]:
del self.inverse[self[key]]
super(BiDict, self).__delitem__(key)
class AccountConfig:
def __init__(self, account: Dict[str, str]) -> None:
@ -197,6 +220,23 @@ class Config:
"Supplied both webhook.socket and both webhook.address"
)
self.dm_users: BiDict[str, str] = BiDict()
for user in self._get_cfg(["dm", "users"], default=[]):
for user_id in user["user_id"]:
self.dm_users[user_id] = user["matrix_id"]
self.dm_room_title: str = self._get_cfg(["dm", "room_title"], required=False)
filter_labels: Dict[str, str] = self._get_cfg(
["dm", "filter_labels"], default={}, required=False
)
self.dm_filter_labels: Dict[str, re.Pattern[str]] = {}
for label_name, pattern in filter_labels.items():
self.dm_filter_labels[label_name] = re.compile(pattern)
self.dm_select_label: str = self._get_cfg(
["dm", "select_label"], required=False
)
def _get_cfg(
self,
path: List[str],

View file

@ -59,6 +59,12 @@ class SilenceExtendError(AlertmanagerError):
pass
class AlertmanagerClientError(AlertmanagerError):
"""An error encountered with Alertmanager client."""
pass
class AlertmanagerServerError(AlertmanagerError):
"""An error encountered with Alertmanager server."""

View file

@ -40,7 +40,7 @@ def main() -> None:
webhook_server = Webhook(matrix_client_pool, alertmanager_client, cache, config)
loop = asyncio.get_event_loop()
loop.create_task(matrix_client_pool.switch_active_client())
loop.create_task(alertmanager_client.start())
loop.create_task(webhook_server.start())
for account in config.accounts:
loop.create_task(matrix_client_pool.start(account, config))

View file

@ -10,6 +10,7 @@ from typing import Dict, List, Optional, Tuple
from aiohttp import ClientConnectionError, ServerDisconnectedError
from diskcache import Cache
from nio import RoomPreset, RoomVisibility
from nio.client import AsyncClient, AsyncClientConfig
from nio.events import (
InviteMemberEvent,
@ -23,8 +24,14 @@ from nio.events import (
RoomMessageText,
RoomMessageUnknown,
)
from nio.exceptions import LocalProtocolError
from nio.responses import LoginError, WhoamiError
from nio.exceptions import LocalProtocolError, LocalTransportError
from nio.responses import (
JoinedMembersError,
LoginError,
ProfileGetDisplayNameError,
RoomCreateError,
WhoamiError,
)
import matrix_alertbot.callback
from matrix_alertbot.alertmanager import AlertmanagerClient
@ -51,6 +58,17 @@ class MatrixClientPool:
self.account = next(iter(self._accounts))
self.matrix_client = self._matrix_clients[self.account]
self.dm_rooms = {}
def unactive_user_ids(self):
active_user_id = self.account.id
user_ids = []
for account in self._accounts:
user_id = account.id
if active_user_id is not user_id:
user_ids.append(user_id)
return user_ids
async def switch_active_client(
self,
) -> Optional[Tuple[AsyncClient, AccountConfig]]:
@ -59,6 +77,9 @@ class MatrixClientPool:
if account is self.account:
continue
logger.info(
f"Bot {account.id} | Checking if matrix client is connected"
)
matrix_client = self._matrix_clients[account]
try:
whoami = await matrix_client.whoami()
@ -161,6 +182,113 @@ class MatrixClientPool:
return matrix_client
async def find_existing_dm_rooms(
self, account: AccountConfig, matrix_client: AsyncClient, config: Config
) -> Dict[str, str]:
unactive_user_ids = self.unactive_user_ids()
dm_rooms = {}
for room_id in matrix_client.rooms:
if room_id in config.allowed_rooms:
continue
room_members_response = await matrix_client.joined_members(room_id)
if isinstance(room_members_response, JoinedMembersError):
logger.warning(
f"Bot {account.id} | Cannot get joined members for room {room_id}"
)
continue
room_members = []
for room_member in room_members_response.members:
room_members.append(room_member.user_id)
logger.info(
f"Bot {account.id} | Found {len(room_members)} room members in {room_id}"
)
all_accounts_in_room = True
for user_id in unactive_user_ids:
if user_id not in room_members:
all_accounts_in_room = False
if not all_accounts_in_room:
continue
logger.info(f"Bot {account.id} | All matrix clients are in {room_id}")
for room_member in room_members:
if room_member not in config.dm_users.inverse:
continue
if room_member in dm_rooms:
logger.warning(
f"Bot {account.id} | Found more than one direct room with user {room_member}: {room_id}"
)
continue
dm_rooms[room_member] = room_id
logger.info(
f"Bot {account.id} | Found direct room {room_id} with user {room_member}"
)
return dm_rooms
async def create_dm_rooms(
self, account: AccountConfig, matrix_client: AsyncClient, config: Config
) -> None:
async with self._lock:
if matrix_client is self.matrix_client:
unactive_accounts = self.unactive_user_ids()
self.dm_rooms = await self.find_existing_dm_rooms(
account=account, matrix_client=matrix_client, config=config
)
for user_id in config.dm_users.inverse:
if user_id in self.dm_rooms:
continue
display_name_response = await matrix_client.get_displayname(user_id)
if isinstance(display_name_response, ProfileGetDisplayNameError):
error = display_name_response.message
logger.warning(
f"Bot {account.id} | Cannot fetch user name for {user_id}: {error}"
)
continue
user_name = display_name_response.displayname
if config.dm_room_title:
room_title = config.dm_room_title.format(user=user_name)
else:
room_title = None
logger.info(
f"Bot {account.id} | Creating direct room with user {user_id}"
)
invitations = unactive_accounts + [user_id]
create_room_response = await matrix_client.room_create(
visibility=RoomVisibility.private,
name=room_title,
invite=invitations,
is_direct=True,
preset=RoomPreset.private_chat,
)
if isinstance(create_room_response, RoomCreateError):
error = create_room_response.message
logger.warning(
f"Bot {account.id} | Cannot create direct room with user {user_id}: {error}"
)
continue
dm_room_id = create_room_response.room_id
if dm_room_id is None:
logger.warning(
f"Bot {account.id} | Cannot find direct room id with user {user_id}"
)
continue
logger.info(
f"Bot {account.id} | Created direct room {dm_room_id} with user {user_id}"
)
self.dm_rooms[user_id] = dm_room_id
async def start(
self,
account: AccountConfig,
@ -196,14 +324,14 @@ class MatrixClientPool:
f"Bot {account.id} | Failed to login: {login_response.message}"
)
return False
except LocalProtocolError as e:
except LocalProtocolError as error:
# There's an edge case here where the user hasn't installed the correct C
# dependencies. In that case, a LocalProtocolError is raised on login.
logger.fatal(
f"Bot {account.id} | Failed to login. Have you installed the correct dependencies? "
"https://github.com/poljar/matrix-nio#installation "
"Error: %s",
e,
error,
)
return False
@ -233,8 +361,19 @@ class MatrixClientPool:
logger.info(f"Bot {account.id} | Logged in.")
await matrix_client.sync(timeout=30000, full_state=True)
await self.create_dm_rooms(
account=account, matrix_client=matrix_client, config=config
)
await matrix_client.sync_forever(timeout=30000, full_state=True)
except (ClientConnectionError, ServerDisconnectedError, TimeoutError):
except (
ClientConnectionError,
LocalTransportError,
ServerDisconnectedError,
TimeoutError,
):
await matrix_client.close()
logger.warning(

View file

@ -103,6 +103,7 @@ async def create_alerts(request: web_request.Request) -> web.Response:
except KeyError as e:
logger.error(f"Cannot parse alert dict: {e}")
return web.Response(status=400, body=f"Invalid alert: {alert_dict}.")
alerts.append(alert)
for alert in alerts:
@ -153,6 +154,21 @@ async def create_alert(
cache: Cache = request.app["cache"]
config: Config = request.app["config"]
if config.dm_select_label and config.dm_select_label in alert.labels:
if alert.match_all_labels(config.dm_filter_labels):
dm_select_value = alert.labels[config.dm_select_label]
if dm_select_value not in config.dm_users:
logger.warning(
f"Cannot find user with label {config.dm_select_label}={dm_select_value}"
)
return
user_id = config.dm_users[dm_select_value]
if user_id not in matrix_client_pool.dm_rooms:
logger.warning(f"Cannot find a matrix room for user {user_id}")
return
room_id = matrix_client_pool.dm_rooms[user_id]
if alert.firing:
try:
silence_id = await alertmanager_client.update_silence(alert.fingerprint)

View file

@ -71,6 +71,19 @@ alertmanager:
# Url to Alertmanager server
url: http://localhost:9093
dm:
filter_labels:
matrix: dm
select_label: uuid
room_title: Alerts for {user}
users:
- matrix_id: "@some_other_user1:example.com"
user_id:
- a7b37c33-574c-45ac-bb07-a3b314c2da54
- matrix_id: "@some_other_user2:example.com"
user_id:
- cfb32a1d-737a-4618-8ee9-09b254d98fee
- 27e73f9b-b40a-4d84-b5b5-225931f6c289
cache:
# The path to a directory for caching alerts and silences
path: "data/cache"

View file

@ -1,4 +1,5 @@
import os
import re
import unittest
from typing import Dict
@ -43,6 +44,39 @@ class AlertTestCase(unittest.TestCase):
self.assertEqual("resolved", alert.status)
self.assertFalse(alert.firing)
def test_match_label(self) -> None:
alert = Alert.from_dict(self.alert_dict)
pattern = re.compile(r"^alert\d+$", re.I)
self.assertTrue(alert.match_label("alertname", pattern))
pattern = re.compile("alert2")
self.assertFalse(alert.match_label("alertname", pattern))
pattern = re.compile(r"^.*$", re.I)
self.assertFalse(alert.match_label("inexistent_label", pattern))
def test_match_all_labels(self) -> None:
alert = Alert.from_dict(self.alert_dict)
patterns = {
"alertname": re.compile(r"^alert\d+$", re.I),
"job": re.compile(r"^job\d+$", re.I),
}
self.assertTrue(alert.match_all_labels(patterns))
patterns = {
"alertname": re.compile(r"^alert\d+$", re.I),
"job": re.compile("job2"),
}
self.assertFalse(alert.match_all_labels(patterns))
patterns = {
"alertname": re.compile(r"^alert\d+$", re.I),
"inexistent_label": re.compile(r"^.*$", re.I),
}
self.assertFalse(alert.match_all_labels(patterns))
class AlertRendererTestCase(unittest.TestCase):
def setUp(self) -> None:

View file

@ -194,7 +194,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
alerts = await alertmanager_client.get_alerts()
self.assertEqual(
@ -224,7 +224,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
alerts = await alertmanager_client.get_alerts()
self.assertEqual([], alerts)
@ -237,7 +237,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertmanagerServerError):
await alertmanager_client.get_alerts()
@ -249,7 +249,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silences = await alertmanager_client.get_silences()
self.assertEqual(
@ -278,7 +278,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silences = await alertmanager_client.get_silences()
self.assertEqual([], silences)
@ -291,7 +291,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertmanagerServerError):
await alertmanager_client.get_silences()
@ -303,7 +303,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
alert = await alertmanager_client.get_alert("fingerprint1")
self.assertEqual(
@ -323,7 +323,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertNotFoundError):
await alertmanager_client.get_alert("fingerprint1")
@ -335,7 +335,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertmanagerServerError):
await alertmanager_client.get_alert("fingerprint1")
@ -347,7 +347,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence1 = await alertmanager_client.get_silence("silence1")
silence2 = await alertmanager_client.get_silence("silence2")
@ -378,7 +378,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(SilenceNotFoundError):
await alertmanager_client.get_silence("silence1")
@ -390,7 +390,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertmanagerServerError):
await alertmanager_client.get_silence("silence1")
@ -403,7 +403,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id = await alertmanager_client.create_silence(
"fingerprint1", "user", 86400
)
@ -440,7 +440,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.create_silence("fingerprint1", "user", 86400)
with self.assertRaises(SilenceExtendError):
await alertmanager_client.update_silence("fingerprint1")
@ -457,7 +457,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id1 = await alertmanager_client.create_silence(
"fingerprint1", "user", 86400
)
@ -498,7 +498,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.create_silence("fingerprint1", "user1", 86400)
silence_id2 = await alertmanager_client.update_silence(
"fingerprint1", "user2", 864000
@ -543,7 +543,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
fake_cache = Mock(spec=Cache)
alertmanager_client = AlertmanagerClient("http://localhost", fake_cache)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id1 = await alertmanager_client.create_or_update_silence(
"fingerprint1", "user", 86400
)
@ -563,7 +563,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
fake_update_silence.return_value = "silence1"
alertmanager_client = AlertmanagerClient("http://localhost", fake_cache)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id1 = await alertmanager_client.create_or_update_silence(
"fingerprint1", "user", 86400
)
@ -584,7 +584,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id = await alertmanager_client.create_silence(
"fingerprint1", "user"
)
@ -621,7 +621,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence1_id = await alertmanager_client.create_silence(
"fingerprint1", "user"
)
@ -667,7 +667,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
fake_cache = Mock(spec=Cache)
alertmanager_client = AlertmanagerClient("http://localhost", fake_cache)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id1 = await alertmanager_client.create_or_update_silence(
"fingerprint1", "user"
)
@ -687,7 +687,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
fake_update_silence.return_value = "silence1"
alertmanager_client = AlertmanagerClient("http://localhost", fake_cache)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id1 = await alertmanager_client.create_or_update_silence(
"fingerprint1", "user"
)
@ -707,7 +707,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
silence_id = await alertmanager_client.create_silence(
"fingerprint1", "user", int(timedelta.max.total_seconds()) + 1
)
@ -743,7 +743,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(AlertNotFoundError):
await alertmanager_client.create_silence("fingerprint1", "user")
@ -756,7 +756,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.get_alert("fingerprint1")
with self.assertRaises(AlertmanagerServerError):
@ -770,7 +770,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(SilenceNotFoundError):
await alertmanager_client.update_silence("fingerprint1")
with self.assertRaises(SilenceNotFoundError):
@ -784,7 +784,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(SilenceExtendError):
await alertmanager_client.update_silence("fingerprint1")
@ -796,7 +796,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.get_alert("fingerprint1")
with self.assertRaises(AlertmanagerServerError):
@ -810,7 +810,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.delete_silence("silence1")
silences = await alertmanager_client.get_silences()
@ -834,7 +834,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
with self.assertRaises(SilenceExpiredError):
await alertmanager_client.delete_silence("silence2")
silences = await alertmanager_client.get_silences()
@ -849,7 +849,7 @@ class AlertmanagerClientTestCase(unittest.IsolatedAsyncioTestCase):
alertmanager_client = AlertmanagerClient(
f"http://localhost:{port}", fake_cache
)
async with aiotools.closing_async(alertmanager_client):
async with alertmanager_client:
await alertmanager_client.get_alert("fingerprint1")
with self.assertRaises(AlertmanagerServerError):

View file

@ -12,14 +12,16 @@ import matrix_alertbot.alertmanager
import matrix_alertbot.callback
import matrix_alertbot.command
import matrix_alertbot.matrix
from matrix_alertbot.config import BiDict
class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
# Create a Callbacks object and give it some Mock'd objects to use
self.fake_matrix_client = Mock(spec=nio.AsyncClient)
self.fake_matrix_client.user_id = "@fake_user:example.com"
# self.fake_matrix_client.user = "@fake_user"
self.fake_matrix_client1 = Mock(spec=nio.AsyncClient)
self.fake_matrix_client1.user_id = "@fake_user1:example.com"
self.fake_matrix_client2 = Mock(spec=nio.AsyncClient)
self.fake_matrix_client2.user_id = "@fake_user2:example.com"
self.fake_cache = MagicMock(spec=Cache)
self.fake_alertmanager_client = Mock(
@ -36,15 +38,21 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
self.fake_config.allowed_rooms = [self.fake_room.room_id]
self.fake_config.allowed_reactions = ["🤫", "🤗"]
self.fake_config.insult_reactions = ["🤗"]
self.fake_config.user_ids = [self.fake_matrix_client.user_id]
self.fake_config.user_ids = [
self.fake_matrix_client1.user_id,
self.fake_matrix_client2.user_id,
]
self.fake_config.dm_users = BiDict(
{"a7b37c33-574c-45ac-bb07-a3b314c2da54": "@fake_dm_user:example.com"}
)
self.fake_matrix_client_pool = Mock(
spec=matrix_alertbot.matrix.MatrixClientPool
)
self.fake_matrix_client_pool.matrix_client = self.fake_matrix_client
self.fake_matrix_client_pool.matrix_client = self.fake_matrix_client1
self.callbacks = matrix_alertbot.callback.Callbacks(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_alertmanager_client,
self.fake_cache,
self.fake_config,
@ -61,7 +69,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_matrix_client.join.assert_called_once_with(self.fake_room.room_id)
self.fake_matrix_client1.join.assert_called_once_with(self.fake_room.room_id)
async def test_invite_in_unauthorized_room(self) -> None:
"""Tests the callback for InviteMemberEvents"""
@ -75,7 +83,39 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_matrix_client.join.assert_not_called()
self.fake_matrix_client1.join.assert_not_called()
async def test_invite_from_dm_user(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_invite_event = Mock(spec=nio.InviteMemberEvent)
fake_invite_event.sender = "@fake_dm_user:example.com"
self.fake_room.room_id = "!unauthorizedroom@example.com"
# Pretend that we received an invite event
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_matrix_client1.join.assert_called_once_with(
"!unauthorizedroom@example.com"
)
async def test_invite_from_other_matrix_client(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_invite_event = Mock(spec=nio.InviteMemberEvent)
fake_invite_event.sender = self.fake_matrix_client2.user_id
self.fake_room.room_id = "!unauthorizedroom@example.com"
# Pretend that we received an invite event
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_matrix_client1.join.assert_called_once_with(
"!unauthorizedroom@example.com"
)
async def test_invite_raise_join_error(self) -> None:
"""Tests the callback for InviteMemberEvents"""
@ -85,13 +125,13 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_join_error = Mock(spec=nio.JoinError)
fake_join_error.message = "error message"
self.fake_matrix_client.join.return_value = fake_join_error
self.fake_matrix_client1.join.return_value = fake_join_error
# Pretend that we received an invite event
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_matrix_client.join.assert_has_calls(
self.fake_matrix_client1.join.assert_has_calls(
[
call("!abcdefg:example.com"),
call("!abcdefg:example.com"),
@ -121,7 +161,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user help"
fake_message_event.body = "@fake_user1 help"
fake_message_event.source = {"content": {}}
self.fake_matrix_client_pool.matrix_client = None
@ -141,7 +181,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user help"
fake_message_event.body = "@fake_user1 help"
fake_message_event.source = {"content": {}}
# Pretend that we received a text message event
@ -149,7 +189,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -168,7 +208,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user help"
fake_message_event.body = "@fake_user1 help"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -180,7 +220,42 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
self.fake_room,
fake_message_event.sender,
fake_message_event.event_id,
(),
)
fake_command.return_value.process.assert_called_once()
@patch.object(matrix_alertbot.command, "HelpCommand", autospec=True)
async def test_message_help_in_reply_with_mention_sent_by_dm_user(
self, fake_command: Mock
) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
# Tests that the bot process messages in the room that contain a command
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@fake_dm_user:example.com"
fake_message_event.body = "@fake_user1 help"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
}
}
self.fake_room.room_id = "!unauthorizedroom@example.com"
# Pretend that we received a text message event
await self.callbacks.message(self.fake_room, fake_message_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -197,7 +272,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Tests that the bot process messages in the room that contain a command
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = self.fake_matrix_client.user_id
fake_message_event.sender = self.fake_matrix_client1.user_id
# Pretend that we received a text message event
await self.callbacks.message(self.fake_room, fake_message_event)
@ -232,7 +307,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user ack"
fake_message_event.body = "@fake_user1 ack"
fake_message_event.source = {"content": {}}
# Pretend that we received a text message event
@ -250,7 +325,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user:example.com ack"
fake_message_event.body = "@fake_user1:example.com ack"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -262,7 +337,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -283,7 +358,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "fake_user ack"
fake_message_event.body = "fake_user1 ack"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -295,7 +370,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -316,7 +391,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user:example.com @fake_user:example.com: ack"
fake_message_event.body = "@fake_user1:example.com @fake_user2:example.com: ack"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -328,7 +403,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -349,7 +424,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user unack"
fake_message_event.body = "@fake_user1 unack"
fake_message_event.source = {"content": {}}
# Pretend that we received a text message event
@ -367,7 +442,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user unack"
fake_message_event.body = "@fake_user1 unack"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -379,7 +454,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -401,7 +476,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.event_id = "some event id"
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "@fake_user ack"
fake_message_event.body = "@fake_user1 ack"
fake_message_event.source = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some alert event id"}}
@ -417,7 +492,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that the command was not executed
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -437,7 +512,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Tests that the bot process messages in the room that contain a command
fake_alert_event = Mock(spec=nio.RoomMessageText)
fake_alert_event.event_id = "some alert event id"
fake_alert_event.sender = self.fake_matrix_client.user_id
fake_alert_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.event_id = "some event id"
@ -447,7 +522,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
self.fake_matrix_client_pool.matrix_client = None
@ -466,7 +541,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Tests that the bot process messages in the room that contain a command
fake_alert_event = Mock(spec=nio.RoomMessageText)
fake_alert_event.event_id = "some alert event id"
fake_alert_event.sender = self.fake_matrix_client.user_id
fake_alert_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.event_id = "some event id"
@ -476,14 +551,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
# Pretend that we received a text message event
await self.callbacks.reaction(self.fake_room, fake_reaction_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -493,7 +568,51 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
"some alert event id",
)
fake_command.return_value.process.assert_called_once()
self.fake_matrix_client.room_get_event.assert_called_once_with(
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event.event_id
)
fake_angry_user_command.assert_not_called()
@patch.object(matrix_alertbot.callback, "AngryUserCommand", autospec=True)
@patch.object(matrix_alertbot.callback, "AckAlertCommand", autospec=True)
async def test_reaction_from_dm_user(
self, fake_command: Mock, fake_angry_user_command
) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
# Tests that the bot process messages in the room that contain a command
fake_alert_event = Mock(spec=nio.RoomMessageText)
fake_alert_event.event_id = "some alert event id"
fake_alert_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.event_id = "some event id"
fake_reaction_event.sender = "@fake_dm_user:example.com"
fake_reaction_event.reacts_to = fake_alert_event.event_id
fake_reaction_event.key = "🤫"
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
self.fake_room.room_id = "!unauthorizedroom@example.com"
# Pretend that we received a text message event
await self.callbacks.reaction(self.fake_room, fake_reaction_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
self.fake_room,
fake_reaction_event.sender,
fake_reaction_event.event_id,
"some alert event id",
)
fake_command.return_value.process.assert_called_once()
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event.event_id
)
@ -508,7 +627,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Tests that the bot process messages in the room that contain a command
fake_alert_event = Mock(spec=nio.RoomMessageText)
fake_alert_event.event_id = "some alert event id"
fake_alert_event.sender = self.fake_matrix_client.user_id
fake_alert_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.event_id = "some event id"
@ -518,14 +637,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
# Pretend that we received a text message event
await self.callbacks.reaction(self.fake_room, fake_reaction_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -535,12 +654,12 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
"some alert event id",
)
fake_command.return_value.process.assert_called_once()
self.fake_matrix_client.room_get_event.assert_called_once_with(
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event.event_id
)
fake_angry_user_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -564,7 +683,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_reaction_event.key = "🤫"
fake_event_response = Mock(spec=nio.RoomGetEventError)
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
# Pretend that we received a text message event
await self.callbacks.reaction(self.fake_room, fake_reaction_event)
@ -572,7 +691,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_not_called()
self.fake_cache.set.assert_not_called()
self.fake_matrix_client.room_get_event.assert_called_once_with(
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event_id
)
@ -595,7 +714,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
# Pretend that we received a text message event
await self.callbacks.reaction(self.fake_room, fake_reaction_event)
@ -603,7 +722,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_not_called()
self.fake_cache.set.assert_not_called()
self.fake_matrix_client.room_get_event.assert_called_once_with(
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event.event_id
)
@ -616,7 +735,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Tests that the bot process messages in the room that contain a command
fake_alert_event = Mock(spec=nio.RoomMessageText)
fake_alert_event.event_id = "some alert event id"
fake_alert_event.sender = self.fake_matrix_client.user_id
fake_alert_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.event_id = "some event id"
@ -626,7 +745,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_event_response = Mock(spec=nio.RoomGetEventResponse)
fake_event_response.event = fake_alert_event
self.fake_matrix_client.room_get_event.return_value = fake_event_response
self.fake_matrix_client1.room_get_event.return_value = fake_event_response
fake_command.return_value.process.side_effect = (
nio.exceptions.LocalProtocolError
@ -637,7 +756,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -647,7 +766,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
"some alert event id",
)
fake_command.return_value.process.assert_called_once()
self.fake_matrix_client.room_get_event.assert_called_once_with(
self.fake_matrix_client1.room_get_event.assert_called_once_with(
self.fake_room.room_id, fake_alert_event.event_id
)
@ -671,7 +790,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_not_called()
self.fake_matrix_client.room_get_event.assert_not_called()
self.fake_matrix_client1.room_get_event.assert_not_called()
@patch.object(matrix_alertbot.callback, "AckAlertCommand", autospec=True)
async def test_ignore_reaction_sent_by_bot_user(self, fake_command: Mock) -> None:
@ -682,7 +801,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_reaction_event = Mock(spec=nio.ReactionEvent)
fake_reaction_event.type = "m.reaction"
fake_reaction_event.event_id = "some event id"
fake_reaction_event.sender = self.fake_matrix_client.user_id
fake_reaction_event.sender = self.fake_matrix_client1.user_id
fake_reaction_event.reacts_to = fake_alert_event_id
fake_reaction_event.key = "unknown"
@ -691,7 +810,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_not_called()
self.fake_matrix_client.room_get_event.assert_not_called()
self.fake_matrix_client1.room_get_event.assert_not_called()
@patch.object(matrix_alertbot.callback, "AckAlertCommand", autospec=True)
async def test_ignore_reaction_in_unauthorized_room(
@ -715,7 +834,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_not_called()
self.fake_matrix_client.room_get_event.assert_not_called()
self.fake_matrix_client1.room_get_event.assert_not_called()
@patch.object(matrix_alertbot.callback, "UnackAlertCommand", autospec=True)
async def test_redaction_client_not_in_pool(self, fake_command: Mock) -> None:
@ -758,7 +877,39 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
self.fake_room,
fake_redaction_event.sender,
fake_redaction_event.event_id,
fake_redaction_event.redacts,
)
fake_command.return_value.process.assert_called_once()
@patch.object(matrix_alertbot.callback, "UnackAlertCommand", autospec=True)
async def test_redaction_by_dm_user(self, fake_command: Mock) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
# Tests that the bot process messages in the room that contain a command
fake_alert_event_id = "some alert event id"
fake_redaction_event = Mock(spec=nio.RedactionEvent)
fake_redaction_event.redacts = "some other event id"
fake_redaction_event.event_id = "some event id"
fake_redaction_event.sender = "@fake_dm_user:example.com"
fake_cache_dict = {fake_redaction_event.redacts: fake_alert_event_id}
self.fake_cache.__getitem__.side_effect = fake_cache_dict.__getitem__
self.fake_room.room_id = "!unauthorizedroom@example.com"
# Pretend that we received a text message event
await self.callbacks.redaction(self.fake_room, fake_redaction_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -795,7 +946,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_matrix_client,
self.fake_matrix_client1,
self.fake_cache,
self.fake_alertmanager_client,
self.fake_config,
@ -813,7 +964,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
"""Tests the callback for RoomMessageText with a mention of the bot"""
# Tests that the bot process messages in the room that contain a command
fake_redaction_event = Mock(spec=nio.RedactionEvent)
fake_redaction_event.sender = self.fake_matrix_client.user_id
fake_redaction_event.sender = self.fake_matrix_client1.user_id
fake_cache_dict: Dict = {}
self.fake_cache.__getitem__.side_effect = fake_cache_dict.__getitem__
@ -859,16 +1010,16 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_sas = Mock()
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_start(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.accept_key_verification.assert_called_once_with(
self.fake_matrix_client1.accept_key_verification.assert_called_once_with(
fake_transaction_id
)
self.fake_matrix_client.to_device.assert_called_once_with(fake_sas.share_key())
self.fake_matrix_client1.to_device.assert_called_once_with(fake_sas.share_key())
async def test_key_verification_start_with_emoji_not_supported(self) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
@ -883,14 +1034,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_sas = Mock()
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_start(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.accept_key_verification.assert_not_called()
self.fake_matrix_client.to_device.assert_not_called()
self.fake_matrix_client1.accept_key_verification.assert_not_called()
self.fake_matrix_client1.to_device.assert_not_called()
async def test_key_verification_start_with_accept_key_verification_error(
self,
@ -905,22 +1056,22 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_key_verification_event.short_authentication_string = ["emoji"]
fake_key_verification_event.transaction_id = fake_transaction_id
self.fake_matrix_client.accept_key_verification.return_value = Mock(
self.fake_matrix_client1.accept_key_verification.return_value = Mock(
spec=nio.ToDeviceError
)
fake_sas = Mock()
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_start(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.accept_key_verification.assert_called_once_with(
self.fake_matrix_client1.accept_key_verification.assert_called_once_with(
fake_transaction_id
)
self.fake_matrix_client.to_device.assert_not_called()
self.fake_matrix_client1.to_device.assert_not_called()
async def test_key_verification_start_with_to_device_error(
self,
@ -935,20 +1086,20 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_key_verification_event.short_authentication_string = ["emoji"]
fake_key_verification_event.transaction_id = fake_transaction_id
self.fake_matrix_client.to_device.return_value = Mock(spec=nio.ToDeviceError)
self.fake_matrix_client1.to_device.return_value = Mock(spec=nio.ToDeviceError)
fake_sas = Mock()
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_start(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.accept_key_verification.assert_called_once_with(
self.fake_matrix_client1.accept_key_verification.assert_called_once_with(
fake_transaction_id
)
self.fake_matrix_client.to_device.assert_called_once_with(fake_sas.share_key())
self.fake_matrix_client1.to_device.assert_called_once_with(fake_sas.share_key())
async def test_key_verification_cancel(self) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
@ -977,13 +1128,13 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
("emoji2", "alt text2"),
]
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_confirm(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.confirm_short_auth_string.assert_called_once_with(
self.fake_matrix_client1.confirm_short_auth_string.assert_called_once_with(
fake_transaction_id
)
@ -996,7 +1147,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_key_verification_event.sender = "@some_other_fake_user:example.com"
fake_key_verification_event.transaction_id = fake_transaction_id
self.fake_matrix_client.confirm_short_auth_string.return_value = Mock(
self.fake_matrix_client1.confirm_short_auth_string.return_value = Mock(
spec=nio.ToDeviceError
)
@ -1006,13 +1157,13 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
("emoji2", "alt text2"),
]
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_confirm(fake_key_verification_event)
# Check that we attempted to execute the command
self.fake_matrix_client.confirm_short_auth_string.assert_called_once_with(
self.fake_matrix_client1.confirm_short_auth_string.assert_called_once_with(
fake_transaction_id
)
@ -1028,14 +1179,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_sas = Mock()
fake_sas.verified_devices = ["HGFEDCBA"]
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_end(fake_key_verification_event)
# Check that we attempted to execute the command
fake_sas.get_mac.assert_called_once_with()
self.fake_matrix_client.to_device.assert_called_once_with(fake_sas.get_mac())
self.fake_matrix_client1.to_device.assert_called_once_with(fake_sas.get_mac())
async def test_key_verification_end_with_missing_transaction_id(self) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
@ -1048,14 +1199,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_sas = Mock()
fake_transactions_dict = {}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_end(fake_key_verification_event)
# Check that we attempted to execute the command
fake_sas.get_mac.assert_not_called()
self.fake_matrix_client.to_device.assert_not_called()
self.fake_matrix_client1.to_device.assert_not_called()
async def test_key_verification_end_with_mac_error(self) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
@ -1069,14 +1220,14 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_sas = Mock()
fake_sas.get_mac.side_effect = nio.exceptions.LocalProtocolError
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_end(fake_key_verification_event)
# Check that we attempted to execute the command
fake_sas.get_mac.assert_called_once_with()
self.fake_matrix_client.to_device.assert_not_called()
self.fake_matrix_client1.to_device.assert_not_called()
async def test_key_verification_end_with_to_device_error(self) -> None:
"""Tests the callback for RoomMessageText with a mention of the bot"""
@ -1087,18 +1238,18 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_key_verification_event.sender = "@some_other_fake_user:example.com"
fake_key_verification_event.transaction_id = fake_transaction_id
self.fake_matrix_client.to_device.return_value = Mock(spec=nio.ToDeviceError)
self.fake_matrix_client1.to_device.return_value = Mock(spec=nio.ToDeviceError)
fake_sas = Mock()
fake_transactions_dict = {fake_transaction_id: fake_sas}
self.fake_matrix_client.key_verifications = fake_transactions_dict
self.fake_matrix_client1.key_verifications = fake_transactions_dict
# Pretend that we received a text message event
await self.callbacks.key_verification_end(fake_key_verification_event)
# Check that we attempted to execute the command
fake_sas.get_mac.assert_called_once_with()
self.fake_matrix_client.to_device.assert_called_once_with(fake_sas.get_mac())
self.fake_matrix_client1.to_device.assert_called_once_with(fake_sas.get_mac())
@patch.object(matrix_alertbot.callback, "logger", autospec=True)
async def test_decryption_failure(self, fake_logger) -> None:
@ -1110,6 +1261,18 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
fake_logger.error.assert_called_once()
@patch.object(matrix_alertbot.callback, "logger", autospec=True)
async def test_decryption_failure_from_dm_user(self, fake_logger) -> None:
fake_megolm_event = Mock(spec=nio.MegolmEvent)
fake_megolm_event.sender = "@fake_dm_user:example.com"
fake_megolm_event.event_id = "some event id"
self.fake_room.room_id = "!unauthorizedroom@example.com"
await self.callbacks.decryption_failure(self.fake_room, fake_megolm_event)
fake_logger.error.assert_called_once()
@patch.object(matrix_alertbot.callback, "logger", autospec=True)
async def test_decryption_failure_in_unauthorized_room(self, fake_logger) -> None:
fake_megolm_event = Mock(spec=nio.MegolmEvent)
@ -1134,7 +1297,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
await self.callbacks.unknown_message(self.fake_room, fake_room_unknown_event)
self.fake_matrix_client.room_send.assert_called_once_with(
self.fake_matrix_client1.room_send.assert_called_once_with(
self.fake_room.room_id,
"m.room.message",
{
@ -1159,7 +1322,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
await self.callbacks.unknown_message(self.fake_room, fake_room_unknown_event)
self.fake_matrix_client.room_send.assert_not_called()
self.fake_matrix_client1.room_send.assert_not_called()
async def test_unknown_message_with_method_not_sas_v1(self) -> None:
fake_room_unknown_event = Mock(spec=nio.RoomMessageUnknown)
@ -1173,7 +1336,7 @@ class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
await self.callbacks.unknown_message(self.fake_room, fake_room_unknown_event)
self.fake_matrix_client.room_send.assert_not_called()
self.fake_matrix_client1.room_send.assert_not_called()
if __name__ == "__main__":

View file

@ -1,4 +1,5 @@
import os
import re
import sys
import unittest
from datetime import timedelta
@ -7,7 +8,7 @@ from unittest.mock import Mock, patch
import yaml
import matrix_alertbot.config
from matrix_alertbot.config import DEFAULT_REACTIONS, Config
from matrix_alertbot.config import DEFAULT_REACTIONS, BiDict, Config
from matrix_alertbot.errors import (
InvalidConfigError,
ParseConfigError,
@ -36,6 +37,72 @@ def mock_path_exists(path: str) -> bool:
return True
class BiDictTestCase(unittest.TestCase):
def test_init_bidict(self) -> None:
data = {"room1": "user1", "room2": "user1", "room3": "user2"}
bidict = BiDict(data)
self.assertDictEqual(data, bidict)
self.assertDictEqual(
{"user1": {"room1", "room2"}, "user2": {"room3"}}, bidict.inverse
)
def test_del_item_bidict(self) -> None:
data = {"room1": "user1", "room2": "user1", "room3": "user2"}
bidict = BiDict(data)
del bidict["room1"]
self.assertDictEqual({"room2": "user1", "room3": "user2"}, bidict)
self.assertDictEqual({"user1": {"room2"}, "user2": {"room3"}}, bidict.inverse)
del bidict["room3"]
self.assertDictEqual({"room2": "user1"}, bidict)
self.assertDictEqual(
{
"user1": {"room2"},
},
bidict.inverse,
)
del bidict["room2"]
self.assertDictEqual({}, bidict)
self.assertDictEqual({}, bidict.inverse)
with self.assertRaises(KeyError):
del bidict["room4"]
def test_add_item_bidict(self) -> None:
data = {"room1": "user1", "room2": "user1", "room3": "user2"}
bidict = BiDict(data)
bidict["room4"] = "user2"
self.assertDictEqual(
{"room1": "user1", "room2": "user1", "room3": "user2", "room4": "user2"},
bidict,
)
self.assertDictEqual(
{"user1": {"room1", "room2"}, "user2": {"room3", "room4"}}, bidict.inverse
)
bidict["room4"] = "user1"
self.assertDictEqual(
{"room1": "user1", "room2": "user1", "room3": "user2", "room4": "user1"},
bidict,
)
self.assertDictEqual(
{"user1": {"room1", "room2", "room4"}, "user2": {"room3"}}, bidict.inverse
)
bidict["room3"] = "user1"
self.assertDictEqual(
{"room1": "user1", "room2": "user1", "room3": "user1", "room4": "user1"},
bidict,
)
self.assertDictEqual(
{"user1": {"room1", "room2", "room3", "room4"}, "user2": set()},
bidict.inverse,
)
class ConfigTestCase(unittest.TestCase):
@patch("os.path.isdir")
@patch("os.path.exists")
@ -141,6 +208,32 @@ class ConfigTestCase(unittest.TestCase):
self.assertEqual({"🤫", "😶", "🤐", "🤗"}, config.allowed_reactions)
self.assertEqual({"🤗"}, config.insult_reactions)
self.assertDictEqual({"matrix": re.compile("dm")}, config.dm_filter_labels)
self.assertEqual("uuid", config.dm_select_label)
self.assertEqual(
"Alerts for FakeUser", config.dm_room_title.format(user="FakeUser")
)
self.assertDictEqual(
{
"a7b37c33-574c-45ac-bb07-a3b314c2da54": "@some_other_user1:example.com",
"cfb32a1d-737a-4618-8ee9-09b254d98fee": "@some_other_user2:example.com",
"27e73f9b-b40a-4d84-b5b5-225931f6c289": "@some_other_user2:example.com",
},
config.dm_users,
)
self.assertDictEqual(
{
"@some_other_user1:example.com": {
"a7b37c33-574c-45ac-bb07-a3b314c2da54"
},
"@some_other_user2:example.com": {
"cfb32a1d-737a-4618-8ee9-09b254d98fee",
"27e73f9b-b40a-4d84-b5b5-225931f6c289",
},
},
config.dm_users.inverse,
)
self.assertIsNone(config.address)
self.assertIsNone(config.port)
self.assertEqual("matrix-alertbot.socket", config.socket)

View file

@ -2,15 +2,24 @@ from __future__ import annotations
import random
import unittest
from unittest.mock import Mock, call, patch
from unittest.mock import AsyncMock, Mock, call, patch
import nio
from diskcache import Cache
from nio.api import RoomPreset, RoomVisibility
from nio.responses import (
JoinedMembersError,
JoinedMembersResponse,
ProfileGetDisplayNameError,
ProfileGetDisplayNameResponse,
RoomCreateError,
RoomCreateResponse,
)
import matrix_alertbot
import matrix_alertbot.matrix
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.config import AccountConfig, Config
from matrix_alertbot.config import AccountConfig, BiDict, Config
from matrix_alertbot.matrix import MatrixClientPool
@ -26,6 +35,30 @@ def mock_create_matrix_client(
return fake_matrix_client
def mock_joined_members(room_id: str) -> JoinedMembersResponse | JoinedMembersError:
fake_joined_members_response = Mock(spec=JoinedMembersResponse)
if "dmroom" in room_id:
fake_joined_members_response.members = [
Mock(spec=nio.RoomMember, user_id="@fake_dm_user:example.com"),
Mock(spec=nio.RoomMember, user_id="@fake_user:matrix.example.com"),
Mock(spec=nio.RoomMember, user_id="@other_user:chat.example.com"),
]
elif "!missing_other_user:example.com" == room_id:
fake_joined_members_response.members = [
Mock(spec=nio.RoomMember, user_id="@fake_dm_user:example.com"),
Mock(spec=nio.RoomMember, user_id="@fake_user:matrix.example.com"),
]
elif "!missing_dm_user:example.com" == room_id:
fake_joined_members_response.members = [
Mock(spec=nio.RoomMember, user_id="@fake_user:matrix.example.com"),
Mock(spec=nio.RoomMember, user_id="@other_user:chat.example.com"),
]
else:
fake_joined_members_response = Mock(spec=JoinedMembersError)
return fake_joined_members_response
class FakeAsyncClientConfig:
def __init__(
self,
@ -66,6 +99,14 @@ class MatrixClientPoolTestCase(unittest.IsolatedAsyncioTestCase):
self.fake_account_config_1,
self.fake_account_config_2,
]
self.fake_config.allowed_rooms = "!abcdefg:example.com"
self.fake_config.dm_users = BiDict(
{
"a7b37c33-574c-45ac-bb07-a3b314c2da54": "@fake_dm_user:example.com",
"cfb32a1d-737a-4618-8ee9-09b254d98fee": "@other_dm_user:example.com",
}
)
self.fake_config.dm_room_title = "Alerts for {user}"
@patch.object(
matrix_alertbot.matrix.MatrixClientPool, "_create_matrix_client", autospec=True
@ -104,6 +145,24 @@ class MatrixClientPoolTestCase(unittest.IsolatedAsyncioTestCase):
self.assertEqual(2, len(matrix_client_pool._accounts))
self.assertEqual(2, len(matrix_client_pool._matrix_clients))
@patch.object(
matrix_alertbot.matrix.MatrixClientPool, "_create_matrix_client", autospec=True
)
async def test_unactive_user_ids(self, fake_create_matrix_client) -> None:
fake_matrix_client = Mock(spec=nio.AsyncClient)
fake_create_matrix_client.return_value = fake_matrix_client
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
unactive_user_ids = matrix_client_pool.unactive_user_ids()
self.assertEqual(self.fake_account_config_1, matrix_client_pool.account)
self.assertListEqual([self.fake_account_config_2.id], unactive_user_ids)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool, "_create_matrix_client", autospec=True
)
@ -296,6 +355,335 @@ class MatrixClientPoolTestCase(unittest.IsolatedAsyncioTestCase):
self.assertTrue(matrix_client_1.config.store_sync_tokens)
self.assertFalse(matrix_client_1.config.encryption_enabled)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_find_existing_dm_rooms(self, fake_create_matrix_client) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.rooms = [
"!abcdefg:example.com",
"!fake_dmroom:example.com",
"!missing_other_user:example.com",
"!missing_dm_user:example.com",
"!error:example.com",
]
fake_matrix_client.joined_members.side_effect = mock_joined_members
dm_rooms = await matrix_client_pool.find_existing_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.joined_members.assert_has_calls(
[
call("!fake_dmroom:example.com"),
call("!missing_other_user:example.com"),
call("!missing_dm_user:example.com"),
call("!error:example.com"),
]
)
self.assertDictEqual(
{"@fake_dm_user:example.com": "!fake_dmroom:example.com"}, dm_rooms
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_find_existing_dm_rooms_with_duplicates(
self, fake_create_matrix_client
) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.rooms = [
"!abcdefg:example.com",
"!fake_dmroom:example.com",
"!other_dmroom:example.com",
]
fake_matrix_client.joined_members.side_effect = mock_joined_members
dm_rooms = await matrix_client_pool.find_existing_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.joined_members.assert_has_calls(
[
call("!fake_dmroom:example.com"),
call("!other_dmroom:example.com"),
]
)
self.assertDictEqual(
{"@fake_dm_user:example.com": "!fake_dmroom:example.com"}, dm_rooms
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_create_dm_rooms(self, fake_create_matrix_client) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_find_existing_dm_rooms = AsyncMock(autospec=True)
fake_find_existing_dm_rooms.return_value = {
"@other_dm_user:example.com": "!other_dmroom:example.com"
}
matrix_client_pool.find_existing_dm_rooms = fake_find_existing_dm_rooms
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.get_displayname.return_value = Mock(
spec=ProfileGetDisplayNameResponse, displayname="FakeUser"
)
fake_room_create_response = Mock(spec=RoomCreateResponse)
fake_room_create_response.room_id = "!fake_dmroom:example.com"
fake_matrix_client.room_create.return_value = fake_room_create_response
await matrix_client_pool.create_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.get_displayname.assert_called_once_with(
"@fake_dm_user:example.com"
)
fake_matrix_client.room_create.assert_called_once_with(
visibility=RoomVisibility.private,
name="Alerts for FakeUser",
invite=["@other_user:chat.example.com", "@fake_dm_user:example.com"],
is_direct=True,
preset=RoomPreset.private_chat,
)
self.assertDictEqual(
{
"@fake_dm_user:example.com": "!fake_dmroom:example.com",
"@other_dm_user:example.com": "!other_dmroom:example.com",
},
matrix_client_pool.dm_rooms,
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_create_dm_rooms_with_empty_room_id(
self, fake_create_matrix_client
) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_find_existing_dm_rooms = AsyncMock(autospec=True)
fake_find_existing_dm_rooms.return_value = {
"@other_dm_user:example.com": "!other_dmroom:example.com"
}
matrix_client_pool.find_existing_dm_rooms = fake_find_existing_dm_rooms
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.get_displayname.return_value = Mock(
spec=ProfileGetDisplayNameResponse, displayname="FakeUser"
)
fake_room_create_response = Mock(spec=RoomCreateResponse)
fake_room_create_response.room_id = None
fake_matrix_client.room_create.return_value = fake_room_create_response
await matrix_client_pool.create_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.get_displayname.assert_called_once_with(
"@fake_dm_user:example.com"
)
fake_matrix_client.room_create.assert_called_once_with(
visibility=RoomVisibility.private,
name="Alerts for FakeUser",
invite=["@other_user:chat.example.com", "@fake_dm_user:example.com"],
is_direct=True,
preset=RoomPreset.private_chat,
)
self.assertDictEqual(
{
"@other_dm_user:example.com": "!other_dmroom:example.com",
},
matrix_client_pool.dm_rooms,
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_create_dm_rooms_with_empty_room_title(
self, fake_create_matrix_client
) -> None:
self.fake_config.dm_room_title = None
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_find_existing_dm_rooms = AsyncMock(autospec=True)
fake_find_existing_dm_rooms.return_value = {
"@other_dm_user:example.com": "!other_dmroom:example.com"
}
matrix_client_pool.find_existing_dm_rooms = fake_find_existing_dm_rooms
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.get_displayname.return_value = Mock(
spec=ProfileGetDisplayNameResponse, displayname="FakeUser"
)
fake_room_create_response = Mock(spec=RoomCreateResponse)
fake_room_create_response.room_id = "!fake_dmroom:example.com"
fake_matrix_client.room_create.return_value = fake_room_create_response
await matrix_client_pool.create_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.get_displayname.assert_called_once_with(
"@fake_dm_user:example.com"
)
fake_matrix_client.room_create.assert_called_once_with(
visibility=RoomVisibility.private,
name=None,
invite=["@other_user:chat.example.com", "@fake_dm_user:example.com"],
is_direct=True,
preset=RoomPreset.private_chat,
)
self.assertDictEqual(
{
"@fake_dm_user:example.com": "!fake_dmroom:example.com",
"@other_dm_user:example.com": "!other_dmroom:example.com",
},
matrix_client_pool.dm_rooms,
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_create_dm_rooms_with_error(self, fake_create_matrix_client) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_find_existing_dm_rooms = AsyncMock(autospec=True)
fake_find_existing_dm_rooms.return_value = {
"@other_dm_user:example.com": "!other_dmroom:example.com"
}
matrix_client_pool.find_existing_dm_rooms = fake_find_existing_dm_rooms
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.get_displayname.return_value = Mock(
spec=ProfileGetDisplayNameResponse, displayname="FakeUser"
)
fake_room_create_response = Mock(spec=RoomCreateError)
fake_room_create_response.message = "error"
fake_matrix_client.room_create.return_value = fake_room_create_response
await matrix_client_pool.create_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.get_displayname.assert_called_once_with(
"@fake_dm_user:example.com"
)
fake_matrix_client.room_create.assert_called_once_with(
visibility=RoomVisibility.private,
name="Alerts for FakeUser",
invite=["@other_user:chat.example.com", "@fake_dm_user:example.com"],
is_direct=True,
preset=RoomPreset.private_chat,
)
self.assertDictEqual(
{
"@other_dm_user:example.com": "!other_dmroom:example.com",
},
matrix_client_pool.dm_rooms,
)
@patch.object(
matrix_alertbot.matrix.MatrixClientPool,
"_create_matrix_client",
autospec=True,
side_effect=mock_create_matrix_client,
)
async def test_create_dm_rooms_with_display_name_error(
self, fake_create_matrix_client
) -> None:
matrix_client_pool = MatrixClientPool(
alertmanager_client=self.fake_alertmanager_client,
cache=self.fake_cache,
config=self.fake_config,
)
fake_find_existing_dm_rooms = AsyncMock(autospec=True)
fake_find_existing_dm_rooms.return_value = {
"@other_dm_user:example.com": "!other_dmroom:example.com"
}
matrix_client_pool.find_existing_dm_rooms = fake_find_existing_dm_rooms
fake_matrix_client = matrix_client_pool.matrix_client
fake_matrix_client.get_displayname.return_value = Mock(
spec=ProfileGetDisplayNameError, message="error"
)
fake_room_create_response = Mock(spec=RoomCreateResponse)
fake_room_create_response.room_id = None
fake_matrix_client.room_create.return_value = fake_room_create_response
await matrix_client_pool.create_dm_rooms(
account=matrix_client_pool.account,
matrix_client=matrix_client_pool.matrix_client,
config=self.fake_config,
)
fake_matrix_client.get_displayname.assert_called_once_with(
"@fake_dm_user:example.com"
)
fake_matrix_client.room_create.assert_not_called()
self.assertDictEqual(
{
"@other_dm_user:example.com": "!other_dmroom:example.com",
},
matrix_client_pool.dm_rooms,
)
if __name__ == "__main__":
unittest.main()

View file

@ -1,3 +1,4 @@
import re
import unittest
from typing import Dict
from unittest.mock import Mock, call, patch
@ -10,7 +11,7 @@ from diskcache import Cache
import matrix_alertbot.webhook
from matrix_alertbot.alert import Alert, AlertRenderer
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.config import Config
from matrix_alertbot.config import BiDict, Config
from matrix_alertbot.errors import (
AlertmanagerError,
MatrixClientError,
@ -38,6 +39,9 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
self.fake_matrix_client = Mock(spec=nio.AsyncClient)
self.fake_matrix_client_pool = Mock(spec=MatrixClientPool)
self.fake_matrix_client_pool.matrix_client = self.fake_matrix_client
self.fake_matrix_client_pool.dm_rooms = {
"@fake_dm_user:example.com": "!dmroom:example.com"
}
self.fake_alertmanager_client = Mock(spec=AlertmanagerClient)
self.fake_alert_renderer = Mock(spec=AlertRenderer)
self.fake_cache = Mock(spec=Cache)
@ -51,6 +55,11 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
self.fake_config.allowed_rooms = [self.fake_room_id]
self.fake_config.cache_expire_time = 0
self.fake_config.template_dir = None
self.fake_config.dm_select_label = "uuid"
self.fake_config.dm_filter_labels = {"matrix-alertbot": re.compile("dm")}
self.fake_config.dm_users = BiDict(
{"a7b37c33-574c-45ac-bb07-a3b314c2da54": "@fake_dm_user:example.com"}
)
self.fake_request = Mock(spec=web_request.Request)
self.fake_request.app = {
@ -89,6 +98,20 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
self.fake_alert_2,
]
}
self.fake_dm_alert = {
"fingerprint": "fingerprint",
"generatorURL": "http://example.com/alert",
"status": "firing",
"labels": {
"alertname": "alert",
"severity": "warning",
"job": "job",
"uuid": "a7b37c33-574c-45ac-bb07-a3b314c2da54",
"matrix-alertbot": "dm",
},
"annotations": {"description": "some description"},
}
self.fake_dm_alerts = {"alerts": [self.fake_dm_alert]}
webhook = Webhook(
self.fake_matrix_client_pool,
@ -261,6 +284,85 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
self.fake_cache.set.assert_not_called()
self.fake_cache.delete.assert_not_called()
@patch.object(matrix_alertbot.webhook, "send_text_to_room")
async def test_post_alerts_for_dm_user(self, fake_send_text_to_room: Mock) -> None:
self.fake_alertmanager_client.update_silence.side_effect = (
update_silence_raise_silence_not_found
)
data = self.fake_dm_alerts
async with self.client.request(
"POST", f"/alerts/{self.fake_room_id}", json=data
) as response:
self.assertEqual(200, response.status)
self.fake_alertmanager_client.update_silence.assert_called_once_with(
"fingerprint"
)
self.assertEqual(1, fake_send_text_to_room.call_count)
fake_send_text_to_room.assert_has_calls(
[
call(
self.fake_matrix_client,
self.fake_matrix_client_pool.dm_rooms["@fake_dm_user:example.com"],
"[⚠️ WARNING] alert: some description",
'<font color="#ffc107">\n <b>[⚠️ WARNING]</b>\n</font> '
'<a href="http://example.com/alert">alert</a>\n (job)<br/>\n'
"some description",
notice=False,
),
]
)
self.fake_cache.set.assert_called_once_with(
fake_send_text_to_room.return_value.event_id,
"fingerprint",
expire=self.fake_config.cache_expire_time,
)
self.assertEqual(1, self.fake_cache.delete.call_count)
self.fake_cache.delete.assert_has_calls([call("fingerprint")])
@patch.object(matrix_alertbot.webhook, "send_text_to_room")
async def test_post_alerts_for_unknown_dm_user(
self, fake_send_text_to_room: Mock
) -> None:
self.fake_alertmanager_client.update_silence.side_effect = (
update_silence_raise_silence_not_found
)
self.fake_config.dm_users = BiDict()
data = self.fake_dm_alerts
async with self.client.request(
"POST", f"/alerts/{self.fake_room_id}", json=data
) as response:
self.assertEqual(200, response.status)
self.fake_alertmanager_client.update_silence.assert_not_called()
fake_send_text_to_room.assert_not_called()
self.fake_cache.set.assert_not_called()
self.fake_cache.delete.assert_not_called()
@patch.object(matrix_alertbot.webhook, "send_text_to_room")
async def test_post_alerts_for_dm_user_with_unknown_room(
self, fake_send_text_to_room: Mock
) -> None:
self.fake_alertmanager_client.update_silence.side_effect = (
update_silence_raise_silence_not_found
)
self.fake_matrix_client_pool.dm_rooms = {}
data = self.fake_dm_alerts
async with self.client.request(
"POST", f"/alerts/{self.fake_room_id}", json=data
) as response:
self.assertEqual(200, response.status)
self.fake_alertmanager_client.update_silence.assert_not_called()
fake_send_text_to_room.assert_not_called()
self.fake_cache.set.assert_not_called()
self.fake_cache.delete.assert_not_called()
@patch.object(matrix_alertbot.webhook, "send_text_to_room")
async def test_post_alerts_with_empty_data(
self, fake_send_text_to_room: Mock
@ -418,6 +520,7 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
fake_alert = Mock(spec=Alert)
fake_alert.firing = True
fake_alert.fingerprint = "fingerprint"
fake_alert.labels = []
await create_alert(fake_alert, self.fake_room_id, self.fake_request)
@ -433,6 +536,7 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
fake_alert = Mock(spec=Alert)
fake_alert.firing = True
fake_alert.fingerprint = "fingerprint"
fake_alert.labels = []
self.fake_alertmanager_client.update_silence.side_effect = SilenceNotFoundError
@ -461,6 +565,7 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
fake_alert = Mock(spec=Alert)
fake_alert.firing = True
fake_alert.fingerprint = "fingerprint"
fake_alert.labels = []
self.fake_alertmanager_client.update_silence.side_effect = SilenceExtendError
@ -487,6 +592,7 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
fake_alert = Mock(spec=Alert)
fake_alert.firing = False
fake_alert.fingerprint = "fingerprint"
fake_alert.labels = []
await create_alert(fake_alert, self.fake_room_id, self.fake_request)
@ -507,6 +613,7 @@ class WebhookApplicationTestCase(aiohttp.test_utils.AioHTTPTestCase):
fake_alert = Mock(spec=Alert)
fake_alert.firing = False
fake_alert.fingerprint = "fingerprint"
fake_alert.labels = []
self.fake_matrix_client_pool.matrix_client = None