#!/usr/bin/env python3
"""Entry point of the bot: starts the Matrix client and the webhook server."""
import asyncio
import logging
import sys
from asyncio import TimeoutError
from time import sleep  # noqa: F401  (no longer used here; kept for importers)

import aiotools
from aiohttp import ClientConnectionError, ServerDisconnectedError
from diskcache import Cache
from nio import (
    AsyncClient,
    AsyncClientConfig,
    InviteMemberEvent,
    LocalProtocolError,
    LoginError,
    MegolmEvent,
    RoomMessageText,
    UnknownEvent,
)

from matrix_alertbot.alertmanager import AlertmanagerClient
from matrix_alertbot.callback import Callbacks
from matrix_alertbot.config import Config
from matrix_alertbot.webhook import Webhook

logger = logging.getLogger(__name__)


def create_matrix_client(config: Config) -> AsyncClient:
    """Build an ``AsyncClient`` from the bot configuration.

    Args:
        config: Parsed bot configuration; supplies the homeserver URL, user
            id, device id, store path and (optionally) an access token.

    Returns:
        A client with encryption and sync-token storage enabled. When
        ``config.user_token`` is set, the token and user id are assigned to
        the client so that no password login is required.
    """
    # Configuration options for the AsyncClient
    client_config = AsyncClientConfig(
        max_limit_exceeded=0,
        max_timeouts=0,
        store_sync_tokens=True,
        encryption_enabled=True,
    )

    # Initialize the matrix client
    client = AsyncClient(
        config.homeserver_url,
        config.user_id,
        device_id=config.device_id,
        store_path=config.store_path,
        config=client_config,
    )

    if config.user_token:
        client.access_token = config.user_token
        client.user_id = config.user_id

    return client


async def start_matrix_client(cache: Cache, config: Config) -> bool:
    """Log in to the homeserver and sync forever, reconnecting on failure.

    Registers the event callbacks, then loops: authenticate (by token when
    ``config.user_token`` is set, otherwise by password), then call
    ``sync_forever``. Connection errors are logged and retried after 15
    seconds.

    Args:
        cache: Cache shared with the Alertmanager client and the callbacks.
        config: Parsed bot configuration.

    Returns:
        ``False`` when the password login is rejected or raises
        ``LocalProtocolError``. Otherwise the coroutine keeps looping.
    """
    async with aiotools.closing_async(create_matrix_client(config)) as client:
        # Configure Alertmanager client
        async with AlertmanagerClient(config.alertmanager_url, cache) as alertmanager:
            # Set up event callbacks
            callbacks = Callbacks(client, alertmanager, cache, config)
            client.add_event_callback(callbacks.message, (RoomMessageText,))
            client.add_event_callback(
                callbacks.invite_event_filtered_callback, (InviteMemberEvent,)
            )
            client.add_event_callback(callbacks.decryption_failure, (MegolmEvent,))
            client.add_event_callback(callbacks.unknown, (UnknownEvent,))

            # Keep trying to reconnect on failure (with some time in-between)
            while True:
                try:
                    if config.user_token:
                        # Use token to log in
                        client.load_store()

                        # Sync encryption keys with the server
                        if client.should_upload_keys:
                            await client.keys_upload()
                    else:
                        # Try to login with the configured username/password
                        try:
                            login_response = await client.login(
                                password=config.user_password,
                                device_name=config.device_name,
                            )

                            # Check if login failed
                            if isinstance(login_response, LoginError):
                                logger.error(
                                    "Failed to login: %s", login_response.message
                                )
                                return False
                        except LocalProtocolError as e:
                            # There's an edge case here where the user hasn't
                            # installed the correct C dependencies. In that
                            # case, a LocalProtocolError is raised on login.
                            logger.fatal(
                                "Failed to login. Have you installed the correct dependencies? "
                                "https://github.com/poljar/matrix-nio#installation "
                                "Error: %s",
                                e,
                            )
                            return False

                        # Login succeeded!

                    logger.info("Logged in as %s", config.user_id)
                    await client.sync_forever(timeout=30000, full_state=True)
                except (ClientConnectionError, ServerDisconnectedError, TimeoutError):
                    logger.warning(
                        "Unable to connect to homeserver, retrying in 15s..."
                    )

                    # Sleep so we don't bombard the server with login requests.
                    # This must be the asyncio sleep: a blocking time.sleep()
                    # would stall every other task on the event loop
                    # (including the webhook server) for the whole delay.
                    await asyncio.sleep(15)


async def start_webhook_server(cache: Cache, config: Config) -> None:
    """Start the webhook server, backed by its own Matrix client.

    Args:
        cache: Cache handed to the webhook server.
        config: Parsed bot configuration.
    """
    async with aiotools.closing_async(create_matrix_client(config)) as client:
        async with Webhook(client, cache, config) as webhook_server:
            await webhook_server.start()


def main() -> None:
    """The first function that is run when starting the bot.

    Loads the configuration, opens the cache, schedules the webhook server
    and the Matrix client on the event loop and runs the loop forever.
    """
    # Read user-configured options from a config file.
    # A different config file path can be specified as the first command line argument
    if len(sys.argv) > 1:
        config_path = sys.argv[1]
    else:
        config_path = "config.yaml"

    # Read the parsed config file and create a Config object
    config = Config(config_path)

    # Configure the cache
    cache = Cache(config.cache_dir)

    loop = asyncio.get_event_loop()
    loop.create_task(start_webhook_server(cache, config))
    loop.create_task(start_matrix_client(cache, config))

    try:
        loop.run_forever()
    except Exception as e:
        logger.error(e)