-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
61 lines (53 loc) · 2.15 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import sys
import re
import os
import aiohttp
import asyncio
from enum import Enum
from time import gmtime, strftime
import json
pattern = r"POCSAG1200:\s+Address:\s+(\d+)\s+Function:\s+(\d)"
VALUES=['a','b','c','d']
SUPERVISOR_TOKEN=os.environ['SUPERVISOR_TOKEN']
def loadConfig():
with open('/data/options.json') as config_file:
return json.load(config_file)
async def fire_event(event_type: str, payload):
async with aiohttp.ClientSession() as websession:
async with websession.post(f'http://supervisor/core/api/events/{event_type}',
headers={
'Authorization': f'Bearer {SUPERVISOR_TOKEN}'
},
json=payload) as response:
response.raise_for_status()
def listStartswith(items: list, key: str):
for item in items:
if key.startswith(item):
return True
return False
CONFIG=loadConfig()
CONFIG_IGNORE_ADDRESSES=CONFIG['ignore_addresses']
if len(CONFIG_IGNORE_ADDRESSES) > 0:
print('Ignoring the following addresses: ')
for address in CONFIG_IGNORE_ADDRESSES:
print(address)
for line in sys.stdin:
if 'Exit' == line.rstrip():
break
if "No supported devices found." in line:
break
result = re.search(pattern, line)
if result:
pocsag_address = result[1].rjust(7, '0')
key = result[2]
pocsag_function = VALUES[int(key)]
local_time = strftime("%Y-%m-%d %H:%M:%S", gmtime())
full_address = pocsag_address + pocsag_function
if not listStartswith(CONFIG_IGNORE_ADDRESSES, full_address):
print(f"{local_time}: PROCESSING {pocsag_address} {pocsag_function}")
asyncio.run(fire_event('pocsag_receive', {
'address': pocsag_address,
'function': pocsag_function
}))
else:
print(f"{local_time}: IGNORING {pocsag_address} {pocsag_function}")