papi/papi/main.py

141 lines
3.5 KiB
Python

from typing import Optional, List
from pydantic import BaseModel
from fastapi import FastAPI, Request, Body, Depends
from sqlalchemy.orm import Session
from collections import defaultdict
from . import utils
from papi.sqlapp.database import Base, SessionLocal, engine
from papi.sqlapp import crud
from papi.sqlapp import schemas
# Application object; the route decorators in this module register handlers on it.
app = FastAPI()
# In-memory store: probe identifier -> list of measurement payloads
# (appended to by post_sonde_data). Not persisted anywhere in this module.
mesures = defaultdict(list)
# In-memory store: probe identifier -> list of notification entries
# (appended to by Notifier).
notifications = defaultdict(list)
# Runs at import time: creates the tables declared on Base's metadata using `engine`.
Base.metadata.create_all(bind=engine)
def get_db():
    """Yield a database session and close it when the consumer is done.

    Written as a generator so it can be handed to ``Depends`` (see
    ``post_sonde``): the session is yielded to the caller and ``close()`` is
    called afterwards, even if the caller raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
class Notifier:
    """Callable that records a notification entry for a probe."""

    @staticmethod
    def __call__(idsonde, changes):
        """Append a notification for ``idsonde`` to the in-memory store.

        The stored entry holds the given ``changes`` together with a snapshot
        of the probe's current report (every line produced by
        ``get_rapport``), taken at the time of the call.
        """
        bucket = notifications[idsonde]
        # get_rapport is a generator; materialise it so the entry keeps a
        # fixed snapshot rather than a lazy iterator.
        snapshot = list(get_rapport(idsonde))
        bucket.append({"changes": changes, "status": snapshot})


# The class name is rebound to a single instance, so the rest of the module
# calls ``Notifier(idsonde, changes)`` like a plain function.
Notifier = Notifier()
# In-memory registry of known probes, keyed by identifier. Seeded with a
# "test" probe; post_sonde adds further entries.
sondes = {"test": schemas.SondeBase(identifiant="test", nom="Testlocal")}
def default_sample():
    """Return an empty sample that can serve as a baseline for comparisons.

    A new dictionary is built on every call.
    """
    baseline = {"channels": ()}
    return baseline
@app.get("/")
def read_root():
    """Return a fixed greeting payload for the root path."""
    greeting = {"msg": "Hello World"}
    return greeting
@app.post("/sonde/", response_model=List[schemas.SondeBase])
def post_sonde(sonde: schemas.SondeBase, db: Session = Depends(get_db)):
    """Register a probe and return every probe currently in the registry.

    The probe is looked up in the database by identifier and created when the
    lookup returns nothing, using its identifier as the name if no name was
    supplied. The resulting record is then stored in the in-memory ``sondes``
    registry under the probe's identifier.
    """
    record = crud.get_sonde(db, identifiant=sonde.identifiant)
    if not record:
        label = sonde.nom or sonde.identifiant
        record = crud.create_sonde(db, sonde.identifiant, label)
    # Mind the typing here: the registry holds SondeBase schema objects, so
    # the database record is converted before being stored.
    sondes[sonde.identifiant] = schemas.SondeBase.from_orm(record)
    return list(sondes.values())
@app.get("/sonde/", response_model=List[schemas.SondeBase])
def list_sonde():
    """Return all probes currently held in the in-memory registry."""
    return list(sondes.values())
@app.get("/notifications/{idsonde}/")
def list_notification(idsonde: str):
    """Return the notifications recorded for ``idsonde``, latest first.

    Entries are appended as they are produced, so reversing the list puts the
    most recently added one at the front. The stored list is not modified.
    """
    recorded = notifications[idsonde]
    return list(reversed(recorded))
@app.post("/sonde/{idsonde}/")
def post_sonde_data(idsonde: str, body: dict = Body(...)):
    """Store a measurement for a probe and report what changed.

    The payload is compared with the previously stored sample for the same
    probe (or with ``default_sample()`` for the first one). Each change found
    is rendered as a space-separated line starting with the payload's date.
    When there is at least one such line, a notification is recorded.

    Args:
        idsonde: Identifier of the probe; must be a key of ``sondes``.
        body: Measurement payload. Must contain a ``"date"`` entry, which has
            to be a string because it is joined into the change lines.

    Returns:
        ``None`` when the probe is unknown. Otherwise a dict with ``"count"``
        (number of stored measurements, not counting the baseline
        placeholder) and ``"notify"`` (the list of change lines, or ``""``
        when the comparison result is falsy).

    Raises:
        KeyError: If ``body`` has no ``"date"`` entry. Nothing is stored in
            that case.
    """
    if idsonde not in sondes:
        # Unknown probe: nothing is stored and the handler returns None.
        return

    # Validate and process the payload *before* storing it. If the payload is
    # appended first and then fails below, it stays in the history and every
    # later call to the report and history endpoints fails on it as well.
    date = body["date"]

    mesures_ = mesures[idsonde]
    if not mesures_:
        # Seed the history with an empty baseline so the first real
        # measurement has something to be compared against.
        mesures_.append(default_sample())

    previous = utils.prepare(mesures_[-1])
    present = utils.prepare(body)
    all_channels = sorted({*previous["channels"], *present["channels"]})
    diff = utils.compare(all_channels, previous, present)
    content = (
        [" ".join((date, *utils.clean_state(d))) for d in diff["changements"]]
        if diff
        else ""
    )

    # The payload went through the whole pipeline, so it is safe to store it.
    # This must happen before notifying: the notifier snapshots the report
    # from the latest stored measurement.
    mesures_.append(body)
    if content:
        Notifier(idsonde, content)

    starts_with_baseline = mesures_[0] == default_sample()
    return {
        "count": len(mesures_) - 1 if starts_with_baseline else len(mesures_),
        "notify": content,
    }
@app.get("/sonde/{idsonde}/rapport")
def get_rapport(idsonde):
    """Yield one ``"<name> <status>"`` line per channel of the latest sample.

    Nothing is yielded when no sample is stored for ``idsonde``. Channels are
    ordered by the numeric value of the part of their name that precedes the
    first ``#``.
    """
    samples = mesures[idsonde]
    if not samples:
        return

    latest = utils.prepare(samples[-1])

    def numeric_prefix(item):
        # item is a (channel name, channel data) pair.
        return float(item[0].split("#")[0])

    for name, data in sorted(latest["channels"].items(), key=numeric_prefix):
        yield f'{name} {data["status"]}'
@app.get("/sonde/{idsonde}/historique")
def historique(idsonde):
    """Yield every change line found between consecutive stored samples.

    Each stored sample is compared with the one stored right after it. Every
    change reported by the comparison is rendered as a space-separated line
    that starts with the date of the later sample.
    """
    # Local import keeps this block self-contained; the sample count used to
    # be printed to stdout on every request and is now a DEBUG log record.
    import logging

    samples = mesures[idsonde]
    logging.getLogger(__name__).debug(
        "historique %s: %d samples", idsonde, len(samples)
    )

    for previous, present in zip(samples, samples[1:]):
        date = present["date"]
        before = utils.prepare(previous)
        after = utils.prepare(present)
        all_channels = sorted({*before["channels"], *after["channels"]})
        diff = utils.compare(all_channels, before, after)
        if not diff:
            continue
        for change in diff["changements"]:
            yield " ".join((date, *utils.clean_state(change)))