papi/papi/utils.py

154 lines
3.7 KiB
Python

import requests
import sys
from time import sleep
from copy import deepcopy
import math
import itertools
import datetime
import csv
import json
import logging as logger
def prepare(record):
    """Return a copy of *record* whose channels are indexed by their key.

    The input is deep-copied first, so neither the returned record nor its
    channel entries share state with *record*.

    Args:
        record: Mapping with a ``"channels"`` entry. The entry is either an
            iterable of channel mappings accepted by ``channel2tuple`` or a
            dict already produced by this function.

    Returns:
        A new record in which ``"channels"`` is a dict mapping each key
        built by ``channel2tuple`` to its channel, in ascending key order.
    """
    newdict = deepcopy(record)
    channels = newdict["channels"]
    if isinstance(channels, dict):
        # Already indexed by an earlier call: rebuild from the values, since
        # iterating the dict itself would hand string keys to channel2tuple.
        channels = channels.values()
    newdict["channels"] = dict(
        # Order on the key alone; comparing whole pairs falls back to
        # comparing the channel dicts (a TypeError) when two keys are equal.
        sorted(
            (channel2tuple(channel) for channel in channels),
            key=lambda pair: pair[0],
        )
    )
    return newdict
def channel2tuple(channel):
    """Pair a channel with its lookup key.

    The key is the string form of the channel's ``"id"`` and ``"name"``
    joined by ``#``.

    Returns:
        ``(key, channel)``, where *channel* is the object passed in.
    """
    ident, label = channel["id"], channel["name"]
    key = "#".join((str(ident), str(label)))
    return key, channel
def get_status(channels, key):
    """Look up the status of the channel stored under *key*.

    Args:
        channels: Record whose ``"channels"`` entry maps keys to channel
            mappings carrying a ``"status"`` field.
        key: Key of the channel to look up.

    Returns:
        The channel's ``"status"`` value, or ``"absent"`` when the record
        holds no channel under *key*.
    """
    placeholder = {"status": "absent"}
    channel = channels["channels"].get(key, placeholder)
    return channel["status"]
def compare(channels, previous, current):
    """Report status differences between two records.

    Args:
        channels: Iterable of channel keys to examine.
        previous: Earlier record, in the form ``get_status`` accepts.
        current: Later record, in the form ``get_status`` accepts.

    Returns:
        An empty dict when no examined key changed status. Otherwise a dict
        holding ``(key, before, after)`` tuples under:

        * ``"disparus"``: changes whose new status is ``"absent"``
          (key omitted when there are none);
        * ``"apparus"``: changes whose old status was ``"absent"``
          (key omitted when there are none);
        * ``"changements"``: every key whose status differs;
        * ``"etats"``: every examined key, changed or not.
    """
    states = [
        (key, get_status(previous, key), get_status(current, key))
        for key in channels
    ]
    changes = [state for state in states if state[1] != state[2]]
    if not changes:
        return {}

    res = {}
    # Filter the changes, not every state: a key missing from both records
    # is "absent" on each side, has not changed, and must not be reported as
    # having both disappeared and appeared.
    disparus = [state for state in changes if state[2] == "absent"]
    if disparus:
        res["disparus"] = disparus
    apparus = [state for state in changes if state[1] == "absent"]
    if apparus:
        res["apparus"] = apparus
    res["changements"] = changes
    res["etats"] = states
    return res
def clean_state(state):
    """Format a ``(key, before, after)`` state for display.

    Returns:
        ``(name, before, "->", after)``, where *name* is the key with
        everything up to and including its first ``#`` removed.
    """
    # Drop the identifier at the start of the string.
    label = state[0].split("#", 1)[1]
    before, after = state[1], state[2]
    return label, before, "->", after
def list_channels(p, c):
    """Return the sorted union of the channel keys of two records.

    Args:
        p: Record whose ``"channels"`` entry is iterable.
        c: Record whose ``"channels"`` entry is iterable.

    Returns:
        A sorted list containing each distinct key exactly once.
    """
    keys = set(p["channels"])
    keys.update(c["channels"])
    return sorted(keys)
def main():
    """Poll for channel records and collect every status change.

    After ``login()``, each iteration fetches the current record, passes it
    to ``forward``, overwrites ``last.json`` with it, appends it to the file
    named by ``raw_filename()``, logs it through ``savelog2csv``, and
    compares it with the previous record. The loop runs while the iteration
    count is below the global ``maxloop`` and sleeps 20 seconds at the end
    of every iteration.

    Returns:
        The non-empty diffs returned by ``compare``, in the order observed.
    """
    login()
    # Index the starting record exactly once. Inside the loop ``previous``
    # is always an already-prepared record, and preparing it a second time
    # made the second iteration fail.
    previous = prepare(load_or_fetch())
    historique = []
    for _ in itertools.takewhile(lambda count: count < maxloop, itertools.count()):
        current = fetch().json()
        forward(current)
        with open("last.json", "w") as f:
            json.dump(current, f)
        # NOTE(review): successive dumps are appended with no separator, so
        # this file holds back-to-back JSON documents -- confirm that whatever
        # reads it expects that layout.
        with open(raw_filename(), "a") as f:
            json.dump(current, f)
        current = prepare(current)
        all_channels = list_channels(previous, current)
        savelog2csv(current)
        diff = compare(all_channels, previous, current)
        # NOTE(review): ``diff`` is the dict built by compare; confirm that
        # savediff expects that dict rather than diff["changements"].
        savediff(date=current["date"], diff=diff)
        if diff:
            print("**********")
            print(diff["changements"])
            print("!!!!!!!!!!")
            historique.append(diff)
        previous = current
        sleep(20)
    return historique
def make_id_key(channel, keys=None, sep="#", tuple_=False):
    """Build a lookup key for *channel* and pair it with the channel.

    The key is the string form of ``channel[k]`` for each ``k`` in *keys*,
    joined with *sep*.

    Args:
        channel: Mapping describing one channel.
        keys: Names of the fields making up the key. Defaults to
            ``["id", "name"]`` when omitted or empty.
        sep: Separator placed between the field values.
        tuple_: When true, return a ``(key, channel)`` pair instead of a
            one-entry dict.

    Returns:
        ``{key: channel}``, or ``(key, channel)`` when *tuple_* is true.

    >>> sample = {'id': 6, 'name': 'foo'}
    >>> make_id_key(sample)
    {'6#foo': {'id': 6, 'name': 'foo'}}
    """
    fields = keys or ["id", "name"]
    label = sep.join([str(channel[field]) for field in fields])
    return (label, channel) if tuple_ else {label: channel}
# NOTE(review): everything between the two triple-quote lines below is a bare
# string literal, not live code. raw_filename, log_filename, diff_filename,
# savelog2csv and savediff are therefore NOT defined by this part of the file,
# although main() calls raw_filename, savelog2csv and savediff.
# Confirm where the live definitions are before relying on main().
"""
def raw_filename():
return "raw_" + str(datetime.date.today()).replace("-", "_") + ".json"
def log_filename():
return "log_" + str(datetime.date.today()).replace("-", "_") + ".csv"
def diff_filename():
return "diff_" + str(datetime.date.today()).replace("-", "_") + ".csv"
def savelog2csv(alert, *, filename_f=log_filename):
keys = "date id name type status".split(" ")
with open(filename_f(), "a") as f:
writer = csv.DictWriter(f, keys)
if f.tell() == 0:
writer.writeheader()
for a in status2list(alert):
writer.writerow(a)
def savediff(date, diff, *, filename=diff_filename):
keys = "date name before after".split(" ")
with open(filename(), "a") as f:
writer = csv.DictWriter(f, keys)
if f.tell() == 0:
writer.writeheader()
for d in diff:
data = {"date": date}
data.update(zip(("name", "before", "after"), d))
writer.writerow(data)
"""