Separate the storage layer into individual classes
This commit is contained in:
parent
5c7760e2c0
commit
395fda40ec
4 changed files with 52 additions and 37 deletions
15
README.md
15
README.md
|
@ -44,9 +44,18 @@ to put or retrieve data from it. Table definitions should be specified in
|
||||||
`_run_migrations`. There's currently no defined method for how migrations
|
`_run_migrations`. There's currently no defined method for how migrations
|
||||||
should work though.
|
should work though.
|
||||||
|
|
||||||
The `sync_token` table and `get_sync_token`, `save_sync_tokens` should be left
|
The `sync_token` table should be left in tact so that the bot can save its
|
||||||
in tact so that the bot can save its progress when syncing events from the
|
progress when syncing events from the homeserver.
|
||||||
homeserver.
|
|
||||||
|
### `sync_token.py`
|
||||||
|
|
||||||
|
A simple class that can load and save a sync token to/from the database.
|
||||||
|
|
||||||
|
A `SyncToken` is an instance of a sync token, which is simply a string
|
||||||
|
retrieved from a matrix homeserver when querying the `/sync` endpoint (which
|
||||||
|
clients use to retrieve new events). It is given to the next call of the
|
||||||
|
`/sync` endpoint in order to specify the starting point in the event timeline
|
||||||
|
you would like to receive messages from.
|
||||||
|
|
||||||
### `callbacks.py`
|
### `callbacks.py`
|
||||||
|
|
||||||
|
|
12
main.py
12
main.py
|
@ -12,6 +12,7 @@ from nio import (
|
||||||
from callbacks import Callbacks
|
from callbacks import Callbacks
|
||||||
from config import Config
|
from config import Config
|
||||||
from storage import Storage
|
from storage import Storage
|
||||||
|
from sync_token import SyncToken
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -45,22 +46,23 @@ async def main():
|
||||||
client.add_event_callback(callbacks.message, (RoomMessageText,))
|
client.add_event_callback(callbacks.message, (RoomMessageText,))
|
||||||
client.add_event_callback(callbacks.invite, (InviteEvent,))
|
client.add_event_callback(callbacks.invite, (InviteEvent,))
|
||||||
|
|
||||||
# Retrieve the last sync token if it exists
|
# Create a new sync token, attempting to load one from the database if it has one already
|
||||||
token = store.get_sync_token()
|
sync_token = SyncToken(store)
|
||||||
|
|
||||||
# Sync loop
|
# Sync loop
|
||||||
while True:
|
while True:
|
||||||
# Sync with the server
|
# Sync with the server
|
||||||
sync_response = await client.sync(timeout=30000, full_state=True, since=token)
|
sync_response = await client.sync(timeout=30000, full_state=True,
|
||||||
|
since=sync_token.token)
|
||||||
|
|
||||||
# Check if the sync had an error
|
# Check if the sync had an error
|
||||||
if type(sync_response) == SyncError:
|
if type(sync_response) == SyncError:
|
||||||
logger.warning("Error in client sync: %s", sync_response.message)
|
logger.warning("Error in client sync: %s", sync_response.message)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Save the latest sync token
|
# Save the latest sync token to the database
|
||||||
token = sync_response.next_batch
|
token = sync_response.next_batch
|
||||||
if token:
|
if token:
|
||||||
store.save_sync_token(token)
|
sync_token.update(token)
|
||||||
|
|
||||||
asyncio.get_event_loop().run_until_complete(main())
|
asyncio.get_event_loop().run_until_complete(main())
|
||||||
|
|
30
storage.py
30
storage.py
|
@ -6,6 +6,7 @@ latest_db_version = 0
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class Storage(object):
|
class Storage(object):
|
||||||
def __init__(self, db_path):
|
def __init__(self, db_path):
|
||||||
"""Setup the database
|
"""Setup the database
|
||||||
|
@ -44,32 +45,3 @@ class Storage(object):
|
||||||
# Initialize a connection to the database
|
# Initialize a connection to the database
|
||||||
conn = sqlite3.connect(self.db_path)
|
conn = sqlite3.connect(self.db_path)
|
||||||
self.cursor = conn.cursor()
|
self.cursor = conn.cursor()
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_sync_token(self):
|
|
||||||
"""Retrieve the next_batch token from the last sync response.
|
|
||||||
|
|
||||||
Used to sync without retrieving messages we've processed in the past
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A str containing the last sync token or None if one does not exist
|
|
||||||
"""
|
|
||||||
self.cursor.execute("SELECT token FROM sync_token")
|
|
||||||
rows = self.cursor.fetchone()
|
|
||||||
|
|
||||||
if not rows:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return rows[0]
|
|
||||||
|
|
||||||
def save_sync_token(self, token):
|
|
||||||
"""Save a token from a sync response.
|
|
||||||
|
|
||||||
Can be retrieved later to sync from where we left off
|
|
||||||
|
|
||||||
Args:
|
|
||||||
token (str): A next_batch token as part of a sync response
|
|
||||||
"""
|
|
||||||
self.cursor.execute("INSERT OR REPLACE INTO sync_token"
|
|
||||||
" (token) VALUES (?)", (token,))
|
|
||||||
|
|
32
sync_token.py
Normal file
32
sync_token.py
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
class SyncToken(object):
|
||||||
|
"""A SyncToken is an instance of a sync token, which is a token retrieved from a matrix
|
||||||
|
homeserver. It is given to the /sync endpoint in order to specify at which point in the
|
||||||
|
event timeline you would like to receive messages after
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, store):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
store (Storage): An object to access the storage layer
|
||||||
|
"""
|
||||||
|
self.store = store
|
||||||
|
|
||||||
|
# Attempt to load a token from the provided storage layer
|
||||||
|
self._load()
|
||||||
|
|
||||||
|
def _load(self):
|
||||||
|
"""Load the latest sync token from the database"""
|
||||||
|
self.store.cursor.execute("SELECT token FROM sync_token")
|
||||||
|
rows = self.store.cursor.fetchone()
|
||||||
|
|
||||||
|
if rows:
|
||||||
|
self.token = rows[0]
|
||||||
|
|
||||||
|
def update(self, token):
|
||||||
|
"""Update the sync token in the database
|
||||||
|
|
||||||
|
Args:
|
||||||
|
token (str): A sync token from a sync response sent by a matrix homeserver
|
||||||
|
"""
|
||||||
|
self.store.cursor.execute("INSERT OR REPLACE INTO sync_token "
|
||||||
|
"(token) VALUES (?)", (token,))
|
Loading…
Reference in a new issue