matrix-alertbot/matrix_alertbot/matrix.py
2024-01-22 11:35:13 +01:00

252 lines
9.4 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import os
import random
from typing import Dict, List, Optional, Tuple
from aiohttp import ClientConnectionError, ServerDisconnectedError
from diskcache import Cache
from nio.client import AsyncClient, AsyncClientConfig
from nio.events import (
InviteMemberEvent,
KeyVerificationCancel,
KeyVerificationKey,
KeyVerificationMac,
KeyVerificationStart,
MegolmEvent,
ReactionEvent,
RedactionEvent,
RoomMessageText,
RoomMessageUnknown,
)
from nio.exceptions import LocalProtocolError
from nio.responses import LoginError, WhoamiError
import matrix_alertbot.callback
from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.config import AccountConfig, Config
logger = logging.getLogger(__name__)
class MatrixClientPool:
def __init__(
self, alertmanager_client: AlertmanagerClient, cache: Cache, config: Config
) -> None:
self._lock = asyncio.Lock()
self._matrix_clients: Dict[AccountConfig, AsyncClient] = {}
self._accounts: List[AccountConfig] = []
self._accounts = config.accounts
for account in self._accounts:
matrix_client = self._create_matrix_client(
account, alertmanager_client, cache, config
)
self._matrix_clients[account] = matrix_client
self.account = next(iter(self._accounts))
self.matrix_client = self._matrix_clients[self.account]
async def switch_active_client(
self,
) -> Optional[Tuple[AsyncClient, AccountConfig]]:
async with self._lock:
for account in random.sample(self._accounts, len(self._accounts)):
if account is self.account:
continue
matrix_client = self._matrix_clients[account]
try:
whoami = await matrix_client.whoami()
logged_in = not isinstance(whoami, WhoamiError)
except Exception:
logged_in = False
if logged_in:
self.account = account
self.matrix_client = matrix_client
logger.warning(
f"Bot {self.account.id} | Matrix client for homeserver {self.account.homeserver_url} selected as new leader."
)
return matrix_client, account
if self.matrix_client.logged_in:
logger.warning(
f"Bot {self.account.id} | No active Matrix client available, keeping Matrix client for {self.account.homeserver_url} as the leader."
)
else:
logger.error(
f"Bot {self.account.id} | No active Matrix client connected."
)
return None
async def close(self) -> None:
for matrix_client in self._matrix_clients.values():
await matrix_client.close()
def _create_matrix_client(
self,
account: AccountConfig,
alertmanager_client: AlertmanagerClient,
cache: Cache,
config: Config,
) -> AsyncClient:
# Configuration options for the AsyncClient
try:
matrix_client_config = AsyncClientConfig(
max_limit_exceeded=5,
max_timeouts=3,
store_sync_tokens=True,
encryption_enabled=True,
)
except ImportWarning as e:
logger.warning(e)
matrix_client_config = AsyncClientConfig(
max_limit_exceeded=5,
max_timeouts=3,
store_sync_tokens=True,
encryption_enabled=False,
)
# Load credentials from a previous session
if os.path.exists(account.token_file):
with open(account.token_file, "r") as ifd:
credentials = json.load(ifd)
account.token = credentials["access_token"]
account.device_id = credentials["device_id"]
# Initialize the matrix client based on stored credentials
matrix_client = AsyncClient(
account.homeserver_url,
account.id,
device_id=account.device_id,
store_path=config.store_dir,
config=matrix_client_config,
)
# Set up event callbacks
callbacks = matrix_alertbot.callback.Callbacks(
matrix_client, alertmanager_client, cache, config, self
)
matrix_client.add_event_callback(callbacks.message, (RoomMessageText,))
matrix_client.add_event_callback(
callbacks.invite_event_filtered_callback, (InviteMemberEvent,)
)
# matrix_client.add_event_callback(callbacks.debug, (Event,))
matrix_client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,))
matrix_client.add_event_callback(callbacks.reaction, (ReactionEvent,))
matrix_client.add_event_callback(callbacks.redaction, (RedactionEvent,))
matrix_client.add_event_callback(
callbacks.unknown_message, (RoomMessageUnknown,)
)
matrix_client.add_to_device_callback(
callbacks.key_verification_start, (KeyVerificationStart,)
)
matrix_client.add_to_device_callback(
callbacks.key_verification_cancel, (KeyVerificationCancel,)
)
matrix_client.add_to_device_callback(
callbacks.key_verification_confirm, (KeyVerificationKey,)
)
matrix_client.add_to_device_callback(
callbacks.key_verification_end, (KeyVerificationMac,)
)
return matrix_client
async def start(
self,
account: AccountConfig,
config: Config,
):
matrix_client = self._matrix_clients[account]
# Keep trying to reconnect on failure (with some time in-between)
# We switch homeserver after some retries
while True:
try:
if account.device_id and account.token:
matrix_client.restore_login(
user_id=account.id,
device_id=account.device_id,
access_token=account.token,
)
# Sync encryption keys with the server
if matrix_client.should_upload_keys:
await matrix_client.keys_upload()
else:
# Try to login with the configured username/password
try:
login_response = await matrix_client.login(
password=account.password,
device_name=config.device_name,
)
# Check if login failed
if type(login_response) == LoginError:
logger.error(
f"Bot {account.id} | Failed to login: {login_response.message}"
)
return False
except LocalProtocolError as e:
# There's an edge case here where the user hasn't installed the correct C
# dependencies. In that case, a LocalProtocolError is raised on login.
logger.fatal(
f"Bot {account.id} | Failed to login. Have you installed the correct dependencies? "
"https://github.com/poljar/matrix-nio#installation "
"Error: %s",
e,
)
return False
if isinstance(login_response, LoginError):
logger.fatal(
f"Bot {account.id} | Failed to login: {login_response.message}"
)
return False
# Save user's access token and device ID
# See https://stackoverflow.com/a/45368120
account_token_fd = os.open(
account.token_file,
flags=os.O_CREAT | os.O_WRONLY | os.O_TRUNC,
mode=0o640,
)
with os.fdopen(account_token_fd, "w") as ofd:
json.dump(
{
"device_id": login_response.device_id,
"access_token": login_response.access_token,
},
ofd,
)
# Login succeeded!
logger.info(f"Bot {account.id} | Logged in.")
await matrix_client.sync_forever(timeout=30000, full_state=True)
except (ClientConnectionError, ServerDisconnectedError, TimeoutError):
await matrix_client.close()
logger.warning(
f"Bot {account.id} | Matrix client disconnected, retrying in 15s..."
)
if len(self._accounts) > 1 and self.matrix_client is matrix_client:
logger.warning(
f"Bot {account.id} | Selecting another Matrix client as leader..."
)
await self.switch_active_client()
# Sleep so we don't bombard the server with login requests
await asyncio.sleep(15)
finally:
await matrix_client.close()