import datetime import requests import base64 import typing as ty import flask import logging import json logger = logging.getLogger(__name__) class EmptyCache(Exception): """Raised when the cache is empty and data cannot be fetched""" class MissingData(Exception): """Raised when the data was correctly fetched, but data for the current time cannot be found""" class ApiState: API_AUTH: str = "https://digital.iservices.rte-france.com/token/oauth/" API_BASE: str = "https://digital.iservices.rte-france.com/open_api/ecowatt/v4" API_SIGNALS: str = "/signals" UPDATE_EVERY = datetime.timedelta(seconds=3600) flask_app: flask.Flask api_id: str api_secret: str auth_expires: datetime.datetime next_update: datetime.datetime bearer_token: str def __init__(self, flask_app: flask.Flask): self.flask_app = flask_app self.auth_expires = datetime.datetime.fromtimestamp(0) # force re-auth self.next_update = self.auth_expires self.bearer_token = "" self.api_id = flask_app.config["EW_API_ID"] self.api_secret = flask_app.config["EW_API_SECRET"] self.api_data = None try: with self.flask_app.open_instance_resource("cache.json", "r") as handle: self.api_data = json.load(handle) except FileNotFoundError: pass def _authenticate(self): """Authenticate to the OAuth API""" logger.info("Re-authenticating") auth_str: str = base64.b64encode( f"{self.api_id}:{self.api_secret}".encode("utf8") ).decode("utf8") response = requests.post( self.API_AUTH, headers={"Authorization": f"Basic {auth_str}"} ) if response.status_code != 200: raise Exception(f"Failed to authenticate: {response.body()}") json_response = response.json() self.bearer_token = ( f"{json_response['token_type']} {json_response['access_token']}" ) self.auth_expires = datetime.datetime.now() + datetime.timedelta( seconds=int(json_response["expires_in"] - 60) ) def _update(self): if datetime.datetime.now() < self.next_update: return logger.info("Fetching data from the API") if datetime.datetime.now() > self.auth_expires: self._authenticate() response = requests.get( self.API_BASE + self.API_SIGNALS, headers={ "Authorization": self.bearer_token, }, ) now = datetime.datetime.now() if response.status_code == 429: self.next_update = now + datetime.timedelta( seconds=int(response.headers["Retry-After"]) + 10 ) logger.error("Quota exceeded, delayed until %s", self.next_update) return if response.status_code != 200: logger.error( "Unhandled error while fetching data: %d %s", response.status_code, response.text, ) self.api_data = None return self.api_data = response.json() self.next_update = now + self.UPDATE_EVERY with self.flask_app.open_instance_resource("cache.json", "w") as handle: json.dump(self.api_data, handle) def get_json(self): self._update() if self.api_data is None: raise EmptyCache() return self.api_data def _get_day(self): data = self.get_json() for day in data["signals"]: if ( datetime.datetime.fromisoformat(day["jour"]).date() == datetime.date.today() ): return day raise MissingData() def get_today(self) -> int: data = self._get_day() return int(data["dvalue"]) def get_now(self) -> int: day = self._get_day() cur_hour = datetime.datetime.now().hour hour_data = day["values"][cur_hour] assert int(hour_data["pas"]) == cur_hour alert_val = int(hour_data["hvalue"]) return alert_val