From 395fda40ec13979c9c76b6aa703f1530e483fe05 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Sat, 12 Oct 2019 14:20:18 +0100 Subject: [PATCH] Separate the storage layer into individual classes --- README.md | 15 ++++++++++++--- main.py | 12 +++++++----- storage.py | 30 +----------------------------- sync_token.py | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 37 deletions(-) create mode 100644 sync_token.py diff --git a/README.md b/README.md index 193dcbc..12b2ab1 100644 --- a/README.md +++ b/README.md @@ -44,9 +44,18 @@ to put or retrieve data from it. Table definitions should be specified in `_run_migrations`. There's currently no defined method for how migrations should work though. -The `sync_token` table and `get_sync_token`, `save_sync_tokens` should be left -in tact so that the bot can save its progress when syncing events from the -homeserver. +The `sync_token` table should be left in tact so that the bot can save its +progress when syncing events from the homeserver. + +### `sync_token.py` + +A simple class that can load and save a sync token to/from the database. + +A `SyncToken` is an instance of a sync token, which is simply a string +retrieved from a matrix homeserver when querying the `/sync` endpoint (which +clients use to retrieve new events). It is given to the next call of the +`/sync` endpoint in order to specify the starting point in the event timeline +you would like to receive messages from. ### `callbacks.py` diff --git a/main.py b/main.py index c4aef8b..b3f957b 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,7 @@ from nio import ( from callbacks import Callbacks from config import Config from storage import Storage +from sync_token import SyncToken logger = logging.getLogger(__name__) @@ -45,22 +46,23 @@ async def main(): client.add_event_callback(callbacks.message, (RoomMessageText,)) client.add_event_callback(callbacks.invite, (InviteEvent,)) - # Retrieve the last sync token if it exists - token = store.get_sync_token() + # Create a new sync token, attempting to load one from the database if it has one already + sync_token = SyncToken(store) # Sync loop while True: # Sync with the server - sync_response = await client.sync(timeout=30000, full_state=True, since=token) + sync_response = await client.sync(timeout=30000, full_state=True, + since=sync_token.token) # Check if the sync had an error if type(sync_response) == SyncError: logger.warning("Error in client sync: %s", sync_response.message) continue - # Save the latest sync token + # Save the latest sync token to the database token = sync_response.next_batch if token: - store.save_sync_token(token) + sync_token.update(token) asyncio.get_event_loop().run_until_complete(main()) diff --git a/storage.py b/storage.py index 9f16484..97786b9 100644 --- a/storage.py +++ b/storage.py @@ -6,6 +6,7 @@ latest_db_version = 0 logger = logging.getLogger(__name__) + class Storage(object): def __init__(self, db_path): """Setup the database @@ -44,32 +45,3 @@ class Storage(object): # Initialize a connection to the database conn = sqlite3.connect(self.db_path) self.cursor = conn.cursor() - - pass - - def get_sync_token(self): - """Retrieve the next_batch token from the last sync response. - - Used to sync without retrieving messages we've processed in the past - - Returns: - A str containing the last sync token or None if one does not exist - """ - self.cursor.execute("SELECT token FROM sync_token") - rows = self.cursor.fetchone() - - if not rows: - return None - - return rows[0] - - def save_sync_token(self, token): - """Save a token from a sync response. - - Can be retrieved later to sync from where we left off - - Args: - token (str): A next_batch token as part of a sync response - """ - self.cursor.execute("INSERT OR REPLACE INTO sync_token" - " (token) VALUES (?)", (token,)) diff --git a/sync_token.py b/sync_token.py new file mode 100644 index 0000000..beba3e4 --- /dev/null +++ b/sync_token.py @@ -0,0 +1,32 @@ +class SyncToken(object): + """A SyncToken is an instance of a sync token, which is a token retrieved from a matrix + homeserver. It is given to the /sync endpoint in order to specify at which point in the + event timeline you would like to receive messages after + """ + + def __init__(self, store): + """ + Args: + store (Storage): An object to access the storage layer + """ + self.store = store + + # Attempt to load a token from the provided storage layer + self._load() + + def _load(self): + """Load the latest sync token from the database""" + self.store.cursor.execute("SELECT token FROM sync_token") + rows = self.store.cursor.fetchone() + + if rows: + self.token = rows[0] + + def update(self, token): + """Update the sync token in the database + + Args: + token (str): A sync token from a sync response sent by a matrix homeserver + """ + self.store.cursor.execute("INSERT OR REPLACE INTO sync_token " + "(token) VALUES (?)", (token,))