190 lines
6.5 KiB
Python
190 lines
6.5 KiB
Python
import logging
|
|
from typing import List
|
|
|
|
from diskcache import Cache
|
|
from nio import AsyncClient, MatrixRoom
|
|
|
|
from matrix_alertbot.alertmanager import AlertmanagerClient
|
|
from matrix_alertbot.chat_functions import send_text_to_room
|
|
from matrix_alertbot.config import Config
|
|
from matrix_alertbot.errors import AlertmanagerError
|
|
from matrix_alertbot.matcher import AlertMatcher, AlertRegexMatcher
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class Command:
|
|
def __init__(
|
|
self,
|
|
client: AsyncClient,
|
|
cache: Cache,
|
|
alertmanager: AlertmanagerClient,
|
|
config: Config,
|
|
command: str,
|
|
room: MatrixRoom,
|
|
sender: str,
|
|
event_id: str,
|
|
):
|
|
"""A command made by a user.
|
|
|
|
Args:
|
|
client: The client to communicate to matrix with.
|
|
|
|
cache: Bot cache.
|
|
|
|
config: Bot configuration parameters.
|
|
|
|
command: The command and arguments.
|
|
|
|
room: The room the command was sent in.
|
|
|
|
event: The event describing the command.
|
|
"""
|
|
self.client = client
|
|
self.cache = cache
|
|
self.alertmanager = alertmanager
|
|
self.config = config
|
|
self.command = command
|
|
self.room = room
|
|
self.sender = sender
|
|
self.event_id = event_id
|
|
self.args = self.command.split()[1:]
|
|
|
|
async def process(self) -> None:
|
|
"""Process the command"""
|
|
if self.command.startswith("ack"):
|
|
await self._ack()
|
|
elif self.command.startswith("unack") or self.command.startswith("nack"):
|
|
await self._unack()
|
|
elif self.command.startswith("help"):
|
|
await self._show_help()
|
|
else:
|
|
await self._unknown_command()
|
|
|
|
async def _ack(self) -> None:
|
|
"""Acknowledge an alert and silence it for a certain duration in Alertmanager"""
|
|
matchers: List[AlertMatcher] = []
|
|
durations = []
|
|
for arg in self.args:
|
|
if "=~" in arg:
|
|
label, regex = arg.split("=~")
|
|
regex_matcher = AlertRegexMatcher(label, regex)
|
|
matchers.append(regex_matcher)
|
|
elif "=" in arg:
|
|
label, value = arg.split("=")
|
|
matcher = AlertMatcher(label, value)
|
|
matchers.append(matcher)
|
|
else:
|
|
durations.append(arg)
|
|
|
|
if len(durations) > 0:
|
|
duration = " ".join(durations)
|
|
else:
|
|
duration = "1d"
|
|
|
|
logger.debug(
|
|
f"Receiving a command to create a silence for a duration of {duration}"
|
|
)
|
|
|
|
logger.debug(f"Read alert fingerprints for event {self.event_id} from cache")
|
|
|
|
if self.event_id not in self.cache:
|
|
logger.error(f"Cannot find fingerprints for event {self.event_id} in cache")
|
|
return
|
|
|
|
alert_fingerprints = self.cache[self.event_id]
|
|
logger.debug(f"Found {len(alert_fingerprints)} in cache")
|
|
|
|
count_created_silences = 0
|
|
for alert_fingerprint in alert_fingerprints:
|
|
logger.debug(
|
|
f"Create silence for alert with fingerprint {alert_fingerprint} for a duration of {duration}"
|
|
)
|
|
try:
|
|
await self.alertmanager.create_silence(
|
|
alert_fingerprint,
|
|
duration,
|
|
self.room.user_name(self.sender),
|
|
matchers,
|
|
)
|
|
count_created_silences += 1
|
|
except AlertmanagerError as e:
|
|
logger.exception(f"Unable to create silence: {e}", exc_info=e)
|
|
|
|
await send_text_to_room(
|
|
self.client,
|
|
self.room.room_id,
|
|
f"Created {count_created_silences} silences with a duration of {duration}.",
|
|
)
|
|
|
|
async def _unack(self) -> None:
|
|
"""Delete an alert's acknowledgement of an alert and remove corresponding silence in Alertmanager"""
|
|
matchers: List[AlertMatcher] = []
|
|
for arg in self.args:
|
|
if "=~" in arg:
|
|
label, regex = arg.split("=~")
|
|
regex_matcher = AlertRegexMatcher(label, regex)
|
|
matchers.append(regex_matcher)
|
|
elif "=" in arg:
|
|
label, value = arg.split("=")
|
|
matcher = AlertMatcher(label, value)
|
|
matchers.append(matcher)
|
|
|
|
logger.debug("Receiving a command to delete a silence")
|
|
logger.debug(f"Read alert fingerprints for event {self.event_id} from cache")
|
|
|
|
if self.event_id not in self.cache:
|
|
logger.error(f"Cannot find fingerprints for event {self.event_id} in cache")
|
|
return
|
|
|
|
alert_fingerprints = self.cache[self.event_id]
|
|
logger.debug(f"Found {len(alert_fingerprints)} in cache")
|
|
|
|
count_removed_silences = 0
|
|
for alert_fingerprint in alert_fingerprints:
|
|
logger.debug(
|
|
f"Delete silence for alert with fingerprint {alert_fingerprint}"
|
|
)
|
|
try:
|
|
removed_silences = await self.alertmanager.delete_silences(
|
|
alert_fingerprint, matchers
|
|
)
|
|
count_removed_silences += len(removed_silences)
|
|
except AlertmanagerError as e:
|
|
logger.exception(f"Unable to delete silence: {e}", exc_info=e)
|
|
|
|
await send_text_to_room(
|
|
self.client,
|
|
self.room.room_id,
|
|
f"Removed {count_removed_silences} silences.",
|
|
)
|
|
|
|
async def _show_help(self) -> None:
|
|
"""Show the help text"""
|
|
logger.debug(f"Displaying help to room {self.room.display_name}")
|
|
if not self.args:
|
|
text = (
|
|
"Hello, I am a bot made with matrix-nio! Use `help commands` to view "
|
|
"available commands."
|
|
)
|
|
await send_text_to_room(self.client, self.room.room_id, text)
|
|
return
|
|
|
|
topic = self.args[0]
|
|
if topic == "rules":
|
|
text = "These are the rules!"
|
|
elif topic == "commands":
|
|
text = "Available commands: ..."
|
|
else:
|
|
text = "Unknown help topic!"
|
|
await send_text_to_room(self.client, self.room.room_id, text)
|
|
|
|
async def _unknown_command(self) -> None:
|
|
logger.debug(
|
|
f"Sending unknown command response to room {self.room.display_name}"
|
|
)
|
|
await send_text_to_room(
|
|
self.client,
|
|
self.room.room_id,
|
|
f"Unknown command '{self.command}'. Try the 'help' command for more information.",
|
|
)
|