-
Notifications
You must be signed in to change notification settings - Fork 0
/
heatpump-logger.py
executable file
·146 lines (123 loc) · 4.12 KB
/
heatpump-logger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/python
import websocket
import json
from time import sleep
import xml.etree.ElementTree as ET
from datetime import datetime
import urllib.request
import sys
import os
import configparser
config = configparser.ConfigParser()
config.read('./heatpump-logger.ini')
for option in ['heatpumpIP', 'domoticzIP']:
if not config.has_option('network', option):
raise Exception(f'missing option {option} in .ini file')
heatpumpIP = config.get('network', 'heatpumpIP')
domoticzIP = config.get('network', 'domoticzIP')
interval = config.getint('logging', 'interval', fallback=5)
logfile = config.get('logging', 'logfile', fallback="./heatpump-logger.log")
test = config.getboolean('development', 'test', fallback=False)
if len(heatpumpIP) == 0 or len(domoticzIP) == 0:
raise Exception('Provide IP addresses in .ini file')
def ft(v): return v[0:-2] # strip ' °C'
def fp(v): return v[0:-3] # strip ' KW'
def fe(v): return v[0:-4] # strip ' KWh'
def fs(v): return v or 'Uit'
def times1000(v): return str(float(v)*1000)
pages = {
# page_id: [[
# 1. field number on page
# 2. domo deviceID
# 3. description on page
# 4. formatter function
# 5. conversion for domoticz push] ....]
'temp': [
[0, 14, "Aanvoer", ft, None],
[1, 15, "Retour", ft, None],
[2, 16, "Retour berekend", ft, None],
[4, 17, "Buitentemperatuur", ft, None],
[5, 18, "Tapwater gemeten", ft, None],
[7, 19, "Bron-in", ft, None],
[8, 20, "Bron-uit", ft, None],
],
'status': [
[5, 12, "Bedrijfstoestand", fs, None],
[6, 23, "Vermogen", fp, times1000]
],
'energy': [
[0, None, "Verwarmen", fe, times1000],
[1, None, "Warmwater", fe, times1000],
]
}
def push(domoID, value):
if test:
print(f"pushing {value} to device {domoID}")
else:
try:
url = (f"http://{domoticzIP}/json.htm?type=command¶m=udevice&"
f"idx={domoID}&nvalue=0&svalue={value}")
result = urllib.request.urlopen(url, timeout=1)
# print(result.getcode())
except:
print(sys.exc_info())
def parseNavigation(xml):
root = ET.fromstring(xml)
items = root[0].findall('item')
item_temp = items[0]
item_status = items[7]
item_energy = items[8]
if item_temp[0].text != "Temperaturen" or \
item_status[0].text != "Installatiestatus" or \
item_energy[0].text != "Energie":
raise Exception("Menuitems changed?")
return (item_temp.attrib['id'],
item_status.attrib['id'],
item_energy.attrib['id'])
def parse_items(root, page):
items = root.findall('item')
values = []
for (n, domoID, label, fmt, fpush) in page:
item = items[n]
text = item[0].text
if text != label:
raise Exception(
f"found {text} instead of {label} on position {n}")
value = fmt(item[1].text) if fmt else item[1].text
if domoID:
push(domoID, fpush(value) if fpush else value)
values.append(value)
return values
def parse_page(menu_id, page):
ws.send(f'GET;{menu_id}')
root = ET.fromstring(ws.recv())
return parse_items(root, page)
def log(now, values):
s = f'{str(now)[0:-7]} {" ".join(values)}\n'
if test:
print(s, end='')
elif logfile:
with open(logfile, "a") as f:
f.write(s)
ws = websocket.WebSocket()
ws.connect(f"ws://{heatpumpIP}:8214", subprotocols=["Lux_WS"])
ws.send(f'LOGIN;') # we don't need a password to obtain the data
result = ws.recv()
(menu_temp_id, menu_status_id, menu_energy_id) = parseNavigation(result)
now = datetime.now()
last_print = -1
while True:
if test or (now.minute % 5 == 0 and last_print != now.minute):
doprint = True
last_print = now.minute
else:
doprint = False
values = parse_page(menu_status_id, pages['status'])
values += parse_page(menu_temp_id, pages['temp'])
if doprint:
values += parse_page(menu_energy_id, pages['energy'])
log(now, values)
if test:
break
sleep(interval)
now = datetime.now()