-
Notifications
You must be signed in to change notification settings - Fork 7
/
mqtt_api.py
334 lines (293 loc) · 13.6 KB
/
mqtt_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""
This module provides an API to publish data from batcontrol to MQTT
for further processing and visualization.
The following topics are published:
- /status: online/offline status of batcontrol
- /evaluation_intervall: interval in seconds
- /last_evaluation: timestamp of last evaluation
- /mode: operational mode (-1 = charge from grid, 0 = avoid discharge, 10 = discharge allowed)
- /max_charging_from_grid_limit: charge limit in 0.1-1
- /max_charging_from_grid_limit_percent: charge limit in %
- /always_allow_discharge_limit: always discharge limit in 0.1-1
- /always_allow_discharge_limit_percent: always discharge limit in %
- /always_allow_discharge_limit_capacity: always discharge limit in Wh
- /charge_rate: charge rate in W
- /max_energy_capacity: maximum capacity of battery in Wh
- /stored_energy_capacity: energy stored in battery in Wh
- /stored_usable_energy_capacity: energy stored in battery in Wh and usable (min SOC considered)
- /reserved_energy_capacity: estimated energy reserved for discharge in Wh
- /SOC: state of charge in %
- /min_price_difference : minimum price difference in EUR
- /discharge_blocked : bool # Discharge is blocked by other sources
The following statistical arrays are published as JSON arrays:
- /FCST/production: forecasted production in W
- /FCST/consumption: forecasted consumption in W
- /FCST/prices: forecasted price in EUR
- /FCST/net_consumption: forecasted net consumption in W
Implemented Input-API:
- /mode/set: set mode (-1 = charge from grid, 0 = avoid discharge, 10 = discharge allowed)
- /charge_rate/set: set charge rate in W, sets mode to -1
- /always_allow_discharge_limit/set: set always discharge limit in 0.1-1
- /max_charging_from_grid_limit/set: set charge limit in 0-1
- /min_price_difference/set: set minimum price difference in EUR
The module uses the paho-mqtt library for MQTT communication and numpy for handling arrays.
"""
import time
import json
import logging
import paho.mqtt.client as mqtt
import numpy as np
# Fetch the logger registered under the literal name '__main__' (not a
# module-specific logger), so records from this module go through it.
logger = logging.getLogger('__main__')
# Emitted once, when this module is imported.
logger.info('[MQTT] loading module ')
class MqttApi:
    """MQTT API to publish data from batcontrol to MQTT for further processing+visualization.

    All topics are published below the configured base topic
    (``config['topic']``). Values can be changed from outside through
    ``<base_topic>/<topic>/set`` topics registered with
    :meth:`register_set_callback`.
    """
    SET_SUFFIX = '/set'

    def __init__(self, config: dict):
        """Create the MQTT client, start its network loop and connect.

        Keys read from ``config``:
            topic: base topic used as prefix for every published topic.
            broker, port: address of the MQTT broker.
            logger (optional): if ``True``, paho logs through the module logger.
            username, password (optional): credentials, used only if both exist.
            tls (optional): either ``True`` (enable TLS with paho defaults) or a
                dict with any of ``ca_certs``, ``certfile``, ``keyfile``,
                ``cert_reqs``, ``tls_version`` and ``ciphers``.
        """
        self.config = config
        self.base_topic = config['topic']
        # Maps the full '/set' topic string to
        # {'function': callback, 'convert': payload converter}.
        self.callbacks = {}

        self.client = mqtt.Client()
        if config.get('logger') is True:
            self.client.enable_logger(logger)

        if 'username' in config and 'password' in config:
            self.client.username_pw_set(config['username'], config['password'])

        # Last will: the broker publishes 'offline' if we vanish unexpectedly.
        self.client.will_set(self.base_topic + '/status', 'offline', retain=True)

        # TLS, not tested yet.
        # The previous check (`config['tls'] is True`) subscripted the boolean
        # it had just tested, which always raised TypeError; a dict-valued
        # section was silently skipped. Both shapes are handled explicitly now.
        tls_config = config.get('tls')
        if isinstance(tls_config, dict):
            self.client.tls_set(
                tls_config.get('ca_certs'),
                tls_config.get('certfile'),
                tls_config.get('keyfile'),
                cert_reqs=tls_config.get('cert_reqs'),
                tls_version=tls_config.get('tls_version'),
                ciphers=tls_config.get('ciphers'),
            )
        elif tls_config is True:
            self.client.tls_set()

        self.client.on_connect = self.on_connect
        self.client.loop_start()
        self.client.connect(config['broker'], config['port'], 60)

    def on_connect(self, client, userdata, flags, rc):  # pylint: disable=unused-argument
        """Callback for MQTT connection to serve /status.

        Publishes the retained 'online' status and (re-)subscribes to every
        registered set topic, which also covers the reconnect case.
        """
        logger.info('[MQTT] Connected with result code %s', rc)
        # Make public, that we are running.
        client.publish(self.base_topic + '/status', 'online', retain=True)
        # Handle reconnect case
        for topic in self.callbacks:
            logger.debug('[MQTT] Subscribing topic: %s', topic)
            client.subscribe(topic)

    def wait_ready(self) -> bool:
        """Wait for the MQTT connection to be ready.

        Polls the connection state once per second.

        Returns:
            True as soon as the client is connected, False if it is still
            not connected after the retry budget is used up.
        """
        retry = 30
        # Check if we are connected and wait for it
        while self.client.is_connected() is False:
            retry -= 1
            if retry == 0:
                logger.error('[MQTT] Could not connect to MQTT Broker')
                return False

            logger.info('[MQTT] Waiting for connection')
            time.sleep(1)

        return True

    def _handle_message(self, client, userdata, message):  # pylint: disable=unused-argument
        """Handle and dispatch incoming messages.

        Looks up the callback registered for ``message.topic``, converts the
        payload with the registered converter and calls the callback with
        the result. Errors are logged and never propagated.
        """
        logger.debug('[MQTT] Received message on %s', message.topic)
        if message.topic in self.callbacks:
            entry = self.callbacks[message.topic]
            try:
                entry['function'](entry['convert'](message.payload))
            # Deliberately broad: a failing callback must not raise into the
            # code that delivered the message.
            except Exception as e:  # pylint: disable=broad-except
                logger.error('[MQTT] Error in callback %s : %s', message.topic, e)
        else:
            logger.warning('[MQTT] No callback registered for %s', message.topic)

    def register_set_callback(self, topic: str, callback: callable, convert: callable) -> None:
        """Generic: register a callback for changing values inside batcontrol
        via MQTT set topics.

        Args:
            topic: topic name relative to the base topic, without '/set'.
            callback: called with the converted payload of each message.
            convert: converts the raw message payload before it is passed
                to ``callback`` (e.g. ``int`` or ``float``).
        """
        topic_string = self.base_topic + "/" + topic + MqttApi.SET_SUFFIX
        logger.debug('[MQTT] Registering callback for %s', topic_string)
        # set api endpoints, generic subscription
        self.callbacks[topic_string] = {'function': callback, 'convert': convert}
        self.client.subscribe(topic_string)
        self.client.message_callback_add(topic_string, self._handle_message)

    def _publish(self, topic: str, payload) -> None:
        """Publish ``payload`` to ``<base_topic>/<topic>`` if the client is connected."""
        if self.client.is_connected():
            self.client.publish(self.base_topic + '/' + topic, payload)

    def _publish_forecast(self, topic: str, forecast: np.ndarray, timestamp: float) -> None:
        """Publish a forecast array as JSON to ``<base_topic>/<topic>``.

        The JSON document is only built when the client is connected.
        """
        if self.client.is_connected():
            self.client.publish(
                self.base_topic + '/' + topic,
                json.dumps(self._create_forecast(forecast, timestamp))
            )

    def _create_forecast(self, forecast: np.ndarray, timestamp: float) -> dict:
        """Create a forecast JSON object from a numpy array and a timestamp.

        Element ``h`` of ``forecast`` is mapped to the h-th hour slot counted
        from the start of the hour that contains ``timestamp``.

        Returns:
            ``{'data': [{'time_start': ..., 'value': ..., 'time_end': ...}, ...]}``
            where ``time_start`` is the first and ``time_end`` the last
            second of the slot.
        """
        # Take timestamp and reduce it to the first second of the hour
        now = timestamp - (timestamp % 3600)

        data_list = []
        for h, value in enumerate(forecast):
            # numpy scalars (e.g. int64, float32) are not JSON serializable;
            # unwrap them to plain Python numbers.
            if isinstance(value, np.generic):
                value = value.item()
            data_list.append(
                {
                    'time_start': now + h * 3600,
                    'value': value,
                    # Last second of this slot. The offset has to be a constant
                    # 1 second; subtracting the slot index made the slots
                    # overlap and shrink as the index grew.
                    'time_end': now - 1 + (h + 1) * 3600
                }
            )

        return {'data': data_list}

    def publish_mode(self, mode: int) -> None:
        """Publish the mode (charge, lock, discharge) to MQTT.
        /mode
        """
        self._publish('mode', mode)

    def publish_charge_rate(self, rate: float) -> None:
        """Publish the forced charge rate in W to MQTT.
        /charge_rate
        """
        self._publish('charge_rate', rate)

    def publish_production(self, production: np.ndarray, timestamp: float) -> None:
        """Publish the production to MQTT.
        /FCST/production
        The value is in W and based on the solar forecast API.
        The length is the same as used in internal arrays.
        """
        self._publish_forecast('FCST/production', production, timestamp)

    def publish_consumption(self, consumption: np.ndarray, timestamp: float) -> None:
        """Publish the consumption to MQTT.
        /FCST/consumption
        The value is in W and based on the load profile, multiplied with
        the personal yearly consumption.
        The length is the same as used in internal arrays.
        """
        self._publish_forecast('FCST/consumption', consumption, timestamp)

    def publish_prices(self, price: np.ndarray, timestamp: float) -> None:
        """Publish the prices to MQTT.
        /FCST/prices
        The length is the same as used in internal arrays.
        """
        self._publish_forecast('FCST/prices', price, timestamp)

    def publish_net_consumption(self, net_consumption: np.ndarray, timestamp: float) -> None:
        """Publish the net consumption in W to MQTT.
        /FCST/net_consumption
        The length is the same as used in internal arrays.
        This is the difference between production and consumption.
        """
        self._publish_forecast('FCST/net_consumption', net_consumption, timestamp)

    def publish_SOC(self, soc: float) -> None:  # pylint: disable=invalid-name
        """Publish the state of charge in % to MQTT.
        /SOC
        The value is truncated to an integer and zero-padded to 3 digits.
        """
        self._publish('SOC', f'{int(soc):03}')

    def publish_stored_energy_capacity(self, stored_energy: float) -> None:
        """Publish the stored energy capacity in Wh to MQTT.
        /stored_energy_capacity
        """
        self._publish('stored_energy_capacity', f'{stored_energy:.1f}')

    def publish_stored_usable_energy_capacity(self, stored_energy: float) -> None:
        """Publish the stored usable energy capacity in Wh to MQTT.
        /stored_usable_energy_capacity
        """
        self._publish('stored_usable_energy_capacity', f'{stored_energy:.1f}')

    def publish_reserved_energy_capacity(self, reserved_energy: float) -> None:
        """Publish the reserved energy capacity in Wh to MQTT.
        /reserved_energy_capacity
        """
        self._publish('reserved_energy_capacity', f'{reserved_energy:.1f}')

    def publish_always_allow_discharge_limit_capacity(self, discharge_limit: float) -> None:
        """Publish the always discharge limit in Wh to MQTT.
        /always_allow_discharge_limit_capacity
        """
        self._publish('always_allow_discharge_limit_capacity', f'{discharge_limit:.1f}')

    def publish_always_allow_discharge_limit(self, allow_discharge_limit: float) -> None:
        """Publish the always discharge limit to MQTT.
        /always_allow_discharge_limit as digit
        /always_allow_discharge_limit_percent
        """
        self._publish('always_allow_discharge_limit', f'{allow_discharge_limit:.2f}')
        self._publish(
            'always_allow_discharge_limit_percent',
            f'{allow_discharge_limit * 100:.0f}'
        )

    def publish_max_charging_from_grid_limit(self, charge_limit: float) -> None:
        """Publish the maximum charging limit to MQTT.
        /max_charging_from_grid_limit_percent
        /max_charging_from_grid_limit as digit.
        """
        self._publish('max_charging_from_grid_limit_percent', f'{charge_limit * 100:.0f}')
        self._publish('max_charging_from_grid_limit', f'{charge_limit:.2f}')

    def publish_min_price_difference(self, min_price_difference: float) -> None:
        """Publish the minimum price difference to MQTT found in config.
        /min_price_difference
        """
        self._publish('min_price_difference', f'{min_price_difference:.3f}')

    def publish_max_energy_capacity(self, max_capacity: float) -> None:
        """Publish the maximum energy capacity to MQTT.
        /max_energy_capacity
        """
        self._publish('max_energy_capacity', f'{max_capacity:.1f}')

    def publish_evaluation_intervall(self, intervall: int) -> None:
        """Publish the evaluation intervall to MQTT.
        /evaluation_intervall
        """
        self._publish('evaluation_intervall', f'{intervall:.0f}')

    def publish_last_evaluation_time(self, timestamp: float) -> None:
        """Publish the last evaluation timestamp to MQTT.
        This is the time when the last evaluation was started.
        /last_evaluation
        """
        self._publish('last_evaluation', f'{timestamp:.0f}')

    def publish_discharge_blocked(self, discharge_blocked: bool) -> None:
        """Publish the discharge blocked status to MQTT.
        /discharge_blocked
        The boolean is sent as its string form ('True' / 'False').
        """
        self._publish('discharge_blocked', str(discharge_blocked))

    # For dependent APIs like the Fronius Inverter classes, which are not directly batcontrol.
    def generic_publish(self, topic: str, value: str) -> None:
        """Publish a generic value to a topic below the base topic.
        For dependent APIs like the Fronius Inverter classes, which are not
        directly batcontrol.
        """
        self._publish(topic, value)