diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..e70438f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,15 @@ +# PyCharm +.idea/ +.vscode/ + +# Python virtualenv environment folders +env/ +env3/ +.env/ +.venv/ + +# Python +__pycache__/ +*.egg-info/ +build/ +dist/ diff --git a/.gitignore b/.gitignore index e13a948..fbc86f6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # PyCharm .idea/ +.vscode/ # Python virtualenv environment folders env/ diff --git a/docker/Dockerfile.dev b/docker/Dockerfile.dev index b0f2047..ca11d32 100644 --- a/docker/Dockerfile.dev +++ b/docker/Dockerfile.dev @@ -69,3 +69,5 @@ VOLUME ["/data"] # Start the app ENTRYPOINT ["matrix-alertbot", "/data/config.yaml"] + +EXPOSE 8080 diff --git a/matrix-alertbot b/matrix-alertbot index 06a9835..1567931 100755 --- a/matrix-alertbot +++ b/matrix-alertbot @@ -1,10 +1,9 @@ #!/usr/bin/env python3 -import asyncio try: from matrix_alertbot import main # Run the main function of the bot - asyncio.get_event_loop().run_until_complete(main.main()) + main.main() except ImportError as e: print("Unable to import matrix_alertbot.main:", e) diff --git a/matrix_alertbot/alertmanager.py b/matrix_alertbot/alertmanager.py new file mode 100644 index 0000000..da1fd1b --- /dev/null +++ b/matrix_alertbot/alertmanager.py @@ -0,0 +1,58 @@ +import datetime +from typing import Dict, List + +import pytimeparse +import requests + +from matrix_alertbot.cache import Cache +from matrix_alertbot.errors import AlertNotFoundError + + +class AlertmanagerClient: + def __init__(self, url: str, cache: Cache) -> None: + self.api_url = f"{url}/api/v2" + self.cache = cache + + def get_alerts(self) -> List[Dict]: + response = requests.get(f"{self.api_url}/alert") + response.raise_for_status() + return response.json() + + def get_alert_labels(self, fingerprint: str) -> Dict[str, str]: + if fingerprint not in self.cache: + alerts = self.get_alerts() + alert = self._find_alert(alerts, fingerprint) + self.cache[fingerprint] = alert["labels"] + return self.cache[fingerprint] + + def create_silence(self, fingerprint: str, duration: str, user: str) -> str: + labels = self.get_alert_labels(fingerprint) + matchers = [] + for label_name, label_value in labels.items(): + matchers.append( + {"name": label_name, "value": label_value, "isRegex": False} + ) + + start_time = datetime.datetime.now() + duration_seconds = pytimeparse.parse(duration) + duration_delta = datetime.timedelta(seconds=duration_seconds) + end_time = start_time + duration_delta + + silence = { + "matchers": matchers, + "startsAt": start_time, + "endsAt": end_time, + "createdBy": user, + "comment": "Acknowledge alert from Matrix", + } + response = requests.post(f"{self.api_url}/silences", json=silence) + response.raise_for_status() + data = response.json() + return data["silenceID"] + + @staticmethod + def _find_alert(alerts: List[Dict], fingerprint: str) -> Dict: + for alert in alerts: + if alert["fingerprint"] == fingerprint: + return alert + raise AlertNotFoundError(f"Cannot find alert with fingerprint {fingerprint}") diff --git a/matrix_alertbot/bot_commands.py b/matrix_alertbot/bot_commands.py index 099bc79..4fd7fe3 100644 --- a/matrix_alertbot/bot_commands.py +++ b/matrix_alertbot/bot_commands.py @@ -1,15 +1,21 @@ +import logging + from nio import AsyncClient, MatrixRoom, RoomMessageText +from matrix_alertbot.alertmanager import AlertmanagerClient +from matrix_alertbot.cache import Cache from matrix_alertbot.chat_functions import react_to_event, send_text_to_room from matrix_alertbot.config import Config -from matrix_alertbot.storage import Storage + +logger = logging.getLogger(__name__) class Command: def __init__( self, client: AsyncClient, - store: Storage, + cache: Cache, + alertmanager: AlertmanagerClient, config: Config, command: str, room: MatrixRoom, @@ -20,7 +26,7 @@ class Command: Args: client: The client to communicate to matrix with. - store: Bot storage. + cache: Bot cache. config: Bot configuration parameters. @@ -31,7 +37,8 @@ class Command: event: The event describing the command. """ self.client = client - self.store = store + self.cache = cache + self.alertmanager = alertmanager self.config = config self.command = command self.room = room @@ -40,8 +47,8 @@ class Command: async def process(self) -> None: """Process the command""" - if self.command.startswith("echo"): - await self._echo() + if self.command.startswith("ack"): + await self._ack() elif self.command.startswith("react"): await self._react() elif self.command.startswith("help"): @@ -49,15 +56,46 @@ class Command: else: await self._unknown_command() - async def _echo(self) -> None: - """Echo back the command's arguments""" - response = " ".join(self.args) - await send_text_to_room(self.client, self.room.room_id, response) + async def _ack(self) -> None: + """Acknowledge an alert and silence it for a certain duration in Alertmanager""" + if len(self.args) > 0: + duration = " ".join(self.args) + else: + duration = "1d" + logger.debug( + f"Acknowledging alert with fingerprint {self.room.display_name} for a duration of {duration} | " + f"{self.room.user_name(self.event.sender)}: {self.event.body}" + ) + + source_content = self.event.source["content"] + try: + alert_event_id = source_content["m.relates_to"]["m.in_reply_to"]["event_id"] + except KeyError: + logger.debug("Unable to find the event ID of the alert") + return + logger.debug(f"Read alert fingerprint for event {alert_event_id} from cache") + alert_fingerprint = self.cache[alert_event_id] + logger.debug( + f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}" + ) + silence_id = self.alertmanager.create_silence( + alert_fingerprint, duration, self.room.user_name(self.event.sender) + ) + await send_text_to_room( + self.client, + self.room.room_id, + f"Created silence {silence_id} for {duration}", + reply_to_event_id=alert_event_id, + ) async def _react(self) -> None: """Make the bot react to the command message""" # React with a start emoji reaction = "⭐" + logger.debug( + f"Reacting with {reaction} to room {self.room.display_name} | " + f"{self.room.user_name(self.event.sender)}: {self.event.body}" + ) await react_to_event( self.client, self.room.room_id, self.event.event_id, reaction ) @@ -70,6 +108,10 @@ class Command: async def _show_help(self) -> None: """Show the help text""" + logger.debug( + f"Displaying help to room {self.room.display_name} | " + f"{self.room.user_name(self.event.sender)}: {self.event.body}" + ) if not self.args: text = ( "Hello, I am a bot made with matrix-nio! Use `help commands` to view " @@ -88,6 +130,10 @@ class Command: await send_text_to_room(self.client, self.room.room_id, text) async def _unknown_command(self) -> None: + logger.debug( + f"Sending unknown command response to room {self.room.display_name} | " + f"{self.room.user_name(self.event.sender)}: {self.event.body}" + ) await send_text_to_room( self.client, self.room.room_id, diff --git a/matrix_alertbot/cache.py b/matrix_alertbot/cache.py new file mode 100644 index 0000000..8f625ff --- /dev/null +++ b/matrix_alertbot/cache.py @@ -0,0 +1,18 @@ +from typing import Any + +import diskcache + + +class Cache: + def __init__(self, directory: str, expire: int): + self.cache = diskcache.Cache(directory) + self.expire = expire + + def __getitem__(self, key: str) -> Any: + return self.cache[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.cache.set(key, value, expire=self.expire) + + def __contains__(self, key: str) -> bool: + return key in self.cache diff --git a/matrix_alertbot/callbacks.py b/matrix_alertbot/callbacks.py index 960ba6d..57c8321 100644 --- a/matrix_alertbot/callbacks.py +++ b/matrix_alertbot/callbacks.py @@ -11,27 +11,41 @@ from nio import ( UnknownEvent, ) +from matrix_alertbot.alertmanager import AlertmanagerClient from matrix_alertbot.bot_commands import Command -from matrix_alertbot.chat_functions import make_pill, react_to_event, send_text_to_room +from matrix_alertbot.cache import Cache +from matrix_alertbot.chat_functions import ( + make_pill, + react_to_event, + send_text_to_room, + strip_fallback, +) from matrix_alertbot.config import Config -from matrix_alertbot.message_responses import Message -from matrix_alertbot.storage import Storage logger = logging.getLogger(__name__) class Callbacks: - def __init__(self, client: AsyncClient, store: Storage, config: Config): + def __init__( + self, + client: AsyncClient, + cache: Cache, + alertmanager: AlertmanagerClient, + config: Config, + ): """ Args: client: nio client used to interact with matrix. - store: Bot storage. + cache: Bot cache. + + alertmanager: Client used to interact with alertmanager. config: Bot configuration parameters. """ self.client = client - self.store = store + self.cache = cache + self.alertmanager = alertmanager self.config = config self.command_prefix = config.command_prefix @@ -44,12 +58,16 @@ class Callbacks: event: The event defining the message. """ # Extract the message text - msg = event.body + msg = strip_fallback(event.body) # Ignore messages from ourselves if event.sender == self.client.user: return + # Ignore messages from unauthorized room + if room.room_id != self.config.room: + return + logger.debug( f"Bot message received for room {room.display_name} | " f"{room.user_name(event.sender)}: {msg}" @@ -58,23 +76,18 @@ class Callbacks: # Process as message if in a public room without command prefix has_command_prefix = msg.startswith(self.command_prefix) - # room.is_group is often a DM, but not always. - # room.is_group does not allow room aliases - # room.member_count > 2 ... we assume a public room - # room.member_count <= 2 ... we assume a DM - if not has_command_prefix and room.member_count > 2: - # General message listener - message = Message(self.client, self.store, self.config, msg, room, event) - await message.process() + if not has_command_prefix: + logger.debug( + f"Message received without command prefix {self.command_prefix}: Aborting." + ) return - # Otherwise if this is in a 1-1 with the bot or features a command prefix, - # treat it as a command - if has_command_prefix: - # Remove the command prefix - msg = msg[len(self.command_prefix) :] + # Remove the command prefix + msg = msg[len(self.command_prefix) :] - command = Command(self.client, self.store, self.config, msg, room, event) + command = Command( + self.client, self.cache, self.alertmanager, self.config, msg, room, event + ) await command.process() async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None: @@ -85,6 +98,10 @@ class Callbacks: event: The invite event. """ + # Ignore invites from unauthorized room + if room.room_id != self.config.room: + return + logger.debug(f"Got invite to {room.room_id} from {event.sender}.") # Attempt to join 3 times before giving up @@ -129,6 +146,14 @@ class Callbacks: reacted_to_id: The event ID that the reaction points to. """ + # Ignore reactions from unauthorized room + if room.room_id != self.config.room: + return + + # Ignore reactions from ourselves + if event.sender == self.client.user: + return + logger.debug(f"Got reaction to {room.room_id} from {event.sender}.") # Get the original event that was reacted to @@ -167,6 +192,10 @@ class Callbacks: event: The encrypted event that we were unable to decrypt. """ + # Ignore events from unauthorized room + if room.room_id != self.config.room: + return + logger.error( f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!" f"\n\n" @@ -177,16 +206,6 @@ class Callbacks: f"commands a second time)." ) - red_x_and_lock_emoji = "❌ 🔐" - - # React to the undecryptable event with some emoji - await react_to_event( - self.client, - room.room_id, - event.event_id, - red_x_and_lock_emoji, - ) - async def unknown(self, room: MatrixRoom, event: UnknownEvent) -> None: """Callback for when an event with a type that is unknown to matrix-nio is received. Currently this is used for reaction events, which are not yet part of a released @@ -197,6 +216,10 @@ class Callbacks: event: The event itself. """ + # Ignore events from unauthorized room + if room.room_id != self.config.room: + return + if event.type == "m.reaction": # Get the ID of the event this was a reaction to relation_dict = event.source.get("content", {}).get("m.relates_to", {}) diff --git a/matrix_alertbot/chat_functions.py b/matrix_alertbot/chat_functions.py index ac77848..75aca05 100644 --- a/matrix_alertbot/chat_functions.py +++ b/matrix_alertbot/chat_functions.py @@ -1,7 +1,6 @@ import logging -from typing import Optional, Union, Dict +from typing import Optional, Union -from markdown import markdown from nio import ( AsyncClient, ErrorResponse, @@ -18,11 +17,11 @@ logger = logging.getLogger(__name__) async def send_text_to_room( client: AsyncClient, room_id: str, - message: str, + plaintext: str, + html: str = None, notice: bool = True, - markdown_convert: bool = True, reply_to_event_id: Optional[str] = None, -) -> None: +) -> RoomSendResponse: """Send text to a matrix room. Args: @@ -30,14 +29,13 @@ async def send_text_to_room( room_id: The ID of the room to send the message to. - message: The message content. + plaintext: The message content. + + html: The message content in HTML format. notice: Whether the message should be sent with an "m.notice" message type (will not ping users). - markdown_convert: Whether to convert the message content to markdown. - Defaults to true. - reply_to_event_id: Whether this message is a reply to another event. The event ID this is message is a reply to. @@ -50,17 +48,17 @@ async def send_text_to_room( content = { "msgtype": msgtype, "format": "org.matrix.custom.html", - "body": message, + "body": plaintext, } - if markdown_convert: - content["formatted_body"] = markdown(message) + if html is not None: + content["formatted_body"] = html if reply_to_event_id: content["m.relates_to"] = {"m.in_reply_to": {"event_id": reply_to_event_id}} try: - await client.room_send( + return await client.room_send( room_id, "m.room.message", content, @@ -129,28 +127,12 @@ async def react_to_event( ) -async def decryption_failure( - client: AsyncClient, room: MatrixRoom, event: MegolmEvent -) -> None: - """Callback for when an event fails to decrypt. Inform the user""" - logger.error( - f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!" - f"\n\n" - f"Tip: try using a different device ID in your config file and restart." - f"\n\n" - f"If all else fails, delete your store directory and let the bot recreate " - f"it (your reminders will NOT be deleted, but the bot may respond to existing " - f"commands a second time)." - ) - - user_msg = ( - "Unable to decrypt this message. " - "Check whether you've chosen to only encrypt to trusted devices." - ) - - await send_text_to_room( - client, - room.room_id, - user_msg, - reply_to_event_id=event.event_id, - ) +def strip_fallback(content: str) -> str: + index = 0 + for line in content.splitlines(keepends=True): + if not line.startswith("> "): + break + if index == 0: + index += 1 + index += len(line) + return content[index:] diff --git a/matrix_alertbot/config.py b/matrix_alertbot/config.py index 916db90..5a5b513 100644 --- a/matrix_alertbot/config.py +++ b/matrix_alertbot/config.py @@ -4,6 +4,7 @@ import re import sys from typing import Any, List, Optional +import pytimeparse import yaml from matrix_alertbot.errors import ConfigError @@ -70,22 +71,13 @@ class Config: f"storage.store_path '{self.store_path}' is not a directory" ) - # Database setup - database_path = self._get_cfg(["storage", "database"], required=True) + # Cache setup + self.cache_dir = self._get_cfg(["cache", "directory"], required=True) + expire_time = self._get_cfg(["cache", "expire_time"], default="1w") + self.cache_expire_time = pytimeparse.parse(expire_time) - # Support both SQLite and Postgres backends - # Determine which one the user intends - sqlite_scheme = "sqlite://" - postgres_scheme = "postgres://" - if database_path.startswith(sqlite_scheme): - self.database = { - "type": "sqlite", - "connection_string": database_path[len(sqlite_scheme) :], - } - elif database_path.startswith(postgres_scheme): - self.database = {"type": "postgres", "connection_string": database_path} - else: - raise ConfigError("Invalid connection string for storage.database") + # Alertmanager client setup + self.alertmanager_url = self._get_cfg(["alertmanager", "url"], required=True) # Matrix bot account setup self.user_id = self._get_cfg(["matrix", "user_id"], required=True) @@ -101,8 +93,21 @@ class Config: self.device_name = self._get_cfg( ["matrix", "device_name"], default="nio-template" ) - self.homeserver_url = self._get_cfg(["matrix", "homeserver_url"], required=True) + self.homeserver_url = self._get_cfg(["matrix", "url"], required=True) + self.room = self._get_cfg(["matrix", "room"], required=True) + self.address = self._get_cfg(["webhook", "address"], required=False) + self.port = self._get_cfg(["webhook", "port"], required=False) + self.socket = self._get_cfg(["webhook", "socket"], required=False) + if ( + not (self.address or self.port or self.socket) + or (self.socket and self.address and self.port) + or (self.address and not self.port) + or (not self.address and self.port) + ): + raise ConfigError( + "Must supply either webhook.socket or both webhook.address and webhook.port" + ) self.command_prefix = self._get_cfg(["command_prefix"], default="!c") + " " def _get_cfg( diff --git a/matrix_alertbot/errors.py b/matrix_alertbot/errors.py index 7ec2414..8409082 100644 --- a/matrix_alertbot/errors.py +++ b/matrix_alertbot/errors.py @@ -8,5 +8,14 @@ class ConfigError(RuntimeError): msg: The message displayed to the user on error. """ - def __init__(self, msg: str): - super(ConfigError, self).__init__("%s" % (msg,)) + pass + + +class AlertNotFoundError(RuntimeError): + """An error encountered when an alert cannot be found in database. + + Args: + msg: The message displayed to the user on error. + """ + + pass diff --git a/matrix_alertbot/main.py b/matrix_alertbot/main.py index b66e7c1..a7d7d2d 100644 --- a/matrix_alertbot/main.py +++ b/matrix_alertbot/main.py @@ -2,9 +2,10 @@ import asyncio import logging import sys +from asyncio import TimeoutError from time import sleep -from aiohttp import ClientConnectionError, ServerDisconnectedError +from aiohttp import ClientConnectionError, ServerDisconnectedError, web from nio import ( AsyncClient, AsyncClientConfig, @@ -16,59 +17,16 @@ from nio import ( UnknownEvent, ) +from matrix_alertbot.alertmanager import AlertmanagerClient +from matrix_alertbot.cache import Cache from matrix_alertbot.callbacks import Callbacks from matrix_alertbot.config import Config -from matrix_alertbot.storage import Storage +from matrix_alertbot.webhook import Webhook logger = logging.getLogger(__name__) -async def main() -> bool: - """The first function that is run when starting the bot""" - - # Read user-configured options from a config file. - # A different config file path can be specified as the first command line argument - if len(sys.argv) > 1: - config_path = sys.argv[1] - else: - config_path = "config.yaml" - - # Read the parsed config file and create a Config object - config = Config(config_path) - - # Configure the database - store = Storage(config.database) - - # Configuration options for the AsyncClient - client_config = AsyncClientConfig( - max_limit_exceeded=0, - max_timeouts=0, - store_sync_tokens=True, - encryption_enabled=True, - ) - - # Initialize the matrix client - client = AsyncClient( - config.homeserver_url, - config.user_id, - device_id=config.device_id, - store_path=config.store_path, - config=client_config, - ) - - if config.user_token: - client.access_token = config.user_token - client.user_id = config.user_id - - # Set up event callbacks - callbacks = Callbacks(client, store, config) - client.add_event_callback(callbacks.message, (RoomMessageText,)) - client.add_event_callback( - callbacks.invite_event_filtered_callback, (InviteMemberEvent,) - ) - client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,)) - client.add_event_callback(callbacks.unknown, (UnknownEvent,)) - +async def start_matrix_client(client: AsyncClient, config: Config) -> bool: # Keep trying to reconnect on failure (with some time in-between) while True: try: @@ -107,7 +65,7 @@ async def main() -> bool: logger.info(f"Logged in as {config.user_id}") await client.sync_forever(timeout=30000, full_state=True) - except (ClientConnectionError, ServerDisconnectedError): + except (ClientConnectionError, ServerDisconnectedError, TimeoutError): logger.warning("Unable to connect to homeserver, retrying in 15s...") # Sleep so we don't bombard the server with login requests @@ -117,5 +75,64 @@ async def main() -> bool: await client.close() -# Run the main function in an asyncio event loop -asyncio.get_event_loop().run_until_complete(main()) +def main() -> None: + """The first function that is run when starting the bot""" + + # Read user-configured options from a config file. + # A different config file path can be specified as the first command line argument + if len(sys.argv) > 1: + config_path = sys.argv[1] + else: + config_path = "config.yaml" + + # Read the parsed config file and create a Config object + config = Config(config_path) + + # Configure the cache + cache = Cache(config.cache_dir, config.cache_expire_time) + + # Configure Alertmanager client + alertmanager = AlertmanagerClient(config.alertmanager_url, cache) + + # Configuration options for the AsyncClient + client_config = AsyncClientConfig( + max_limit_exceeded=0, + max_timeouts=0, + store_sync_tokens=True, + encryption_enabled=True, + ) + + # Initialize the matrix client + client = AsyncClient( + config.homeserver_url, + config.user_id, + device_id=config.device_id, + store_path=config.store_path, + config=client_config, + ) + + if config.user_token: + client.access_token = config.user_token + client.user_id = config.user_id + + # Set up event callbacks + callbacks = Callbacks(client, cache, alertmanager, config) + client.add_event_callback(callbacks.message, (RoomMessageText,)) + client.add_event_callback( + callbacks.invite_event_filtered_callback, (InviteMemberEvent,) + ) + client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,)) + client.add_event_callback(callbacks.unknown, (UnknownEvent,)) + + webhook_server = Webhook(client, cache, config) + + loop = asyncio.get_event_loop() + loop.create_task(webhook_server.start()) + loop.create_task(start_matrix_client(client, config)) + + try: + loop.run_forever() + except Exception as e: + logger.error(e) + finally: + loop.run_until_complete(webhook_server.close()) diff --git a/matrix_alertbot/message_responses.py b/matrix_alertbot/message_responses.py deleted file mode 100644 index ed0946f..0000000 --- a/matrix_alertbot/message_responses.py +++ /dev/null @@ -1,52 +0,0 @@ -import logging - -from nio import AsyncClient, MatrixRoom, RoomMessageText - -from matrix_alertbot.chat_functions import send_text_to_room -from matrix_alertbot.config import Config -from matrix_alertbot.storage import Storage - -logger = logging.getLogger(__name__) - - -class Message: - def __init__( - self, - client: AsyncClient, - store: Storage, - config: Config, - message_content: str, - room: MatrixRoom, - event: RoomMessageText, - ): - """Initialize a new Message - - Args: - client: nio client used to interact with matrix. - - store: Bot storage. - - config: Bot configuration parameters. - - message_content: The body of the message. - - room: The room the event came from. - - event: The event defining the message. - """ - self.client = client - self.store = store - self.config = config - self.message_content = message_content - self.room = room - self.event = event - - async def process(self) -> None: - """Process and possibly respond to the message""" - if self.message_content.lower() == "hello world": - await self._hello_world() - - async def _hello_world(self) -> None: - """Say hello""" - text = "Hello, world!" - await send_text_to_room(self.client, self.room.room_id, text) diff --git a/matrix_alertbot/storage.py b/matrix_alertbot/storage.py index 8759f8a..5d955c5 100644 --- a/matrix_alertbot/storage.py +++ b/matrix_alertbot/storage.py @@ -1,126 +1,70 @@ -import logging -from typing import Any, Dict +from __future__ import annotations -# The latest migration version of the database. -# -# Database migrations are applied starting from the number specified in the database's -# `migration_version` table + 1 (or from 0 if this table does not yet exist) up until -# the version specified here. -# -# When a migration is performed, the `migration_version` table should be incremented. -latest_migration_version = 0 +import logging +from typing import Dict logger = logging.getLogger(__name__) -class Storage: - def __init__(self, database_config: Dict[str, str]): - """Setup the database. +class Alert: + EMOJIS = {"critical": "🔥", "warning": "⚠️", "resolved": "🥦"} + COLORS = {"critical": "dc3545", "warning": "ffc107", "resolved": "33cc33"} - Runs an initial setup or migrations depending on whether a database file has already - been created. + def __init__( + self, + id: str, + url: str, + firing: bool = True, + labels: Dict[str, str] = None, + annotations: Dict[str, str] = None, + ): + self.id = id + self.url = url + self.firing = firing - Args: - database_config: a dictionary containing the following keys: - * type: A string, one of "sqlite" or "postgres". - * connection_string: A string, featuring a connection string that - be fed to each respective db library's `connect` method. - """ - self.conn = self._get_database_connection( - database_config["type"], database_config["connection_string"] - ) - self.cursor = self.conn.cursor() - self.db_type = database_config["type"] - - # Try to check the current migration version - migration_level = 0 - try: - self._execute("SELECT version FROM migration_version") - row = self.cursor.fetchone() - migration_level = row[0] - except Exception: - self._initial_setup() - finally: - if migration_level < latest_migration_version: - self._run_migrations(migration_level) - - logger.info(f"Database initialization of type '{self.db_type}' complete") - - def _get_database_connection( - self, database_type: str, connection_string: str - ) -> Any: - """Creates and returns a connection to the database""" - if database_type == "sqlite": - import sqlite3 - - # Initialize a connection to the database, with autocommit on - return sqlite3.connect(connection_string, isolation_level=None) - elif database_type == "postgres": - import psycopg2 - - conn = psycopg2.connect(connection_string) - - # Autocommit on - conn.set_isolation_level(0) - - return conn - - def _initial_setup(self) -> None: - """Initial setup of the database""" - logger.info("Performing initial database setup...") - - # Set up the migration_version table - self._execute( - """ - CREATE TABLE migration_version ( - version INTEGER PRIMARY KEY - ) - """ - ) - - # Initially set the migration version to 0 - self._execute( - """ - INSERT INTO migration_version ( - version - ) VALUES (?) - """, - (0,), - ) - - # Set up any other necessary database tables here - - logger.info("Database setup complete") - - def _run_migrations(self, current_migration_version: int) -> None: - """Execute database migrations. Migrates the database to the - `latest_migration_version`. - - Args: - current_migration_version: The migration version that the database is - currently at. - """ - logger.debug("Checking for necessary database migrations...") - - # if current_migration_version < 1: - # logger.info("Migrating the database from v0 to v1...") - # - # # Add new table, delete old ones, etc. - # - # # Update the stored migration version - # self._execute("UPDATE migration_version SET version = 1") - # - # logger.info("Database migrated to v1") - - def _execute(self, *args: Any) -> None: - """A wrapper around cursor.execute that transforms placeholder ?'s to %s for postgres. - - This allows for the support of queries that are compatible with both postgres and sqlite. - - Args: - args: Arguments passed to cursor.execute. - """ - if self.db_type == "postgres": - self.cursor.execute(args[0].replace("?", "%s"), *args[1:]) + if labels is None: + self.labels = {} else: - self.cursor.execute(*args) + self.labels = labels + + if annotations is None: + self.annotations = {} + else: + self.annotations = annotations + + if self.firing: + self.status = self.labels["severity"] + else: + self.status = "resolved" + + @staticmethod + def from_dict(data: Dict) -> Alert: + return Alert( + id=data["fingerprint"], + url=data["generatorURL"], + firing=data["status"] == "firing", + labels=data["labels"], + annotations=data["annotations"], + ) + + @property + def emoji(self) -> str: + return self.EMOJIS[self.status] + + @property + def color(self) -> str: + return self.COLORS[self.status] + + def plaintext(self) -> str: + alertname = self.labels["alertname"] + description = self.annotations["description"] + return f"[{self.emoji} {self.status.upper()}] {alertname}: {description}" + + def html(self) -> str: + alertname = self.labels["alertname"] + job = self.labels["job"] + description = self.annotations["description"] + return ( + f"[{self.emoji} {self.status.upper()}] " + f"{alertname} ({job})
{description}" + ) diff --git a/matrix_alertbot/webhook.py b/matrix_alertbot/webhook.py new file mode 100644 index 0000000..4131e9f --- /dev/null +++ b/matrix_alertbot/webhook.py @@ -0,0 +1,74 @@ +import logging +from typing import List + +from aiohttp import web, web_request +from nio import AsyncClient, SendRetryError +from matrix_alertbot.cache import Cache + +from matrix_alertbot.chat_functions import send_text_to_room +from matrix_alertbot.config import Config +from matrix_alertbot.storage import Alert + +logger = logging.getLogger(__name__) + +routes = web.RouteTableDef() + + +@routes.post("/alert") +async def create_alert(request: web_request.Request) -> web.Response: + data = await request.json() + logger.info(f"Received alert: {data}") + client = request.app["client"] + cache = request.app['cache'] + + plaintext = "" + html = "" + for i, alert in enumerate(data["alerts"]): + alert = Alert.from_dict(alert) + + if i != 0: + plaintext += "\n" + html += "
\n" + plaintext += alert.plaintext() + html += alert.html() + + try: + event = await send_text_to_room(client, request.app["room_id"], plaintext, html) + except SendRetryError as e: + logger.error(e) + return web.Response(status=500) + + cache[event.event_id] = tuple(alert["fingerprint"] for alert in data["alerts"]) + + return web.Response(status=200) + + +class Webhook: + def __init__(self, client: AsyncClient, cache: Cache, config: Config) -> None: + self.app = web.Application(logger=logger) + self.app["client"] = client + self.app["room_id"] = config.room + self.app["cache"] = cache + self.app.add_routes(routes) + self.runner = web.AppRunner(self.app) + + self.config = config + self.address = config.address + self.port = config.port + self.socket = config.socket + + async def start(self) -> None: + await self.runner.setup() + + site: web.BaseSite + if self.address and self.port: + site = web.TCPSite(self.runner, self.address, self.port) + logger.info(f"Listenning on {self.address}:{self.port}") + elif self.socket: + site = web.UnixSite(self.runner, self.socket) + logger.info(f"Listenning on unix://{self.socket}") + + await site.start() + + async def close(self) -> None: + await self.runner.cleanup() diff --git a/mypy.ini b/mypy.ini index 874a468..5ae56f9 100644 --- a/mypy.ini +++ b/mypy.ini @@ -2,3 +2,4 @@ ignore_missing_imports = True disallow_untyped_defs = True disallow_untyped_calls = True +plugins = sqlalchemy.ext.mypy.plugin diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 79bccd2..4dcd815 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -3,8 +3,9 @@ from unittest.mock import Mock import nio +from matrix_alertbot.alertmanager import AlertmanagerClient +from matrix_alertbot.cache import Cache from matrix_alertbot.callbacks import Callbacks -from matrix_alertbot.storage import Storage from tests.utils import make_awaitable, run_coroutine @@ -15,13 +16,14 @@ class CallbacksTestCase(unittest.TestCase): self.fake_client = Mock(spec=nio.AsyncClient) self.fake_client.user = "@fake_user:example.com" - self.fake_storage = Mock(spec=Storage) + self.fake_cache = Mock(spec=Cache) + self.fake_alertmanager = Mock(spec=AlertmanagerClient) # We don't spec config, as it doesn't currently have well defined attributes self.fake_config = Mock() self.callbacks = Callbacks( - self.fake_client, self.fake_storage, self.fake_config + self.fake_client, self.fake_cache, self.fake_alertmanager, self.fake_config ) def test_invite(self) -> None: