matrix-alertbot/matrix_alertbot/callback.py
2022-07-17 00:29:36 +02:00

308 lines
10 KiB
Python

import logging
from diskcache import Cache
from nio import (
AsyncClient,
InviteMemberEvent,
JoinError,
MatrixRoom,
MegolmEvent,
RedactionEvent,
RoomGetEventError,
RoomMessageText,
UnknownEvent,
)
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.chat_functions import strip_fallback
from matrix_alertbot.command import CommandFactory
from matrix_alertbot.config import Config
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Maps a reaction emoji to the duration string used to build the
# "ack <duration>" command when that reaction is received.
REACTION_DURATIONS = {
    "🤫": "12h",
    "😶": "1d",
    "🤐": "3d",
    "🙊": "5d",
    "🔇": "1w",
    "🔕": "3w",
}
class Callbacks:
    """Matrix event callbacks that turn room activity into alertbot commands.

    Every handler ignores events coming from a room other than
    ``config.room_id``.
    """

    def __init__(
        self,
        client: AsyncClient,
        alertmanager: AlertmanagerClient,
        cache: Cache,
        config: Config,
    ):
        """Store the collaborators used by the callbacks.

        Args:
            client: nio client used to interact with matrix.
            alertmanager: Client used to interact with alertmanager.
            cache: Bot cache.
            config: Bot configuration parameters.
        """
        self.client = client
        self.cache = cache
        self.alertmanager = alertmanager
        self.config = config
        self.command_prefix = config.command_prefix

    async def _run_command(
        self,
        cmd: str,
        room: MatrixRoom,
        sender: str,
        event_id: str,
        alert_event_id,
    ) -> None:
        """Build the command described by ``cmd`` and process it.

        Args:
            cmd: Command text, without the command prefix.
            room: The room the triggering event came from.
            sender: The sender of the triggering event.
            event_id: The event ID handed to the command.
            alert_event_id: The event ID of the targeted alert; may be None
                when a message is not a reply to another event.
        """
        try:
            command = CommandFactory.create(
                cmd,
                self.client,
                self.cache,
                self.alertmanager,
                self.config,
                room,
                sender,
                event_id,
                alert_event_id,
            )
        except TypeError as e:
            # Report through the module logger, like every other message here.
            logger.error(f"Unable to create the command '{cmd}': {e}")
            return

        await command.process()

    async def message(self, room: MatrixRoom, event: RoomMessageText) -> None:
        """Callback for when a message event is received.

        Messages starting with the command prefix are turned into a command
        and processed; all other messages are ignored.

        Args:
            room: The room the event came from.
            event: The event defining the message.
        """
        # Extract the message text
        msg = strip_fallback(event.body)

        # Ignore messages from ourselves
        if event.sender == self.client.user:
            return

        # Ignore messages from unauthorized room
        if room.room_id != self.config.room_id:
            return

        logger.debug(
            f"Bot message received for room {room.display_name} | "
            f"{room.user_name(event.sender)}: {msg}"
        )

        # Only messages that start with the command prefix are commands
        if not msg.startswith(self.command_prefix):
            logger.debug(
                f"Message received without command prefix {self.command_prefix}: Aborting."
            )
            return

        # The targeted alert is the event this message replies to, if any
        alert_event_id = (
            event.source.get("content", {})
            .get("m.relates_to", {})
            .get("m.in_reply_to", {})
            .get("event_id")
        )
        if alert_event_id is None:
            # Not fatal here: the command is still created with a None alert ID
            logger.warning("Unable to find the event ID of the alert")

        # Remove the command prefix
        cmd = msg[len(self.command_prefix) :]
        await self._run_command(
            cmd, room, event.sender, event.event_id, alert_event_id
        )

    async def invite(self, room: MatrixRoom, event: InviteMemberEvent) -> None:
        """Callback for when an invite is received. Join the room specified in the invite.

        Joining is attempted up to 3 times. If every attempt fails, an error
        is logged and the method returns without reporting a successful join.

        Args:
            room: The room that we are invited to.
            event: The invite event.
        """
        # Ignore invites from unauthorized room
        if room.room_id != self.config.room_id:
            return

        logger.debug(f"Got invite to {room.room_id} from {event.sender}.")

        # Attempt to join 3 times before giving up
        for attempt in range(3):
            result = await self.client.join(room.room_id)
            if not isinstance(result, JoinError):
                break
            logger.error(
                "Error joining room %s (attempt %d): %s",
                room.room_id,
                attempt,
                result.message,
            )
        else:
            # Loop finished without a break: every attempt failed
            logger.error("Unable to join room: %s", room.room_id)
            return

        # Successfully joined room
        logger.info(f"Joined {room.room_id}")

    async def invite_event_filtered_callback(
        self, room: MatrixRoom, event: InviteMemberEvent
    ) -> None:
        """Forward only our own invite events to `invite`.

        Since the InviteMemberEvent is fired for every m.room.member state received
        in a sync response's `rooms.invite` section, we will receive some that are
        not actually our own invite event (such as the inviter's membership).
        This makes sure we only call `callbacks.invite` with our own invite events.

        Args:
            room: The room the membership event belongs to.
            event: The membership event received in the invite section.
        """
        if event.state_key == self.client.user_id:
            # This is our own membership (invite) event
            await self.invite(room, event)

    async def _reaction(
        self, room: MatrixRoom, event: UnknownEvent, alert_event_id: str
    ) -> None:
        """Acknowledge an alert when a known reaction is added to one of our messages.

        The reaction key is looked up in REACTION_DURATIONS and, if the
        reacted-to event was sent by the bot, an ``ack <duration>`` command
        is processed.

        Args:
            room: The room the reaction was sent in.
            event: The reaction event.
            alert_event_id: The event ID that the reaction points to.
        """
        # Ignore reactions from unauthorized room
        if room.room_id != self.config.room_id:
            return

        # Ignore reactions from ourselves
        if event.sender == self.client.user:
            return

        reaction = event.source.get("content", {}).get("m.relates_to", {}).get("key")
        logger.debug(f"Got reaction {reaction} to {room.room_id} from {event.sender}.")

        if reaction not in REACTION_DURATIONS:
            logger.warning(f"Uknown duration reaction {reaction}")
            return
        duration = REACTION_DURATIONS[reaction]

        # Get the original event that was reacted to
        event_response = await self.client.room_get_event(room.room_id, alert_event_id)
        if isinstance(event_response, RoomGetEventError):
            logger.warning(
                f"Error getting event that was reacted to ({alert_event_id})"
            )
            return
        reacted_to_event = event_response.event

        # Only acknowledge reactions to events that we sent
        if reacted_to_event.sender != self.config.user_id:
            return

        # Remember which event this reaction targets; `redaction` looks the
        # redacted event ID up in the cache to find the alert event again.
        self.cache.set(
            event.event_id,
            reacted_to_event.event_id,
            expire=self.config.cache_expire_time,
        )

        # Process the acknowledgement for the duration tied to the reaction
        await self._run_command(
            f"ack {duration}", room, event.sender, event.event_id, alert_event_id
        )

    async def redaction(self, room: MatrixRoom, event: RedactionEvent) -> None:
        """Callback for when an event is redacted. Process an ``unack`` command.

        The redacted event ID is looked up in the cache to find the related
        alert event ID; if it is not cached, a warning is logged and nothing
        else happens.

        Args:
            room: The room the redaction was sent in.
            event: The redaction event.
        """
        # Ignore events from unauthorized room
        if room.room_id != self.config.room_id:
            return

        # Ignore redactions from ourselves
        if event.sender == self.config.user_id:
            return

        logger.debug(
            f"Read alert event ID for redacted event {event.redacts} from cache"
        )

        if event.redacts not in self.cache:
            logger.warning(
                f"Unable to remove silences from event {event.redacts}: Redacted event is not in cache"
            )
            return
        alert_event_id: str = self.cache[event.redacts]

        await self._run_command(
            "unack", room, event.sender, event.redacts, alert_event_id
        )

    async def decryption_failure(self, room: MatrixRoom, event: MegolmEvent) -> None:
        """Callback for when an event fails to decrypt. Log an error with recovery tips.

        Args:
            room: The room that the event that we were unable to decrypt is in.
            event: The encrypted event that we were unable to decrypt.
        """
        # Ignore events from unauthorized room
        if room.room_id != self.config.room_id:
            return

        logger.error(
            f"Failed to decrypt event '{event.event_id}' in room '{room.room_id}'!"
            f"\n\n"
            f"Tip: try using a different device ID in your config file and restart."
            f"\n\n"
            f"If all else fails, delete your store directory and let the bot recreate "
            f"it (your reminders will NOT be deleted, but the bot may respond to existing "
            f"commands a second time)."
        )

    async def unknown(self, room: MatrixRoom, event: UnknownEvent) -> None:
        """Callback for when an event with a type that is unknown to matrix-nio is received.

        Currently this is used for reaction events, which are not yet part of a released
        matrix spec (and are thus unknown to nio). Any other event is only logged
        at debug level.

        Args:
            room: The room the reaction was sent in.
            event: The event itself.
        """
        # Ignore events from unauthorized room
        if room.room_id != self.config.room_id:
            return

        if event.type == "m.reaction":
            # Get the ID of the event this was a reaction to
            relation_dict = event.source.get("content", {}).get("m.relates_to", {})

            reacted_to_id = relation_dict.get("event_id")
            if reacted_to_id and relation_dict.get("rel_type") == "m.annotation":
                await self._reaction(room, event, reacted_to_id)
                return

        logger.debug(
            f"Got unknown event with type to {event.type} from {event.sender} in {room.room_id}."
        )