124 lines
2.9 KiB
Python
124 lines
2.9 KiB
Python
from typing import Optional
|
|
from pydantic import BaseModel
|
|
|
|
from fastapi import FastAPI, Request, Body
|
|
|
|
from collections import defaultdict
|
|
|
|
from . import utils
|
|
|
|
app = FastAPI()
|
|
|
|
|
|
mesures = defaultdict(list)
|
|
notifications = defaultdict(list)
|
|
|
|
|
|
class Notifier:
|
|
@staticmethod
|
|
def __call__(idsonde, changes):
|
|
status = get_rapport(idsonde)
|
|
notifications[idsonde].append(
|
|
{"changes": changes, "status": [x for x in status]}
|
|
)
|
|
|
|
|
|
class Sonde(BaseModel):
|
|
identifiant: str
|
|
nom: str
|
|
|
|
|
|
Notifier = Notifier()
|
|
|
|
sondes = {"test": Sonde(identifiant="test", nom="Testlocal")}
|
|
|
|
|
|
def default_sample():
|
|
"Renvoie un dictionnaire vide qui sert peut servir de base de comparaison"
|
|
return {"channels": ()}
|
|
|
|
|
|
@app.get("/")
|
|
def read_root():
|
|
return {"msg": "Hello World"}
|
|
|
|
|
|
@app.post("/sonde/")
|
|
def post_sonde(sonde: Sonde):
|
|
sondes[sonde.identifiant] = sonde
|
|
return list(sondes.values())
|
|
|
|
|
|
@app.get("/sonde/")
|
|
def list_sonde():
|
|
return [x for x in sondes.values()]
|
|
|
|
|
|
@app.get("/notifications/{idsonde}/")
|
|
def list_notification(idsonde: str):
|
|
return notifications[idsonde][::-1]
|
|
|
|
|
|
@app.post("/sonde/{idsonde}/")
|
|
def post_sonde_data(idsonde: str, body: dict = Body(...)):
|
|
if idsonde not in sondes:
|
|
return
|
|
|
|
mesures_ = mesures[idsonde]
|
|
if not mesures_:
|
|
mesures_.append(default_sample())
|
|
mesures_.append(body)
|
|
|
|
date = body["date"]
|
|
previous = utils.prepare(mesures_[-2])
|
|
present = utils.prepare(mesures_[-1])
|
|
all_channels = sorted(set((*previous["channels"], *present["channels"])))
|
|
|
|
diff = utils.compare(all_channels, previous, present)
|
|
|
|
content = (
|
|
[" ".join((date, *utils.clean_state(d))) for d in diff["changements"]]
|
|
if diff
|
|
else ""
|
|
)
|
|
if content:
|
|
Notifier(idsonde, content)
|
|
return {
|
|
"count": len(mesures_)
|
|
if mesures_[0] != default_sample()
|
|
else len(mesures_) - 1,
|
|
"notify": content,
|
|
}
|
|
|
|
|
|
@app.get("/sonde/{idsonde}/rapport")
|
|
def get_rapport(idsonde):
|
|
try:
|
|
last = mesures[idsonde][-1]
|
|
except IndexError:
|
|
return
|
|
else:
|
|
last = utils.prepare(last)
|
|
for name, data in sorted(
|
|
last["channels"].items(), key=lambda text: float(text[0].split("#")[0])
|
|
):
|
|
yield f'{name} {data["status"]}'
|
|
|
|
|
|
@app.get("/sonde/{idsonde}/historique")
|
|
def historique(idsonde):
|
|
print(len(mesures[idsonde]))
|
|
|
|
for previous, present in zip(mesures[idsonde], mesures[idsonde][1:]):
|
|
date = present["date"]
|
|
previous = utils.prepare(previous)
|
|
present = utils.prepare(present)
|
|
all_channels = sorted(set((*previous["channels"], *present["channels"])))
|
|
|
|
diff = utils.compare(all_channels, previous, present)
|
|
|
|
if diff:
|
|
for d in diff["changements"]:
|
|
line = date, *utils.clean_state(d)
|
|
yield " ".join(line)
|