papi/papi/main.py

186 lines
4.9 KiB
Python

from typing import Optional, List
from configparser import ConfigParser
from pydantic import BaseModel
from fastapi import FastAPI, Request, Body, Depends
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session
from collections import defaultdict
from . import utils
from papi.sqlapp.database import Base, SessionLocal, engine
from papi.sqlapp import crud
from papi.sqlapp import schemas
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# Jinja2 template loader rooted at the relative "templates/" directory.
templates = Jinja2Templates(directory="templates/")
# In-memory store: sonde identifier -> list of measurement records. It is
# written by post_sonde_data and read by get_rapport.
mesures = defaultdict(list)
# In-memory store: sonde identifier -> list of notification dicts, appended in
# arrival order by Notifier.
notifications = defaultdict(list)
# Create the tables registered on Base.metadata as soon as the module is
# imported.
Base.metadata.create_all(bind=engine)
# Notification settings, one section per sonde identifier. ConfigParser.read
# ignores files it cannot find, so a missing file leaves the parser empty.
conf = ConfigParser()
conf.read("conf_notifications.ini")
def sondeid2notifsemails(idsonde, mapping=None):
    """Return the notification e-mail addresses configured for a sonde.

    Args:
        idsonde: Name of the section to look up in ``mapping``.
        mapping: Mapping of section name to a mapping holding an ``"email"``
            entry. When omitted (or ``None``), the module-level ``conf``
            parser is used.

    Returns:
        The list of addresses. A string value is split on newlines, each
        entry is stripped and blank entries are dropped. A non-string value
        is returned unchanged.

    Raises:
        KeyError: If ``idsonde`` is not a section of ``mapping`` (the message
            lists the known sections), or if the section has no ``"email"``
            entry.
    """
    if mapping is None:
        mapping = conf
    try:
        section = mapping[idsonde]
    except KeyError as err:
        raise KeyError(
            f"{idsonde} non trouvé dans {list(mapping.keys())}"
        ) from err
    mails = section["email"]
    if isinstance(mails, str):
        # A multi-line INI value begins with a newline, so a plain split
        # produces empty strings that must not end up as recipients.
        stripped = (entry.strip() for entry in mails.split("\n"))
        mails = [entry for entry in stripped if entry]
    return mails
def get_db():
    """Yield a database session and close it once the consumer is finished.

    Written as a generator so it can be used as a ``Depends`` target: the
    session is handed over at the ``yield`` and closed in the ``finally``
    clause whether or not the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
class Notifier:
    """Callable that records a notification for a sonde in ``notifications``."""

    @staticmethod
    def __call__(idsonde, changes):
        """Store ``changes`` together with the sonde's current report.

        Args:
            idsonde: Identifier of the sonde the notification belongs to.
            changes: Value stored under the ``"changes"`` key of the entry.
        """
        bucket = notifications[idsonde]
        # get_rapport is a generator; materialise it so the stored entry
        # keeps the report lines themselves.
        report_lines = list(get_rapport(idsonde))
        bucket.append({"changes": changes, "status": report_lines})


# Rebind the name to a single instance so callers can simply write
# ``Notifier(idsonde, changes)``.
Notifier = Notifier()
# In-memory registry of sondes keyed by identifier, pre-seeded with a local
# "test" entry; post_sonde writes to it and list_sonde reads from it.
sondes = {"test": schemas.SondeBase(identifiant="test", nom="Testlocal")}
def default_sample():
    """Return an empty sample that can serve as a baseline for comparison."""
    return dict(channels=())
@app.get("/")
def read_root():
    """Return a fixed greeting payload for the root URL."""
    greeting = {"msg": "Hello World"}
    return greeting
@app.post("/sonde/", response_model=List[schemas.SondeBase])
def post_sonde(sonde: schemas.SondeBase, db: Session = Depends(get_db)):
    """Register a sonde and return every sonde held in the in-memory registry.

    The sonde is looked up by identifier and created when the lookup returns
    nothing, using the identifier as its name if no name was supplied. The
    resulting record is then stored in ``sondes``.

    Args:
        sonde: Payload carrying the sonde's ``identifiant`` and optional ``nom``.
        db: Database session supplied by ``get_db``.

    Returns:
        A list of all values currently in ``sondes``.
    """
    identifiant = sonde.identifiant
    record = crud.get_sonde(db, identifiant=identifiant)
    if not record:
        record = crud.create_sonde(db, identifiant, sonde.nom or identifiant)
    # Careful with typing here: the registry holds the schema object built
    # from the record, not the record itself.
    sondes[identifiant] = schemas.SondeBase.from_orm(record)
    return [*sondes.values()]
@app.get("/sonde/", response_model=List[schemas.SondeBase])
def list_sonde():
    """Return the sondes held in the in-memory ``sondes`` registry."""
    return list(sondes.values())
@app.get("/notifications/{idsonde}")
def list_notification(idsonde: str):
    """Return the notifications stored for a sonde, most recent first."""
    history = notifications[idsonde]
    return list(reversed(history))
@app.get("/notifications/{idsonde}/text")
def list_notification_as_text(request: Request, idsonde: str):
    """Render the most recent notification of a sonde with the e-mail template.

    Args:
        request: Incoming request, passed on to the template context.
        idsonde: Identifier of the sonde whose last notification is rendered.

    Returns:
        The ``notification_email.html`` template response. Its ``data``
        context holds the recipients, the recorded changes and the recorded
        status.

    Raises:
        HTTPException: With status 404 when no notification has been recorded
            for ``idsonde``.
    """
    # HTTPException is only needed by this handler.
    from fastapi import HTTPException

    # Use .get() rather than indexing: ``notifications`` is a defaultdict, and
    # indexing would create an empty entry for every unknown id requested.
    history = notifications.get(idsonde)
    if not history:
        raise HTTPException(
            status_code=404,
            detail=f"No notification recorded for sonde {idsonde!r}",
        )
    content = history[-1]
    try:
        recipients = sondeid2notifsemails(idsonde)
    except KeyError:
        # No recipients configured for this sonde: use placeholder addresses.
        recipients = ["mail1@xxx", "mail2@xxx"]
    data = {
        "recipients": recipients,
        "changements": content["changes"],
        "status": content["status"],
    }
    return templates.TemplateResponse(
        "notification_email.html", context={"request": request, "data": data}
    )
@app.post("/sonde/{idsonde}/")
def post_sonde_data(
    idsonde: str, body: dict = Body(...), db: Session = Depends(get_db)
):
    """Store a new sample for a sonde and report what changed since the last one.

    Args:
        idsonde: Identifier of the sonde the sample belongs to.
        body: The sample itself; its ``"date"`` entry prefixes every change
            line.
        db: Database session supplied by ``get_db``.

    Returns:
        ``None`` when the sonde lookup returns nothing. Otherwise a dict with
        ``"count"`` (number of stored samples, not counting a first sample
        that has no ``"date"``) and ``"notify"`` (the list of change lines, or
        ``""`` when the comparison result is falsy).
    """
    sonde = crud.get_sonde(db, idsonde)
    if not sonde:
        return
    history = list(sonde.mesures)
    if not history:
        # Nothing stored yet: persist an empty baseline first so the new
        # sample always has a predecessor to be compared with.
        baseline = crud.create_mesure(
            db, sonde.sonde_id, content=default_sample()
        )
        history.append(baseline)
    latest = crud.create_mesure(db, sonde.sonde_id, content=body)
    history.append(latest)
    mesures[idsonde] = history
    date = body["date"]
    previous = utils.prepare(history[-2].content)
    present = utils.prepare(history[-1].content)
    all_channels = sorted({*previous["channels"], *present["channels"]})
    diff = utils.compare(all_channels, previous, present)
    if diff:
        content = [
            " ".join((date, *utils.clean_state(change)))
            for change in diff["changements"]
        ]
    else:
        content = ""
    if content:
        Notifier(idsonde, content)
    if "date" in history[0].content.keys():
        count = len(history)
    else:
        # The first entry is the date-less baseline; leave it out of the count.
        count = len(history) - 1
    return {"count": count, "notify": content}
@app.get("/sonde/{idsonde}/rapport")
def get_rapport(idsonde):
    """Yield one ``"<label> <status>"`` line per channel of the latest sample.

    Channel names have the form ``"<number>#<label>"``; lines are produced in
    ascending order of the numeric prefix. Nothing is yielded when no sample
    is held in ``mesures`` for ``idsonde``.
    """

    def numeric_prefix(item):
        # item is a (channel name, channel data) pair.
        return float(item[0].split("#")[0])

    try:
        latest = mesures[idsonde][-1]
    except IndexError:
        return
    prepared = utils.prepare(latest.content)
    ordered = sorted(prepared["channels"].items(), key=numeric_prefix)
    for name, data in ordered:
        yield f'{name.split("#", 1)[1]} {data["status"]}'
@app.get("/sonde/{idsonde}/historique")
def historique(idsonde, db: Session = Depends(get_db)):
    """Yield one line per change between consecutive stored samples of a sonde.

    Each line is the ``"date"`` of the later sample of a pair followed by the
    cleaned-up change, joined with spaces. Nothing is yielded when the sonde
    lookup returns nothing.

    Args:
        idsonde: Identifier of the sonde whose history is replayed.
        db: Database session supplied by ``get_db``.
    """
    sonde = crud.get_sonde(db, idsonde)
    if not sonde:
        return
    samples = sonde.mesures
    # Walk the samples as (older, newer) neighbouring pairs.
    for older, newer in zip(samples, samples[1:]):
        date = newer.content["date"]
        before = utils.prepare(older.content)
        after = utils.prepare(newer.content)
        all_channels = sorted({*before["channels"], *after["channels"]})
        diff = utils.compare(all_channels, before, after)
        if not diff:
            continue
        for change in diff["changements"]:
            yield " ".join((date, *utils.clean_state(change)))