138 lines
4.1 KiB
Python
138 lines
4.1 KiB
Python
import datetime
|
|
import requests
|
|
import base64
|
|
import typing as ty
|
|
import flask
|
|
import logging
|
|
import json
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class EmptyCache(Exception):
|
|
"""Raised when the cache is empty and data cannot be fetched"""
|
|
|
|
|
|
class MissingData(Exception):
|
|
"""Raised when the data was correctly fetched, but data for the current time cannot
|
|
be found"""
|
|
|
|
|
|
class ApiState:
|
|
API_AUTH: str = "https://digital.iservices.rte-france.com/token/oauth/"
|
|
API_BASE: str = "https://digital.iservices.rte-france.com/open_api/ecowatt/v4"
|
|
API_SIGNALS: str = "/signals"
|
|
UPDATE_EVERY = datetime.timedelta(seconds=3600)
|
|
|
|
flask_app: flask.Flask
|
|
|
|
api_id: str
|
|
api_secret: str
|
|
|
|
auth_expires: datetime.datetime
|
|
next_update: datetime.datetime
|
|
bearer_token: str
|
|
|
|
def __init__(self, flask_app: flask.Flask):
|
|
self.flask_app = flask_app
|
|
self.auth_expires = datetime.datetime.fromtimestamp(0) # force re-auth
|
|
self.next_update = self.auth_expires
|
|
self.bearer_token = ""
|
|
self.api_id = flask_app.config["EW_API_ID"]
|
|
self.api_secret = flask_app.config["EW_API_SECRET"]
|
|
|
|
self.api_data = None
|
|
try:
|
|
with self.flask_app.open_instance_resource("cache.json", "r") as handle:
|
|
self.api_data = json.load(handle)
|
|
except FileNotFoundError:
|
|
pass
|
|
|
|
def _authenticate(self):
|
|
"""Authenticate to the OAuth API"""
|
|
logger.info("Re-authenticating")
|
|
auth_str: str = base64.b64encode(
|
|
f"{self.api_id}:{self.api_secret}".encode("utf8")
|
|
).decode("utf8")
|
|
response = requests.post(
|
|
self.API_AUTH, headers={"Authorization": f"Basic {auth_str}"}
|
|
)
|
|
if response.status_code != 200:
|
|
raise Exception(f"Failed to authenticate: {response.body()}")
|
|
json_response = response.json()
|
|
self.bearer_token = (
|
|
f"{json_response['token_type']} {json_response['access_token']}"
|
|
)
|
|
self.auth_expires = datetime.datetime.now() + datetime.timedelta(
|
|
seconds=int(json_response["expires_in"] - 60)
|
|
)
|
|
|
|
def _update(self):
|
|
if datetime.datetime.now() < self.next_update:
|
|
return
|
|
|
|
logger.info("Fetching data from the API")
|
|
|
|
if datetime.datetime.now() > self.auth_expires:
|
|
self._authenticate()
|
|
|
|
response = requests.get(
|
|
self.API_BASE + self.API_SIGNALS,
|
|
headers={
|
|
"Authorization": self.bearer_token,
|
|
},
|
|
)
|
|
now = datetime.datetime.now()
|
|
if response.status_code == 429:
|
|
self.next_update = now + datetime.timedelta(
|
|
seconds=int(response.headers["Retry-After"]) + 10
|
|
)
|
|
logger.error("Quota exceeded, delayed until %s", self.next_update)
|
|
return
|
|
if response.status_code != 200:
|
|
logger.error(
|
|
"Unhandled error while fetching data: %d %s",
|
|
response.status_code,
|
|
response.text,
|
|
)
|
|
self.api_data = None
|
|
return
|
|
self.api_data = response.json()
|
|
self.next_update = now + self.UPDATE_EVERY
|
|
|
|
with self.flask_app.open_instance_resource("cache.json", "w") as handle:
|
|
json.dump(self.api_data, handle)
|
|
|
|
def get_json(self):
|
|
self._update()
|
|
|
|
if self.api_data is None:
|
|
raise EmptyCache()
|
|
|
|
return self.api_data
|
|
|
|
def _get_day(self):
|
|
data = self.get_json()
|
|
|
|
for day in data["signals"]:
|
|
if (
|
|
datetime.datetime.fromisoformat(day["jour"]).date()
|
|
== datetime.date.today()
|
|
):
|
|
return day
|
|
raise MissingData()
|
|
|
|
def get_today(self) -> int:
|
|
data = self._get_day()
|
|
return int(data["dvalue"])
|
|
|
|
def get_now(self) -> int:
|
|
day = self._get_day()
|
|
|
|
cur_hour = datetime.datetime.now().hour
|
|
hour_data = day["values"][cur_hour]
|
|
assert int(hour_data["pas"]) == cur_hour
|
|
|
|
alert_val = int(hour_data["hvalue"])
|
|
return alert_val
|