Add probe for forwarding only

This commit is contained in:
Colin Goutte 2021-09-27 17:50:18 +02:00
parent c801f1d771
commit 001fb9026d
1 changed file with 259 additions and 0 deletions

papi/sonde.py Normal file

@@ -0,0 +1,259 @@
import requests
import sys
from time import sleep
from copy import deepcopy

try:
    from . import api_credentials
except ImportError:
    import api_credentials

import urllib3
import math
import itertools
import datetime
import csv
import json
import logging as logger

https = False
urllib3.disable_warnings()

session = requests.Session()
session.verify = False

url = "https://%s" % api_credentials.ip
apiurl = "%s/api/1.1" % url

logged = []
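# `api_credentials` is assumed to be a small sibling module exposing the
# probe's target and account; only `ip`, `user` and `password` are read here
# (see `url` above and login() below). A placeholder sketch, not real values:
#     ip = "192.0.2.10"
#     user = "probe"
#     password = "change-me"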
def login():
    post_url = "%s/user/login" % apiurl
    login_r = session.post(
        post_url,
        json={"login": api_credentials.user, "password": api_credentials.password},
    )
    logged.append(True)
    if login_r.ok:
        logger.info("Logged")
    else:  # pragma: no cover
        logger.info("Login error")
    return login_r
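# Authentication note (assumption): /user/login is expected to set a session
# cookie or similar on the shared `session`, so fetch() and forward() reuse
# it; `logged` only records that login() has been attempted.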
def forward(data):
    forward_urls = [
        "https://papi.silib.re/sonde/test/",
    ]
    for post_url in forward_urls:
        res = session.post(post_url, json=data)
        print(res.ok)
def status2list(status: dict):
    keys = "id name channelType status".split(" ")
    id_ = "id"
    translate = {"channelType": "type"}
    date = status["date"]
    res = []
    for channel in status["channels"].values():
        res.append(
            {"date": date, **{translate.get(k, k): channel[k] for k in keys}}
        )
    return sorted(res, key=lambda x: x[id_])
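# Illustration (assumed payload shape, matching the keys used above):
#     status2list({"date": "2021-09-27",
#                  "channels": {"6#AES-67": {"id": 6, "name": "AES-67",
#                                            "channelType": "fm",
#                                            "status": "error"}}})
#     -> [{'date': '2021-09-27', 'id': 6, 'name': 'AES-67',
#          'type': 'fm', 'status': 'error'}]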
def strip_channel(channel, keys=None):
    """
    >>> s = {'alarms': {'noSignal': True,
    ...                 'noWatermark': None,
    ...                 'qualityIndexStatus': None,
    ...                 'timestampDrift': None,
    ...                 'unexpectedWatermark': None},
    ...      'channelType': 'fm',
    ...      'id': 6,
    ...      'lastTimestamp': None,
    ...      'lastWatermarkId': None,
    ...      'name': 'AES-67',
    ...      'status': 'error'}
    >>> strip_channel(s)
    {'id': 6, 'name': 'AES-67', 'status': 'error'}
    """
    if keys is None:
        keys = "id", "name", "status"
    return {k: channel[k] for k in keys}
def prepare(record):
    newdict = deepcopy(record)
    newdict["channels"] = dict(
        sorted(channel2tuple(channel) for channel in record["channels"])
    )
    return newdict
def fetch():
    if not logged:
        login()
    fetched = session.get("%s/status" % apiurl)
    return fetched
def channel2tuple(channel):
    return "%s#%s" % (channel["id"], channel["name"]), channel
def get_status(channels, key):
    return channels["channels"].get(key, {"status": "absent"})["status"]
def compare(channels, previous, current):
    changes, states = [], []
    for key in channels:
        pstatus = get_status(previous, key)
        cstatus = get_status(current, key)
        state = key, pstatus, cstatus
        if pstatus != cstatus:
            changes.append(state)
        states.append(state)
    res = {}
    if not changes:
        return {}
    # "disparus" = channels that disappeared, "apparus" = channels that appeared
    disparus = [state for state in states if state[2] == "absent"]
    if disparus:
        res["disparus"] = disparus
    apparus = [state for state in states if state[1] == "absent"]
    if apparus:
        res["apparus"] = apparus
    # "changements" = status changes, "etats" = every state that was compared
    res["changements"] = changes
    res["etats"] = states
    return res
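# Illustration (hypothetical statuses): comparing the single key "6#AES-67"
# whose status went from "ok" to "error" returns
#     {"changements": [("6#AES-67", "ok", "error")],
#      "etats": [("6#AES-67", "ok", "error")]}
# with "disparus"/"apparus" only added when a channel is absent on one side.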
def load_or_fetch(fetch=fetch):
    try:
        with open("last.json") as f:
            previous = json.load(f)
    except (FileNotFoundError, json.decoder.JSONDecodeError):
        print("No usable last.json, fetching a fresh status")
        with open("last.json", "w") as f:
            previous = fetch().json()
            json.dump(previous, f)
    return previous
def list_channels(p, c):
    all_channels = sorted(set((*p["channels"], *c["channels"])))
    return all_channels
def main(*, maxloop=math.inf, login=login, fetch=fetch):
    loopcount = itertools.count().__next__
    login()
    previous = load_or_fetch()
    historique = []
    while loopcount() < maxloop:
        try:
            current = fetch().json()
        except json.decoder.JSONDecodeError:
            breakpoint()  # drop into the debugger on an unparsable payload
        forward(current)
        # The diff/CSV bookkeeping below is deliberately left disabled (kept
        # inside a string literal): for now this probe only forwards.
        """
        with open("last.json", "w") as f:
            json.dump(current, f)
        with open(raw_filename(), "a") as f:
            json.dump(current, f)
        current, previous = prepare(current), prepare(previous)
        all_channels = sorted(set((*previous["channels"], *current["channels"])))
        savelog2csv(current)
        diff = compare(all_channels, previous, current)
        savediff(date=current["date"], diff=diff)
        if diff:
            print("**********")
            print(diff["changements"])
            print("!!!!!!!!!!")
        historique.append(diff)
        previous = current
        sleep(0.5)
    return historique
    """
def make_id_key(channel, keys=None, sep="#", tuple_=False):
    """
    Build a composite key by joining the values of `keys`, and pair it with
    the channel (as a one-entry dict, or as a tuple when tuple_=True).
    >>> sample = {'id': 6, 'name': 'foo'}
    >>> make_id_key(sample)
    {'6#foo': {'id': 6, 'name': 'foo'}}
    """
    if not keys:
        keys = ["id", "name"]
    kvalue = sep.join(str(channel[k]) for k in keys)
    if tuple_:
        return kvalue, channel
    return {kvalue: channel}
def raw_filename():
    return "raw_" + str(datetime.date.today()).replace("-", "_") + ".json"

def log_filename():
    return "log_" + str(datetime.date.today()).replace("-", "_") + ".csv"

def diff_filename():
    return "diff_" + str(datetime.date.today()).replace("-", "_") + ".csv"
def savelog2csv(alert, *, filename_f=log_filename):
    keys = "date id name type status".split(" ")
    with open(filename_f(), "a") as f:
        writer = csv.DictWriter(f, keys)
        if f.tell() == 0:
            writer.writeheader()
        for a in status2list(alert):
            writer.writerow(a)
def savediff(date, diff, *, filename=diff_filename):
    keys = "date name before after".split(" ")
    with open(filename(), "a") as f:
        writer = csv.DictWriter(f, keys)
        if f.tell() == 0:
            writer.writeheader()
        # Each change is a (name, before, after) tuple; iterating the diff
        # dict itself would only yield its keys, so take the "changements" list.
        for d in diff.get("changements", []):
            data = {"date": date}
            data.update(zip(("name", "before", "after"), d))
            writer.writerow(data)
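# The two CSVs above are date-stamped per day: log_YYYY_MM_DD.csv holds one
# row per channel (date, id, name, type, status) and diff_YYYY_MM_DD.csv one
# row per status change (date, name, before, after).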
if __name__ == "__main__":
    main(maxloop=1)
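# Usage note: main(maxloop=1) performs a single fetch-and-forward pass; for
# example, main(maxloop=10) would run ten passes and main() with no limit
# would loop until interrupted. Be aware that the sleep(0.5) throttle sits in
# the disabled block above, so an unbounded loop currently polls back-to-back.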