unit tests for command and callbacks ; make alertmanager requests async

This commit is contained in:
HgO 2022-07-08 21:11:25 +02:00
parent d94a8c1098
commit 9a3f775195
10 changed files with 580 additions and 181 deletions

View file

@ -1,12 +1,12 @@
import datetime import datetime
from typing import Dict, List from typing import Any, Dict, List
import diskcache import aiohttp
import pytimeparse import pytimeparse
import requests from aiohttp import ClientError
from requests import RequestException from aiotools import AsyncContextManager
from diskcache import Cache
from matrix_alertbot.config import Config
from matrix_alertbot.errors import ( from matrix_alertbot.errors import (
AlertmanagerError, AlertmanagerError,
AlertNotFoundError, AlertNotFoundError,
@ -14,30 +14,37 @@ from matrix_alertbot.errors import (
) )
class AlertmanagerClient: class AlertmanagerClient(AsyncContextManager):
def __init__(self, config: Config) -> None: def __init__(self, url: str, cache: Cache) -> None:
url = config.alertmanager_url
self.api_url = f"{url}/api/v2" self.api_url = f"{url}/api/v2"
self.cache = diskcache.Cache(config.cache_dir) self.cache = cache
self.session = aiohttp.ClientSession()
def get_alerts(self) -> List[Dict]: async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
await super().__aexit__(*args, **kwargs)
await self.close()
async def close(self) -> None:
await self.session.close()
async def get_alerts(self) -> List[Dict]:
try: try:
response = requests.get(f"{self.api_url}/alerts") async with self.session.get(f"{self.api_url}") as response:
response.raise_for_status() response.raise_for_status()
except RequestException as e: return await response.json()
except ClientError as e:
raise AlertmanagerError(f"Cannot fetch alerts from Alertmanager") from e raise AlertmanagerError(f"Cannot fetch alerts from Alertmanager") from e
return response.json()
def get_alert(self, fingerprint: str) -> Dict: async def get_alert(self, fingerprint: str) -> Dict:
alerts = self.get_alerts() alerts = await self.get_alerts()
return self._find_alert(fingerprint, alerts) return self._find_alert(fingerprint, alerts)
def get_alert_labels(self, fingerprint: str) -> Dict[str, str]: async def get_alert_labels(self, fingerprint: str) -> Dict[str, str]:
alert = self.get_alert(fingerprint) alert = await self.get_alert(fingerprint)
return alert["labels"] return alert["labels"]
def create_silence(self, fingerprint: str, duration: str, user: str) -> str: async def create_silence(self, fingerprint: str, duration: str, user: str) -> str:
labels = self.get_alert_labels(fingerprint) labels = await self.get_alert_labels(fingerprint)
matchers = [] matchers = []
for label_name, label_value in labels.items(): for label_name, label_value in labels.items():
matchers.append( matchers.append(
@ -56,18 +63,22 @@ class AlertmanagerClient:
"createdBy": user, "createdBy": user,
"comment": "Acknowledge alert from Matrix", "comment": "Acknowledge alert from Matrix",
} }
try: try:
response = requests.post(f"{self.api_url}/silences", json=silence) async with self.session.post(
f"{self.api_url}/silences", json=silence
) as response:
response.raise_for_status() response.raise_for_status()
except RequestException as e: data = await response.json()
except ClientError as e:
raise AlertmanagerError( raise AlertmanagerError(
f"Cannot create silence for alert fingerprint {fingerprint}" f"Cannot create silence for alert fingerprint {fingerprint}"
) from e ) from e
data = response.json()
return data["silenceID"] return data["silenceID"]
def delete_silence(self, fingerprint: str) -> None: async def delete_silence(self, fingerprint: str) -> None:
alert = self.get_alert(fingerprint) alert = await self.get_alert(fingerprint)
alert_state = alert["status"]["state"] alert_state = alert["status"]["state"]
if alert_state != "suppressed": if alert_state != "suppressed":
@ -77,13 +88,16 @@ class AlertmanagerClient:
silences = alert["status"]["silencedBy"] silences = alert["status"]["silencedBy"]
for silence in silences: for silence in silences:
await self._delete_silence_by_id(silence)
async def _delete_silence_by_id(self, silence: str) -> None:
try: try:
response = requests.delete(f"{self.api_url}/silence/{silence}") async with self.session.delete(
f"{self.api_url}/silence/{silence}"
) as response:
response.raise_for_status() response.raise_for_status()
except RequestException as e: except ClientError as e:
raise AlertmanagerError( raise AlertmanagerError(f"Cannot delete silence with ID {silence}") from e
f"Cannot delete silence with ID {silence}"
) from e
@staticmethod @staticmethod
def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict: def _find_alert(fingerprint: str, alerts: List[Dict]) -> Dict:

View file

@ -13,7 +13,7 @@ from nio import (
) )
from matrix_alertbot.alertmanager import AlertmanagerClient from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.bot_commands import Command from matrix_alertbot.command import Command
from matrix_alertbot.chat_functions import make_pill, send_text_to_room, strip_fallback from matrix_alertbot.chat_functions import make_pill, send_text_to_room, strip_fallback
from matrix_alertbot.config import Config from matrix_alertbot.config import Config
@ -25,6 +25,7 @@ class Callbacks:
self, self,
client: AsyncClient, client: AsyncClient,
alertmanager: AlertmanagerClient, alertmanager: AlertmanagerClient,
cache: Cache,
config: Config, config: Config,
): ):
""" """
@ -38,7 +39,7 @@ class Callbacks:
config: Bot configuration parameters. config: Bot configuration parameters.
""" """
self.client = client self.client = client
self.cache = Cache(config.cache_dir) self.cache = cache
self.alertmanager = alertmanager self.alertmanager = alertmanager
self.config = config self.config = config
self.command_prefix = config.command_prefix self.command_prefix = config.command_prefix
@ -59,17 +60,15 @@ class Callbacks:
return return
# Ignore messages from unauthorized room # Ignore messages from unauthorized room
if room.room_id != self.config.room: if room.room_id != self.config.room_id:
return return
logger.debug( logger.debug(
f"Bot message received for room {room.display_name} | " f"Bot message received for room {room.display_name} | "
f"{room.user_name(event.sender)}: {msg}" f"{room.user_name(event.sender)}: {msg}"
) )
# Process as message if in a public room without command prefix # Process as message if in a public room without command prefix
has_command_prefix = msg.startswith(self.command_prefix) has_command_prefix = msg.startswith(self.command_prefix)
if not has_command_prefix: if not has_command_prefix:
logger.debug( logger.debug(
f"Message received without command prefix {self.command_prefix}: Aborting." f"Message received without command prefix {self.command_prefix}: Aborting."
@ -78,11 +77,11 @@ class Callbacks:
# Remove the command prefix # Remove the command prefix
msg = msg[len(self.command_prefix) :] msg = msg[len(self.command_prefix) :]
command = Command( command = Command(
self.client, self.cache, self.alertmanager, self.config, msg, room, event self.client, self.cache, self.alertmanager, self.config, msg, room, event
) )
await command.process() await command.process()
# print("test:", command.command)
async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None: async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
"""Callback for when an invite is received. Join the room specified in the invite. """Callback for when an invite is received. Join the room specified in the invite.
@ -93,7 +92,7 @@ class Callbacks:
event: The invite event. event: The invite event.
""" """
# Ignore invites from unauthorized room # Ignore invites from unauthorized room
if room.room_id != self.config.room: if room.room_id != self.config.room_id:
return return
logger.debug(f"Got invite to {room.room_id} from {event.sender}.") logger.debug(f"Got invite to {room.room_id} from {event.sender}.")
@ -141,7 +140,7 @@ class Callbacks:
reacted_to_id: The event ID that the reaction points to. reacted_to_id: The event ID that the reaction points to.
""" """
# Ignore reactions from unauthorized room # Ignore reactions from unauthorized room
if room.room_id != self.config.room: if room.room_id != self.config.room_id:
return return
# Ignore reactions from ourselves # Ignore reactions from ourselves
@ -187,7 +186,7 @@ class Callbacks:
event: The encrypted event that we were unable to decrypt. event: The encrypted event that we were unable to decrypt.
""" """
# Ignore events from unauthorized room # Ignore events from unauthorized room
if room.room_id != self.config.room: if room.room_id != self.config.room_id:
return return
logger.error( logger.error(
@ -211,7 +210,7 @@ class Callbacks:
event: The event itself. event: The event itself.
""" """
# Ignore events from unauthorized room # Ignore events from unauthorized room
if room.room_id != self.config.room: if room.room_id != self.config.room_id:
return return
if event.type == "m.reaction": if event.type == "m.reaction":

View file

@ -84,12 +84,13 @@ class Command:
count_created_silences = 0 count_created_silences = 0
alert_fingerprints = self.cache[alert_event_id] alert_fingerprints = self.cache[alert_event_id]
print(self.cache[alert_event_id])
for alert_fingerprint in alert_fingerprints: for alert_fingerprint in alert_fingerprints:
logger.debug( logger.debug(
f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}" f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}"
) )
try: try:
silence_id = self.alertmanager.create_silence( silence_id = await self.alertmanager.create_silence(
alert_fingerprint, duration, self.room.user_name(self.event.sender) alert_fingerprint, duration, self.room.user_name(self.event.sender)
) )
except (AlertNotFoundError, AlertmanagerError) as e: except (AlertNotFoundError, AlertmanagerError) as e:
@ -100,7 +101,7 @@ class Command:
await send_text_to_room( await send_text_to_room(
self.client, self.client,
self.room.room_id, self.room.room_id,
f"Created {count_created_silences} silences with a duration of {duration}", f"Created {count_created_silences} silences with a duration of {duration}.",
) )
async def _unack(self) -> None: async def _unack(self) -> None:
@ -125,7 +126,7 @@ class Command:
f"Delete silence for alert with fingerprint {alert_fingerprint}" f"Delete silence for alert with fingerprint {alert_fingerprint}"
) )
try: try:
self.alertmanager.delete_silence(alert_fingerprint) await self.alertmanager.delete_silence(alert_fingerprint)
except (AlertNotFoundError, SilenceNotFoundError, AlertmanagerError) as e: except (AlertNotFoundError, SilenceNotFoundError, AlertmanagerError) as e:
logger.error(f"Unable to delete silence: {e}") logger.error(f"Unable to delete silence: {e}")
continue continue
@ -134,7 +135,7 @@ class Command:
await send_text_to_room( await send_text_to_room(
self.client, self.client,
self.room.room_id, self.room.room_id,
f"Removed {count_removed_silences} silences", f"Removed {count_removed_silences} silences.",
) )
async def _react(self) -> None: async def _react(self) -> None:

View file

@ -94,7 +94,7 @@ class Config:
["matrix", "device_name"], default="nio-template" ["matrix", "device_name"], default="nio-template"
) )
self.homeserver_url = self._get_cfg(["matrix", "url"], required=True) self.homeserver_url = self._get_cfg(["matrix", "url"], required=True)
self.room = self._get_cfg(["matrix", "room"], required=True) self.room_id = self._get_cfg(["matrix", "room"], required=True)
self.address = self._get_cfg(["webhook", "address"], required=False) self.address = self._get_cfg(["webhook", "address"], required=False)
self.port = self._get_cfg(["webhook", "port"], required=False) self.port = self._get_cfg(["webhook", "port"], required=False)

View file

@ -5,8 +5,8 @@ import sys
from asyncio import TimeoutError from asyncio import TimeoutError
from time import sleep from time import sleep
import diskcache
from aiohttp import ClientConnectionError, ServerDisconnectedError from aiohttp import ClientConnectionError, ServerDisconnectedError
from diskcache import Cache
from nio import ( from nio import (
AsyncClient, AsyncClient,
AsyncClientConfig, AsyncClientConfig,
@ -19,78 +19,14 @@ from nio import (
) )
from matrix_alertbot.alertmanager import AlertmanagerClient from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.callbacks import Callbacks from matrix_alertbot.callback import Callbacks
from matrix_alertbot.config import Config from matrix_alertbot.config import Config
from matrix_alertbot.webhook import Webhook from matrix_alertbot.webhook import Webhook
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def start_matrix_client(client: AsyncClient, config: Config) -> bool: def create_matrix_client(config: Config) -> AsyncClient:
# Keep trying to reconnect on failure (with some time in-between)
while True:
try:
if config.user_token:
# Use token to log in
client.load_store()
# Sync encryption keys with the server
if client.should_upload_keys:
await client.keys_upload()
else:
# Try to login with the configured username/password
try:
login_response = await client.login(
password=config.user_password,
device_name=config.device_name,
)
# Check if login failed
if type(login_response) == LoginError:
logger.error("Failed to login: %s", login_response.message)
return False
except LocalProtocolError as e:
# There's an edge case here where the user hasn't installed the correct C
# dependencies. In that case, a LocalProtocolError is raised on login.
logger.fatal(
"Failed to login. Have you installed the correct dependencies? "
"https://github.com/poljar/matrix-nio#installation "
"Error: %s",
e,
)
return False
# Login succeeded!
logger.info(f"Logged in as {config.user_id}")
await client.sync_forever(timeout=30000, full_state=True)
except (ClientConnectionError, ServerDisconnectedError, TimeoutError):
logger.warning("Unable to connect to homeserver, retrying in 15s...")
# Sleep so we don't bombard the server with login requests
sleep(15)
finally:
# Make sure to close the client connection on disconnect
await client.close()
def main() -> None:
"""The first function that is run when starting the bot"""
# Read user-configured options from a config file.
# A different config file path can be specified as the first command line argument
if len(sys.argv) > 1:
config_path = sys.argv[1]
else:
config_path = "config.yaml"
# Read the parsed config file and create a Config object
config = Config(config_path)
# Configure Alertmanager client
alertmanager = AlertmanagerClient(config)
# Configuration options for the AsyncClient # Configuration options for the AsyncClient
client_config = AsyncClientConfig( client_config = AsyncClientConfig(
max_limit_exceeded=0, max_limit_exceeded=0,
@ -112,8 +48,15 @@ def main() -> None:
client.access_token = config.user_token client.access_token = config.user_token
client.user_id = config.user_id client.user_id = config.user_id
return client
async def start_matrix_client(cache: Cache, config: Config) -> bool:
async with create_matrix_client(config) as client:
# Configure Alertmanager client
async with AlertmanagerClient(config.alertmanager_url, cache) as alertmanager:
# Set up event callbacks # Set up event callbacks
callbacks = Callbacks(client, alertmanager, config) callbacks = Callbacks(client, alertmanager, cache, config)
client.add_event_callback(callbacks.message, (RoomMessageText,)) client.add_event_callback(callbacks.message, (RoomMessageText,))
client.add_event_callback( client.add_event_callback(
callbacks.invite_event_filtered_callback, (InviteMemberEvent,) callbacks.invite_event_filtered_callback, (InviteMemberEvent,)
@ -121,15 +64,82 @@ def main() -> None:
client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,)) client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,))
client.add_event_callback(callbacks.unknown, (UnknownEvent,)) client.add_event_callback(callbacks.unknown, (UnknownEvent,))
webhook_server = Webhook(client, config) # Keep trying to reconnect on failure (with some time in-between)
while True:
try:
if config.user_token:
# Use token to log in
client.load_store()
# Sync encryption keys with the server
if client.should_upload_keys:
await client.keys_upload()
else:
# Try to login with the configured username/password
try:
login_response = await client.login(
password=config.user_password,
device_name=config.device_name,
)
# Check if login failed
if type(login_response) == LoginError:
logger.error(
"Failed to login: %s", login_response.message
)
return False
except LocalProtocolError as e:
# There's an edge case here where the user hasn't installed the correct C
# dependencies. In that case, a LocalProtocolError is raised on login.
logger.fatal(
"Failed to login. Have you installed the correct dependencies? "
"https://github.com/poljar/matrix-nio#installation "
"Error: %s",
e,
)
return False
# Login succeeded!
logger.info(f"Logged in as {config.user_id}")
await client.sync_forever(timeout=30000, full_state=True)
except (ClientConnectionError, ServerDisconnectedError, TimeoutError):
logger.warning(
"Unable to connect to homeserver, retrying in 15s..."
)
# Sleep so we don't bombard the server with login requests
sleep(15)
async def start_webhook_server(cache: Cache, config: Config) -> None:
async with create_matrix_client(config) as client:
webhook_server = Webhook(client, cache, config)
await webhook_server.start()
def main() -> None:
"""The first function that is run when starting the bot"""
# Read user-configured options from a config file.
# A different config file path can be specified as the first command line argument
if len(sys.argv) > 1:
config_path = sys.argv[1]
else:
config_path = "config.yaml"
# Read the parsed config file and create a Config object
config = Config(config_path)
# Configure the cache
cache = Cache(config.cache_dir)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.create_task(webhook_server.start()) loop.create_task(start_webhook_server(cache, config))
loop.create_task(start_matrix_client(client, config)) loop.create_task(start_matrix_client(cache, config))
try: try:
loop.run_forever() loop.run_forever()
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
finally:
loop.run_until_complete(webhook_server.close())

View file

@ -34,7 +34,7 @@ async def create_alert(request: web_request.Request) -> web.Response:
try: try:
event = await send_text_to_room( event = await send_text_to_room(
client, config.room, plaintext, html, notice=False client, config.room_id, plaintext, html, notice=False
) )
except SendRetryError as e: except SendRetryError as e:
logger.error(e) logger.error(e)
@ -48,9 +48,7 @@ async def create_alert(request: web_request.Request) -> web.Response:
class Webhook: class Webhook:
def __init__(self, client: AsyncClient, config: Config) -> None: def __init__(self, client: AsyncClient, cache: Cache, config: Config) -> None:
cache = Cache(config.cache_dir)
self.app = web.Application(logger=logger) self.app = web.Application(logger=logger)
self.app["client"] = client self.app["client"] = client
self.app["config"] = config self.app["config"] = config

2
pytest.ini Normal file
View file

@ -0,0 +1,2 @@
[pytest]
asyncio_mode=strict

94
tests/test_callback.py Normal file
View file

@ -0,0 +1,94 @@
import unittest
from unittest.mock import Mock, patch
import nio
from diskcache import Cache
import matrix_alertbot.callback
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.callback import Callbacks
from tests.utils import make_awaitable
class CallbacksTestCase(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
# Create a Callbacks object and give it some Mock'd objects to use
self.fake_client = Mock(spec=nio.AsyncClient)
self.fake_client.user = "@fake_user:example.com"
self.fake_cache = Mock(spec=Cache)
self.fake_alertmanager = Mock(spec=AlertmanagerClient)
# Create a fake room to play with
self.fake_room = Mock(spec=nio.MatrixRoom)
self.fake_room.room_id = "!abcdefg:example.com"
self.fake_room.display_name = "Fake Room"
# We don't spec config, as it doesn't currently have well defined attributes
self.fake_config = Mock()
self.fake_config.room_id = self.fake_room.room_id
self.fake_config.command_prefix = "!alert "
self.callbacks = Callbacks(
self.fake_client, self.fake_cache, self.fake_alertmanager, self.fake_config
)
async def test_invite(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_invite_event = Mock(spec=nio.InviteMemberEvent)
fake_invite_event.sender = "@some_other_fake_user:example.com"
# Pretend that attempting to join a room is always successful
self.fake_client.join.return_value = make_awaitable(None)
# Pretend that we received an invite event
await self.callbacks.invite(self.fake_room, fake_invite_event)
# Check that we attempted to join the room
self.fake_client.join.assert_called_once_with(self.fake_room.room_id)
@patch.object(matrix_alertbot.callback, "Command", autospec=True)
async def test_message_without_prefix(self, fake_command: Mock) -> None:
"""Tests the callback for RoomMessageText without any command prefix"""
# Tests that the bot process messages in the room
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "Hello world!"
# Pretend that we received a text message event
await self.callbacks.message(self.fake_room, fake_message_event)
# Check that the command was not executed
fake_command.assert_not_called()
@patch.object(matrix_alertbot.callback, "Command", autospec=True)
async def test_message_with_prefix(self, fake_command: Mock) -> None:
"""Tests the callback for RoomMessageText with the command prefix"""
# Tests that the bot process messages in the room that contain a command
fake_command_instance = fake_command.return_value
fake_command_instance.process.side_effect = lambda: print("hello")
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = "!alert help"
# Pretend that we received a text message event
await self.callbacks.message(self.fake_room, fake_message_event)
# Check that we attempted to execute the command
fake_command.assert_called_once_with(
self.fake_client,
self.fake_alertmanager,
self.fake_cache,
self.fake_config,
"help",
self.fake_room,
fake_message_event,
)
fake_command_instance.process.assert_called_once()
if __name__ == "__main__":
unittest.main()

View file

@ -1,50 +0,0 @@
import unittest
from unittest.mock import Mock
import nio
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.callbacks import Callbacks
from tests.utils import make_awaitable, run_coroutine
class CallbacksTestCase(unittest.TestCase):
def setUp(self) -> None:
# Create a Callbacks object and give it some Mock'd objects to use
self.fake_client = Mock(spec=nio.AsyncClient)
self.fake_client.user = "@fake_user:example.com"
self.fake_alertmanager = Mock(spec=AlertmanagerClient)
# We don't spec config, as it doesn't currently have well defined attributes
self.fake_config = Mock()
self.callbacks = Callbacks(
self.fake_client, self.fake_alertmanager, self.fake_config
)
def test_invite(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
# Create a fake room and invite event to call the 'invite' callback with
fake_room = Mock(spec=nio.MatrixRoom)
fake_room_id = "!abcdefg:example.com"
fake_room.room_id = fake_room_id
fake_invite_event = Mock(spec=nio.InviteMemberEvent)
fake_invite_event.sender = "@some_other_fake_user:example.com"
# Pretend that attempting to join a room is always successful
self.fake_client.join.return_value = make_awaitable(None)
# Pretend that we received an invite event
run_coroutine(self.callbacks.invite(fake_room, fake_invite_event))
# Check that we attempted to join the room
self.fake_client.join.assert_called_once_with(fake_room_id)
if __name__ == "__main__":
unittest.main()

331
tests/test_command.py Normal file
View file

@ -0,0 +1,331 @@
from typing import Dict
import unittest
from unittest.mock import MagicMock, Mock, call, patch
import nio
from diskcache import Cache
import matrix_alertbot.callback
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.callback import Callbacks
from matrix_alertbot.command import Command
from tests.utils import make_awaitable
class CommandTestCase(unittest.IsolatedAsyncioTestCase):
def setUp(self) -> None:
# Create a Command object and give it some Mock'd objects to use
self.fake_client = Mock(spec=nio.AsyncClient)
self.fake_client.user = "@fake_user:example.com"
# Pretend that attempting to send a message is always successful
self.fake_client.room_send.return_value = make_awaitable(None)
self.fake_fingerprints = Mock(return_value=["fingerprint1", "fingerprint2"])
self.fake_cache = MagicMock(spec=Cache)
self.fake_cache.__getitem__ = self.fake_fingerprints
self.fake_alertmanager = Mock(spec=AlertmanagerClient)
# Create a fake room to play with
self.fake_room = Mock(spec=nio.MatrixRoom)
self.fake_room.room_id = "!abcdefg:example.com"
self.fake_room.display_name = "Fake Room"
self.fake_room.user_name.side_effect = lambda x: x
self.fake_source_not_in_reply = {"content": {}}
self.fake_source_in_reply = {
"content": {
"m.relates_to": {"m.in_reply_to": {"event_id": "some event id"}}
}
}
# We don't spec config, as it doesn't currently have well defined attributes
self.fake_config = Mock()
self.fake_config.room_id = self.fake_room.room_id
self.fake_config.command_prefix = "!alert "
@patch.object(matrix_alertbot.command.Command, "_ack")
async def test_process_ack_command(self, fake_ack: Mock) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"ack",
self.fake_room,
fake_message_event,
)
await command.process()
@patch.object(matrix_alertbot.command.Command, "_unack")
async def test_process_unack_command(self, fake_unack: Mock) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
for command_word in ("unack", "nack"):
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
command_word,
self.fake_room,
fake_message_event,
)
await command.process()
# Check that we attempted to process the command
fake_unack.assert_has_calls([call(), call()])
@patch.object(matrix_alertbot.command.Command, "_show_help")
async def test_process_help_command(self, fake_help: Mock) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"help",
self.fake_room,
fake_message_event,
)
await command.process()
# Check that we attempted to process the command
fake_help.assert_called_once()
@patch.object(matrix_alertbot.command.Command, "_unknown_command")
async def test_process_unknown_command(self, fake_unknown: Mock) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"",
self.fake_room,
fake_message_event,
)
await command.process()
# Check that we attempted to process the command
fake_unknown.assert_called_once()
async def test_ack_not_in_reply_without_duration(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = ""
fake_message_event.source = self.fake_source_not_in_reply
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"ack",
self.fake_room,
fake_message_event,
)
await command._ack()
# Check that we didn't attempt to create silences
self.fake_alertmanager.create_silence.assert_not_called()
self.fake_client.room_send.assert_not_called()
async def test_ack_not_in_reply_with_duration(self) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = ""
fake_message_event.source = self.fake_source_not_in_reply
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"ack 2d",
self.fake_room,
fake_message_event,
)
await command._ack()
# Check that we didn't attempt to create silences
self.fake_alertmanager.create_silence.assert_not_called()
self.fake_client.room_send.assert_not_called()
@patch.object(matrix_alertbot.command, "send_text_to_room")
async def test_ack_in_reply_without_duration(
self, fake_send_text_to_room: Mock
) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = ""
fake_message_event.source = self.fake_source_in_reply
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"ack",
self.fake_room,
fake_message_event,
)
await command._ack()
# Check that we attempted to create silences
self.fake_alertmanager.create_silence.assert_has_calls(
list(
call(
fingerprint,
"1d",
fake_message_event.sender,
)
for fingerprint in self.fake_fingerprints.return_value
)
)
fake_send_text_to_room.assert_called_once_with(
self.fake_client,
self.fake_room.room_id,
"Created 2 silences with a duration of 1d.",
)
@patch.object(matrix_alertbot.command, "send_text_to_room")
async def test_ack_in_reply_with_duration(
self, fake_send_text_to_room: Mock
) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = ""
fake_message_event.source = self.fake_source_in_reply
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"ack 2d",
self.fake_room,
fake_message_event,
)
await command._ack()
# Check that we attempted to create silences
self.fake_alertmanager.create_silence.assert_has_calls(
list(
call(
fingerprint,
"2d",
fake_message_event.sender,
)
for fingerprint in self.fake_fingerprints.return_value
)
)
fake_send_text_to_room.assert_called_once_with(
self.fake_client,
self.fake_room.room_id,
"Created 2 silences with a duration of 2d.",
)
@patch.object(matrix_alertbot.command, "send_text_to_room")
async def test_unack_in_reply(self, fake_send_text_to_room: Mock) -> None:
"""Tests the callback for InviteMemberEvents"""
# Tests that the bot attempts to join a room after being invited to it
fake_message_event = Mock(spec=nio.RoomMessageText)
fake_message_event.sender = "@some_other_fake_user:example.com"
fake_message_event.body = ""
fake_message_event.source = self.fake_source_in_reply
command = Command(
self.fake_client,
self.fake_cache,
self.fake_alertmanager,
self.fake_config,
"unack",
self.fake_room,
fake_message_event,
)
await command._unack()
# Check that we attempted to create silences
self.fake_alertmanager.delete_silence.assert_has_calls(
list(
call(fingerprint) for fingerprint in self.fake_fingerprints.return_value
)
)
fake_send_text_to_room.assert_called_with(
self.fake_client, self.fake_room.room_id, "Removed 2 silences."
)
# @patch.object(matrix_alertbot.callback, "Command", autospec=True)
# async def test_message_without_prefix(self, fake_command: Mock) -> None:
# """Tests the callback for RoomMessageText without any command prefix"""
# # Tests that the bot process messages in the room
# fake_message_event = Mock(spec=nio.RoomMessageText)
# fake_message_event.sender = "@some_other_fake_user:example.com"
# fake_message_event.body = "Hello world!"
# # Pretend that we received a text message event
# await self.callbacks.message(self.fake_room, fake_message_event)
# # Check that the command was not executed
# fake_command.assert_not_called()
# @patch.object(matrix_alertbot.callback, "Command", autospec=True)
# async def test_message_with_prefix(self, fake_command: Mock) -> None:
# """Tests the callback for RoomMessageText with the command prefix"""
# # Tests that the bot process messages in the room that contain a command
# fake_command_instance = fake_command.return_value
# fake_command_instance.process.side_effect = lambda: print("hello")
# fake_message_event = Mock(spec=nio.RoomMessageText)
# fake_message_event.sender = "@some_other_fake_user:example.com"
# fake_message_event.body = "!alert help"
# # Pretend that we received a text message event
# await self.callbacks.message(self.fake_room, fake_message_event)
# # Check that we attempted to execute the command
# fake_command.assert_called_once_with(
# self.fake_client,
# self.fake_alertmanager,
# self.fake_cache,
# self.fake_config,
# "help",
# self.fake_room,
# fake_message_event,
# )
# fake_command_instance.process.assert_called_once()
if __name__ == "__main__":
unittest.main()