145 lines
3.8 KiB
Python
145 lines
3.8 KiB
Python
from typing import Optional, List
|
|
from pydantic import BaseModel
|
|
|
|
from fastapi import FastAPI, Request, Body, Depends
|
|
from sqlalchemy.orm import Session
|
|
|
|
from collections import defaultdict
|
|
|
|
from . import utils
|
|
|
|
from papi.sqlapp.database import Base, SessionLocal, engine
|
|
from papi.sqlapp import crud
|
|
from papi.sqlapp import schemas
|
|
|
|
# Application object; the route decorators further down register handlers on it.
app = FastAPI()

# Module-level stores keyed by probe identifier. Missing keys default to an
# empty list, so merely indexing with a new identifier creates an entry.
mesures = defaultdict(list)
notifications = defaultdict(list)

# Issue CREATE TABLE for the models declared on ``Base`` using ``engine``.
# This runs at import time.
Base.metadata.create_all(bind=engine)
|
|
|
|
|
|
def get_db():
    """Yield a fresh database session and close it when the consumer is done.

    Written as a generator so it can be used with ``Depends``: the session is
    handed out at the ``yield`` and closed in the ``finally`` clause, whether
    or not the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
|
|
|
|
|
|
class Notifier:
    """Callable that records a change notification for a probe."""

    @staticmethod
    def __call__(idsonde, changes):
        """Append a notification entry for ``idsonde``.

        The entry holds the given ``changes`` together with the probe's
        current report, i.e. the lines produced by ``get_rapport``
        materialised into a list.
        """
        report = get_rapport(idsonde)
        history = notifications[idsonde]
        history.append({"changes": changes, "status": list(report)})


# The class name is rebound to an instance, so ``Notifier(idsonde, changes)``
# dispatches to ``__call__`` above.
Notifier = Notifier()
|
|
|
|
# Module-level mapping of probe identifier -> SondeBase, seeded with a "test" entry.
sondes = {"test": schemas.SondeBase(identifiant="test", nom="Testlocal")}
|
|
|
|
|
|
def default_sample():
    """Return an empty sample that can serve as a baseline for comparisons.

    A new dictionary is built on every call; its only key, ``"channels"``,
    maps to an empty tuple.
    """
    empty_sample = {"channels": ()}
    return empty_sample
|
|
|
|
|
|
@app.get("/")
def read_root():
    """Answer requests on the root path with a fixed greeting payload."""
    greeting = {"msg": "Hello World"}
    return greeting
|
|
|
|
|
|
@app.post("/sonde/", response_model=List[schemas.SondeBase])
def post_sonde(sonde: schemas.SondeBase, db: Session = Depends(get_db)):
    """Register a probe if it is not stored yet and return all known probes.

    The probe is looked up by identifier and created when the lookup returns
    nothing; the identifier doubles as the name when no name was supplied.
    The resulting row is converted to ``SondeBase`` and stored in the
    module-level ``sondes`` mapping, whose values are returned as a list.
    """
    key = sonde.identifiant
    record = crud.get_sonde(db, identifiant=key)
    if not record:
        record = crud.create_sonde(db, key, sonde.nom or key)
    # Careful with typing here: the stored row is converted to the schema type.
    sondes[key] = schemas.SondeBase.from_orm(record)
    return [*sondes.values()]
|
|
|
|
|
|
@app.get("/sonde/", response_model=List[schemas.SondeBase])
def list_sonde():
    """Return every probe currently held in the module-level ``sondes`` mapping."""
    known_probes = list(sondes.values())
    return known_probes
|
|
|
|
|
|
@app.get("/notifications/{idsonde}/")
def list_notification(idsonde: str):
    """Return the notifications recorded for ``idsonde``, most recent first.

    Args:
        idsonde: Probe identifier taken from the URL path.

    Returns:
        A new list with the stored notification entries in reverse insertion
        order; an empty list when nothing was recorded for this identifier.
    """
    # ``notifications`` is a defaultdict: indexing it with an unknown id would
    # insert an empty list, so arbitrary request paths could grow the mapping
    # without bound. ``.get`` reads without inserting.
    recorded = notifications.get(idsonde, [])
    return recorded[::-1]
|
|
|
|
|
|
@app.post("/sonde/{idsonde}/")
def post_sonde_data(
    idsonde: str, body: dict = Body(...), db: Session = Depends(get_db)
):
    """Store a new measurement for a probe and record any state changes.

    Args:
        idsonde: Probe identifier taken from the URL path.
        body: Raw measurement payload; it must contain a ``"date"`` key.
        db: Database session supplied by ``get_db``.

    Returns:
        ``None`` when the probe is unknown. Otherwise a dict with ``"count"``
        (number of stored measurements, not counting the empty baseline
        sample) and ``"notify"`` (the generated change lines, or an empty
        string when the comparison result is falsy).

    Raises:
        KeyError: If ``body`` has no ``"date"`` entry.
    """
    if not (sonde := crud.get_sonde(db, idsonde)):
        # NOTE(review): an unknown probe silently yields None; a 404 may be
        # more appropriate - confirm with API consumers before changing.
        return

    # Read the date before anything is persisted: a payload without it must
    # not be stored, since ``historique`` reads ``content["date"]`` of every
    # stored measurement after the first.
    date = body["date"]

    mesures_ = list(crud.get_mesure(db, sonde.identifiant))
    if not mesures_:
        # First measurement for this probe: store an empty baseline so there
        # is always a previous sample to compare against.
        mesures_.append(
            crud.create_mesure(db, sonde.sonde_id, content=default_sample())
        )
    mesures_.append(crud.create_mesure(db, sonde.sonde_id, content=body))
    mesures[idsonde] = mesures_

    previous = utils.prepare(mesures_[-2].content)
    present = utils.prepare(mesures_[-1].content)
    all_channels = sorted({*previous["channels"], *present["channels"]})
    diff = utils.compare(all_channels, previous, present)

    content = (
        [" ".join((date, *utils.clean_state(d))) for d in diff["changements"]]
        if diff
        else ""
    )
    if content:
        Notifier(idsonde, content)

    # The baseline check must look at the stored payload: comparing the
    # measurement object itself to a dict is never equal, so the baseline was
    # always counted. The list form is accepted as well in case the storage
    # layer round-trips the empty tuple as a list (assumption - confirm
    # against the crud/model definitions).
    first_content = mesures_[0].content
    has_baseline = first_content in (default_sample(), {"channels": []})
    return {
        "count": len(mesures_) - 1 if has_baseline else len(mesures_),
        "notify": content,
    }
|
|
|
|
|
|
@app.get("/sonde/{idsonde}/rapport")
def get_rapport(idsonde):
    """Yield one ``"<channel> <status>"`` line per channel of the latest sample.

    Args:
        idsonde: Probe identifier taken from the URL path.

    Yields:
        A string per channel of the most recent measurement held in
        ``mesures``, ordered by the number found before the first ``#`` in
        the channel name. Nothing is yielded when no measurement is held.
    """
    # ``.get`` reads without inserting an empty list into the defaultdict for
    # identifiers that were never posted to.
    stored = mesures.get(idsonde)
    if not stored:
        return

    latest = stored[-1]
    # The other callers of ``utils.prepare`` in this module pass the
    # measurement's ``.content``; do the same here, falling back to the item
    # itself when it carries no such attribute (e.g. a plain dict).
    sample = utils.prepare(getattr(latest, "content", latest))

    def channel_number(item):
        # ``item`` is a (name, data) pair; sort on the numeric name prefix.
        return float(item[0].split("#")[0])

    for name, data in sorted(sample["channels"].items(), key=channel_number):
        yield f'{name} {data["status"]}'
|
|
|
|
|
|
@app.get("/sonde/{idsonde}/historique")
def historique(idsonde):
    """Yield the state changes between consecutive measurements of a probe.

    Args:
        idsonde: Probe identifier taken from the URL path.

    Yields:
        One space-joined line per change, prefixed with the ``"date"`` of the
        later measurement of each consecutive pair, in stored order. Nothing
        is yielded when fewer than two measurements are held.
    """
    # ``.get`` reads without inserting an empty list into the defaultdict for
    # unknown identifiers. The former debug ``print`` of the list length was
    # dropped.
    stored = mesures.get(idsonde, [])

    for earlier, later in zip(stored, stored[1:]):
        date = later.content["date"]
        previous = utils.prepare(earlier.content)
        present = utils.prepare(later.content)
        all_channels = sorted({*previous["channels"], *present["channels"]})

        diff = utils.compare(all_channels, previous, present)
        if not diff:
            continue
        for d in diff["changements"]:
            yield " ".join((date, *utils.clean_state(d)))
|