simple-ecowatt/simple_ecowatt/api.py

138 lines
4.1 KiB
Python

import datetime
import requests
import base64
import typing as ty
import flask
import logging
import json
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class EmptyCache(Exception):
    """Raised when the cache is empty and data cannot be fetched.

    Signals that no API payload is held in memory, so there is nothing to
    serve to the caller.
    """
class MissingData(Exception):
    """Raised when data was fetched correctly but has no entry for the current time.

    The payload itself is available; it simply does not cover the day (or
    hour) being asked for.
    """
class ApiState:
    """State holder for the RTE Ecowatt API: OAuth token, fetched signals, disk cache.

    The signals payload is kept in memory (``api_data``), refreshed at most
    once per ``UPDATE_EVERY`` and mirrored to ``cache.json`` in the Flask
    instance folder so that it can be reloaded when a new instance is built.
    """

    API_AUTH: str = "https://digital.iservices.rte-france.com/token/oauth/"
    API_BASE: str = "https://digital.iservices.rte-france.com/open_api/ecowatt/v4"
    API_SIGNALS: str = "/signals"

    # Minimum delay between two successful fetches of the signals endpoint.
    UPDATE_EVERY = datetime.timedelta(seconds=3600)
    # Seconds after which an HTTP call is abandoned; without a timeout,
    # `requests` may wait on a stalled connection indefinitely.
    REQUEST_TIMEOUT: float = 10.0
    # Back-off in seconds applied on HTTP 429 when the `Retry-After` header is
    # missing or is not a plain number of seconds.
    RETRY_AFTER_FALLBACK: int = 60

    flask_app: "flask.Flask"
    api_id: str
    api_secret: str
    auth_expires: datetime.datetime
    next_update: datetime.datetime
    bearer_token: str

    def __init__(self, flask_app: "flask.Flask"):
        """Read credentials from the Flask config and load the on-disk cache.

        Args:
            flask_app: application providing the ``EW_API_ID`` and
                ``EW_API_SECRET`` config keys and the instance folder that
                holds ``cache.json``.

        Raises:
            KeyError: if one of the two config keys is missing.
        """
        self.flask_app = flask_app
        self.auth_expires = datetime.datetime.fromtimestamp(0)  # force re-auth
        self.next_update = self.auth_expires  # also in the past: fetch on first use
        self.bearer_token = ""
        self.api_id = flask_app.config["EW_API_ID"]
        self.api_secret = flask_app.config["EW_API_SECRET"]
        self.api_data = None
        try:
            with self.flask_app.open_instance_resource("cache.json", "r") as handle:
                self.api_data = json.load(handle)
        except FileNotFoundError:
            pass  # no cache written yet
        except json.JSONDecodeError:
            # A truncated or corrupt cache must not prevent start-up; the data
            # is simply fetched again on first use.
            logger.warning("Ignoring unreadable cache.json", exc_info=True)

    @staticmethod
    def _parse_retry_after(value: ty.Optional[str], default: int) -> int:
        """Return ``value`` as a non-negative number of seconds, else ``default``.

        ``default`` is used when the header is absent (``None``) or is not an
        integer, e.g. when the server sent an HTTP-date instead.
        """
        try:
            return max(int(value), 0)
        except (TypeError, ValueError):
            return default

    def _authenticate(self):
        """Authenticate to the OAuth API and store the resulting bearer token.

        Raises:
            RuntimeError: if the token endpoint answers with a non-200 status.
        """
        logger.info("Re-authenticating")
        auth_str: str = base64.b64encode(
            f"{self.api_id}:{self.api_secret}".encode("utf8")
        ).decode("utf8")
        response = requests.post(
            self.API_AUTH,
            headers={"Authorization": f"Basic {auth_str}"},
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            # `requests.Response` exposes the body as `.text`; there is no `.body()`.
            raise RuntimeError(f"Failed to authenticate: {response.text}")
        json_response = response.json()
        self.bearer_token = (
            f"{json_response['token_type']} {json_response['access_token']}"
        )
        # Renew one minute early so the token never expires mid-request.
        # Convert before subtracting so a numeric string is accepted as well.
        self.auth_expires = datetime.datetime.now() + datetime.timedelta(
            seconds=int(json_response["expires_in"]) - 60
        )

    def _update(self):
        """Refresh ``api_data`` from the API once ``next_update`` has passed.

        Does nothing while the current data is still considered fresh.
        Re-authenticates first when the token has expired. On HTTP 429 the
        next attempt is postponed and the current data is kept.
        """
        if datetime.datetime.now() < self.next_update:
            return
        logger.info("Fetching data from the API")
        if datetime.datetime.now() > self.auth_expires:
            self._authenticate()
        response = requests.get(
            self.API_BASE + self.API_SIGNALS,
            headers={
                "Authorization": self.bearer_token,
            },
            timeout=self.REQUEST_TIMEOUT,
        )
        now = datetime.datetime.now()
        if response.status_code == 429:
            delay = self._parse_retry_after(
                response.headers.get("Retry-After"), self.RETRY_AFTER_FALLBACK
            )
            # Ten extra seconds of margin on top of what the server asked for.
            self.next_update = now + datetime.timedelta(seconds=delay + 10)
            logger.error("Quota exceeded, delayed until %s", self.next_update)
            return
        if response.status_code != 200:
            logger.error(
                "Unhandled error while fetching data: %d %s",
                response.status_code,
                response.text,
            )
            # NOTE(review): this drops the data held in memory and leaves
            # `next_update` untouched, so the very next call retries the API.
            # Kept as-is; confirm this is the intended policy.
            self.api_data = None
            return
        self.api_data = response.json()
        self.next_update = now + self.UPDATE_EVERY
        try:
            with self.flask_app.open_instance_resource("cache.json", "w") as handle:
                json.dump(self.api_data, handle)
        except OSError:
            # The fresh data is already in memory; failing to persist it must
            # not fail the request that triggered the refresh.
            logger.warning("Could not write cache.json", exc_info=True)

    def get_json(self) -> ty.Any:
        """Return the decoded API payload, refreshing it first if it is due.

        Raises:
            EmptyCache: if no payload is available after the refresh attempt.
        """
        self._update()
        if self.api_data is None:
            raise EmptyCache()
        return self.api_data

    def _get_day(self):
        """Return the entry of ``signals`` whose ``jour`` date is today.

        Raises:
            EmptyCache: propagated from ``get_json``.
            MissingData: if no entry matches today's date.
        """
        data = self.get_json()
        # NOTE(review): "today" comes from the server's local clock, while the
        # date is read as written in the `jour` timestamp; this assumes both
        # refer to the same time zone. Confirm for the deployment.
        today = datetime.date.today()
        for day in data["signals"]:
            if datetime.datetime.fromisoformat(day["jour"]).date() == today:
                return day
        raise MissingData()

    def get_today(self) -> int:
        """Return today's ``dvalue`` as an integer."""
        data = self._get_day()
        return int(data["dvalue"])

    def get_now(self) -> int:
        """Return the ``hvalue`` of the entry whose ``pas`` is the current hour.

        The entry is looked up by its ``pas`` field rather than by list
        position, so a day with missing or reordered hours cannot yield the
        wrong hour's value.

        Raises:
            EmptyCache: propagated from ``get_json``.
            MissingData: if today has no entry for the current hour.
        """
        day = self._get_day()
        cur_hour = datetime.datetime.now().hour
        for hour_data in day["values"]:
            if int(hour_data["pas"]) == cur_hour:
                return int(hour_data["hvalue"])
        raise MissingData()