matrix-alertbot/matrix_alertbot/command.py
2022-07-12 00:29:55 +02:00

230 lines
8 KiB
Python

import logging
from typing import List
import pytimeparse2
from diskcache import Cache
from nio import AsyncClient, MatrixRoom
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.chat_functions import send_text_to_room
from matrix_alertbot.config import Config
from matrix_alertbot.errors import (
AlertmanagerError,
AlertNotFoundError,
SilenceNotFoundError,
)
from matrix_alertbot.matcher import AlertMatcher, AlertRegexMatcher
logger = logging.getLogger(__name__)
class Command:
def __init__(
self,
client: AsyncClient,
cache: Cache,
alertmanager: AlertmanagerClient,
config: Config,
command: str,
room: MatrixRoom,
sender: str,
event_id: str,
):
"""A command made by a user.
Args:
client: The client to communicate to matrix with.
cache: Bot cache.
config: Bot configuration parameters.
command: The command and arguments.
room: The room the command was sent in.
event: The event describing the command.
"""
self.client = client
self.cache = cache
self.alertmanager = alertmanager
self.config = config
self.command = command
self.room = room
self.sender = sender
self.event_id = event_id
self.args = self.command.split()[1:]
async def process(self) -> None:
"""Process the command"""
if self.command.startswith("ack"):
await self._ack()
elif self.command.startswith("unack") or self.command.startswith("nack"):
await self._unack()
elif self.command.startswith("help"):
await self._show_help()
else:
await self._unknown_command()
async def _ack(self) -> None:
"""Acknowledge an alert and silence it for a certain duration in Alertmanager"""
matchers: List[AlertMatcher] = []
durations = []
for arg in self.args:
if "=~" in arg:
label, regex = arg.split("=~")
regex_matcher = AlertRegexMatcher(label, regex)
matchers.append(regex_matcher)
elif "=" in arg:
label, value = arg.split("=")
matcher = AlertMatcher(label, value)
matchers.append(matcher)
else:
durations.append(arg)
if len(durations) > 0:
duration = " ".join(durations)
else:
duration = "1d"
logger.debug(
f"Receiving a command to create a silence for a duration of {duration}"
)
duration_seconds = pytimeparse2.parse(duration)
if duration_seconds is None:
logger.error(f"Unable to create silence: Invalid duration '{duration}'")
await send_text_to_room(
self.client,
self.room.room_id,
f"I tried really hard, but I can't convert the duration '{duration}' to a number of seconds.",
)
return
logger.debug(f"Read alert fingerprints for event {self.event_id} from cache")
if self.event_id not in self.cache:
logger.error(f"Cannot find fingerprints for event {self.event_id} in cache")
return
alert_fingerprints = self.cache[self.event_id]
logger.debug(f"Found {len(alert_fingerprints)} in cache")
count_alert_not_found = 0
count_created_silences = 0
for alert_fingerprint in alert_fingerprints:
logger.debug(
f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}"
)
try:
await self.alertmanager.create_silence(
alert_fingerprint,
duration_seconds,
self.room.user_name(self.sender),
matchers,
)
count_created_silences += 1
except AlertNotFoundError as e:
logger.warning(f"Unable to create silence: {e}")
count_alert_not_found += 1
except AlertmanagerError as e:
logger.exception(f"Unable to create silence: {e}", exc_info=e)
if count_alert_not_found > 0:
await send_text_to_room(
self.client,
self.room.room_id,
f"Sorry, I couldn't find {count_alert_not_found} alerts, therefore I couldn't create their silence.",
)
if count_created_silences > 0:
await send_text_to_room(
self.client,
self.room.room_id,
f"Created {count_created_silences} silences with a duration of {duration}.",
)
async def _unack(self) -> None:
"""Delete an alert's acknowledgement of an alert and remove corresponding silence in Alertmanager"""
matchers: List[AlertMatcher] = []
for arg in self.args:
if "=~" in arg:
label, regex = arg.split("=~")
regex_matcher = AlertRegexMatcher(label, regex)
matchers.append(regex_matcher)
elif "=" in arg:
label, value = arg.split("=")
matcher = AlertMatcher(label, value)
matchers.append(matcher)
logger.debug("Receiving a command to delete a silence")
logger.debug(f"Read alert fingerprints for event {self.event_id} from cache")
if self.event_id not in self.cache:
logger.error(f"Cannot find fingerprints for event {self.event_id} in cache")
return
alert_fingerprints = self.cache[self.event_id]
logger.debug(f"Found {len(alert_fingerprints)} in cache")
count_alert_not_found = 0
count_removed_silences = 0
for alert_fingerprint in alert_fingerprints:
logger.debug(
f"Delete silence for alert with fingerprint {alert_fingerprint}"
)
try:
removed_silences = await self.alertmanager.delete_silences(
alert_fingerprint, matchers
)
count_removed_silences += len(removed_silences)
except (AlertNotFoundError, SilenceNotFoundError) as e:
logger.error(f"Unable to delete silence: {e}")
count_alert_not_found += 1
except AlertmanagerError as e:
logger.exception(f"Unable to delete silence: {e}", exc_info=e)
if count_alert_not_found > 0:
await send_text_to_room(
self.client,
self.room.room_id,
f"Sorry, I couldn't find {count_alert_not_found} alerts, therefore I couldn't remove their silences.",
)
if count_removed_silences > 0:
await send_text_to_room(
self.client,
self.room.room_id,
f"Removed {count_removed_silences} silences.",
)
async def _show_help(self) -> None:
"""Show the help text"""
logger.debug(f"Displaying help to room {self.room.display_name}")
if not self.args:
text = (
"Hello, I am a bot made with matrix-nio! Use `help commands` to view "
"available commands."
)
await send_text_to_room(self.client, self.room.room_id, text)
return
topic = self.args[0]
if topic == "rules":
text = "These are the rules!"
elif topic == "commands":
text = "Available commands: ..."
else:
text = "Unknown help topic!"
await send_text_to_room(self.client, self.room.room_id, text)
async def _unknown_command(self) -> None:
logger.debug(
f"Sending unknown command response to room {self.room.display_name}"
)
await send_text_to_room(
self.client,
self.room.room_id,
f"Unknown command '{self.command}'. Try the 'help' command for more information.",
)