import logging
from typing import List, Optional

import pytimeparse2
from diskcache import Cache
from nio import AsyncClient, MatrixRoom

from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.chat_functions import send_text_to_room
from matrix_alertbot.config import Config
from matrix_alertbot.errors import (
    AlertmanagerError,
    AlertNotFoundError,
    SilenceNotFoundError,
)
from matrix_alertbot.matcher import AlertMatcher, AlertRegexMatcher

logger = logging.getLogger(__name__)


def _parse_matcher(arg: str) -> Optional[AlertMatcher]:
    """Turn one command argument into a matcher, if it looks like one.

    Args:
        arg: A single whitespace-delimited command argument.

    Returns:
        An ``AlertRegexMatcher`` for ``label=~regex``, an ``AlertMatcher``
        for ``label=value``, or ``None`` when the argument contains no ``=``.
    """
    # "=~" must be tested before "=" because every regex argument also
    # contains a plain "=".
    # Only the FIRST separator splits the argument, so values and regexes
    # may themselves contain "=" (a bare two-way unpack of str.split would
    # raise ValueError on e.g. "label=a=b").
    if "=~" in arg:
        label, _, regex = arg.partition("=~")
        return AlertRegexMatcher(label, regex)
    if "=" in arg:
        label, _, value = arg.partition("=")
        return AlertMatcher(label, value)
    return None


class BaseCommand:
    """Base class for every command a user can send to the bot."""

    def __init__(
        self,
        client: AsyncClient,
        cache: Cache,
        alertmanager: AlertmanagerClient,
        config: Config,
        cmd: str,
        room: MatrixRoom,
        sender: str,
        event_id: str,
    ) -> None:
        """A command made by a user.

        Args:
            client: The client to communicate with Matrix.
            cache: Bot cache.
            alertmanager: The client to communicate with Alertmanager.
            config: Bot configuration parameters.
            cmd: The command and arguments.
            room: The room the command was sent in.
            sender: The sender of the event.
            event_id: The ID of the event describing the command.
        """
        self.client = client
        self.cache = cache
        self.alertmanager = alertmanager
        self.config = config

        self.cmd = cmd
        # Everything after the first whitespace-delimited word (the command
        # name itself) is treated as an argument.
        self.args = cmd.split()[1:]

        self.room = room
        self.sender = sender
        self.event_id = event_id

    async def process(self) -> None:
        """Execute the command. Subclasses must override this."""
        raise NotImplementedError


class BaseAlertCommand(BaseCommand):
    """Base class for commands that target a previously posted alert event."""

    def __init__(
        self,
        client: AsyncClient,
        cache: Cache,
        alertmanager: AlertmanagerClient,
        config: Config,
        cmd: str,
        room: MatrixRoom,
        sender: str,
        event_id: str,
        alert_event_id: str,
    ) -> None:
        """A command made by a user about an alert event.

        Takes the same arguments as ``BaseCommand`` plus:

        Args:
            alert_event_id: The ID of the alert event the command refers to;
                used as the cache key to look up the alert fingerprints.
        """
        super().__init__(
            client, cache, alertmanager, config, cmd, room, sender, event_id
        )

        self.alert_event_id = alert_event_id


class AckAlertCommand(BaseAlertCommand):
    """Command that silences the alerts of an alert event."""

    async def process(self) -> None:
        """Acknowledge an alert and silence it for a certain duration in Alertmanager.

        Arguments of the form ``label=value`` / ``label=~regex`` become
        matchers; all remaining arguments are joined with spaces and parsed
        as the duration (``1d`` when none is given). The IDs of the created
        silences are stored in the cache under this command's event ID with
        ``expire=duration_seconds``.
        """
        matchers: List[AlertMatcher] = []
        durations = []
        for arg in self.args:
            matcher = _parse_matcher(arg)
            if matcher is None:
                durations.append(arg)
            else:
                matchers.append(matcher)

        duration = " ".join(durations) if durations else "1d"
        logger.debug(
            f"Receiving a command to create a silence for a duration of {duration}"
        )

        duration_seconds = pytimeparse2.parse(duration)
        if duration_seconds is None:
            logger.error(f"Unable to create silence: Invalid duration '{duration}'")
            await send_text_to_room(
                self.client,
                self.room.room_id,
                f"I tried really hard, but I can't convert the duration '{duration}' to a number of seconds.",
            )
            return

        logger.debug(
            f"Read alert fingerprints for alert event {self.alert_event_id} from cache"
        )
        if self.alert_event_id not in self.cache:
            logger.error(
                f"Cannot find fingerprints for alert event {self.alert_event_id} in cache"
            )
            return

        alert_fingerprints = self.cache[self.alert_event_id]
        logger.debug(f"Found {len(alert_fingerprints)} in cache")

        count_alert_not_found = 0
        created_silences = []
        for alert_fingerprint in alert_fingerprints:
            logger.debug(
                f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}"
            )

            try:
                silence_id = await self.alertmanager.create_silence(
                    alert_fingerprint,
                    duration_seconds,
                    self.room.user_name(self.sender),
                    matchers,
                )
                created_silences.append(silence_id)
            except AlertNotFoundError as e:
                logger.warning(f"Unable to create silence: {e}")
                count_alert_not_found += 1
            except AlertmanagerError as e:
                # Best effort: log the failure and keep going with the
                # remaining fingerprints.
                logger.exception(f"Unable to create silence: {e}", exc_info=e)

        self.cache.set(self.event_id, tuple(created_silences), expire=duration_seconds)

        if count_alert_not_found > 0:
            await send_text_to_room(
                self.client,
                self.room.room_id,
                f"Sorry, I couldn't find {count_alert_not_found} alerts, therefore I couldn't create their silence.",
            )

        if created_silences:
            await send_text_to_room(
                self.client,
                self.room.room_id,
                f"Created {len(created_silences)} silences with a duration of {duration}.",
            )


class UnackAlertCommand(BaseAlertCommand):
    """Command that removes the silences of an alert event."""

    async def process(self) -> None:
        """Delete an alert's acknowledgement and remove the corresponding silences in Alertmanager.

        Arguments of the form ``label=value`` / ``label=~regex`` become
        matchers; any other argument is ignored.
        """
        matchers: List[AlertMatcher] = [
            matcher
            for matcher in map(_parse_matcher, self.args)
            if matcher is not None
        ]

        logger.debug("Receiving a command to delete a silence")

        logger.debug(
            f"Read alert fingerprints for alert event {self.alert_event_id} from cache"
        )
        if self.alert_event_id not in self.cache:
            logger.error(
                f"Cannot find fingerprints for event {self.alert_event_id} in cache"
            )
            return

        alert_fingerprints = self.cache[self.alert_event_id]
        logger.debug(f"Found {len(alert_fingerprints)} in cache")

        count_alert_not_found = 0
        count_removed_silences = 0
        for alert_fingerprint in alert_fingerprints:
            logger.debug(
                f"Delete silence for alert with fingerprint {alert_fingerprint}"
            )

            try:
                removed_silences = await self.alertmanager.delete_silences(
                    alert_fingerprint, matchers
                )
                count_removed_silences += len(removed_silences)
            except (AlertNotFoundError, SilenceNotFoundError) as e:
                logger.error(f"Unable to delete silence: {e}")
                count_alert_not_found += 1
            except AlertmanagerError as e:
                # Best effort: log the failure and keep going with the
                # remaining fingerprints.
                logger.exception(f"Unable to delete silence: {e}", exc_info=e)

        if count_alert_not_found > 0:
            await send_text_to_room(
                self.client,
                self.room.room_id,
                f"Sorry, I couldn't find {count_alert_not_found} alerts, therefore I couldn't remove their silences.",
            )

        if count_removed_silences > 0:
            await send_text_to_room(
                self.client,
                self.room.room_id,
                f"Removed {count_removed_silences} silences.",
            )


class HelpCommand(BaseCommand):
    """Command that replies with help text."""

    async def process(self) -> None:
        """Show the help text.

        Without arguments a generic greeting is sent; otherwise the first
        argument selects the help topic.
        """
        logger.debug(f"Displaying help to room {self.room.display_name}")
        if not self.args:
            text = (
                "Hello, I am a bot made with matrix-nio! Use `help commands` to view "
                "available commands."
            )
            await send_text_to_room(self.client, self.room.room_id, text)
            return

        topic = self.args[0]
        if topic == "rules":
            text = "These are the rules!"
        elif topic == "commands":
            text = "Available commands: ..."
        else:
            text = "Unknown help topic!"
        await send_text_to_room(self.client, self.room.room_id, text)


class UnknownCommand(BaseCommand):
    """Fallback for commands the bot does not recognise."""

    async def process(self) -> None:
        """Tell the room that the command is unknown and point to `help`."""
        logger.debug(
            f"Sending unknown command response to room {self.room.display_name}"
        )
        await send_text_to_room(
            self.client,
            self.room.room_id,
            f"Unknown command '{self.cmd}'. Try the 'help' command for more information.",
        )


class CommandFactory:
    """Builds the command object matching a command string."""

    @staticmethod
    def create(
        cmd: str,
        client: AsyncClient,
        cache: Cache,
        alertmanager: AlertmanagerClient,
        config: Config,
        room: MatrixRoom,
        sender: str,
        event_id: str,
        reacted_to_event_id: Optional[str] = None,
    ) -> BaseCommand:
        """Return the command instance selected by the prefix of ``cmd``.

        Args:
            cmd: The command and arguments.
            client: The client to communicate with Matrix.
            cache: Bot cache.
            alertmanager: The client to communicate with Alertmanager.
            config: Bot configuration parameters.
            room: The room the command was sent in.
            sender: The sender of the event.
            event_id: The ID of the event describing the command.
            reacted_to_event_id: The ID of the alert event the command refers
                to; required for the ack and unack/nack commands.

        Returns:
            An ``AckAlertCommand``, ``UnackAlertCommand``, ``HelpCommand`` or
            ``UnknownCommand``.

        Raises:
            TypeError: If an ack or unack/nack command is given without
                ``reacted_to_event_id``.
        """
        if cmd.startswith("ack"):
            if reacted_to_event_id is None:
                raise TypeError("Alert command must be in reply to an alert event.")

            return AckAlertCommand(
                client,
                cache,
                alertmanager,
                config,
                cmd,
                room,
                sender,
                event_id,
                reacted_to_event_id,
            )
        elif cmd.startswith(("unack", "nack")):
            if reacted_to_event_id is None:
                raise TypeError("Alert command must be in reply to an alert event.")

            return UnackAlertCommand(
                client,
                cache,
                alertmanager,
                config,
                cmd,
                room,
                sender,
                event_id,
                reacted_to_event_id,
            )
        elif cmd.startswith("help"):
            return HelpCommand(
                client, cache, alertmanager, config, cmd, room, sender, event_id
            )
        else:
            return UnknownCommand(
                client, cache, alertmanager, config, cmd, room, sender, event_id
            )