This repository has been archived by the owner on May 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 22
/
mqtt_gobject_bridge.py
100 lines (87 loc) · 2.97 KB
/
mqtt_gobject_bridge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import errno
import logging
import os
import paho.mqtt.client
import socket
import ssl
import sys
import traceback
from gi.repository import GLib
AppDir = os.path.dirname(os.path.realpath(__file__))
sys.path.insert(1, os.path.join(AppDir, 'ext', 'velib_python'))
from ve_utils import exit_on_error
class MqttGObjectBridge(object):
def __init__(self, mqtt_server=None, client_id="", ca_cert=None, user=None, passwd=None, debug=False):
self._ca_cert = ca_cert
self._mqtt_user = user
self._mqtt_passwd = passwd
self._mqtt_server = mqtt_server or '127.0.0.1'
self._client = paho.mqtt.client.Client(client_id)
self._client.on_connect = self._on_connect
self._client.on_message = self._on_message
self._client.on_disconnect = self._on_disconnect
if debug:
self._client.on_log = self._on_log
self._socket_watch = None
self._socket_timer = None
if self._init_mqtt():
GLib.timeout_add_seconds(5, exit_on_error, self._init_mqtt)
def _init_mqtt(self):
try:
logging.info('[Init] Connecting to local broker')
if self._mqtt_user is not None and self._mqtt_passwd is not None:
self._client.username_pw_set(self._mqtt_user, self._mqtt_passwd)
if self._ca_cert is None:
self._client.connect(self._mqtt_server, 1883, 60)
else:
self._client.tls_set(self._ca_cert, cert_reqs=ssl.CERT_REQUIRED)
self._client.connect(self._mqtt_server, 8883, 60)
self._init_socket_handlers()
return False
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
return True
raise
def _init_socket_handlers(self):
if self._socket_watch is not None:
GLib.source_remove(self._socket_watch)
self._socket_watch = GLib.io_add_watch(self._client.socket().fileno(), GLib.IO_IN,
self._on_socket_in)
if self._socket_timer is None:
self._socket_timer = GLib.timeout_add_seconds(1, exit_on_error, self._on_socket_timer)
def _on_log(self, client, userdata, level, log):
print(log)
def _on_socket_in(self, src, condition):
exit_on_error(self._client.loop_read)
return True
def _on_socket_timer(self):
self._client.loop_misc()
while self._client.want_write():
if self._client.loop_write(10) != paho.mqtt.client.MQTT_ERR_SUCCESS:
break
return True
def _on_connect(self, client, userdata, dict, rc):
pass
def _on_message(self, client, userdata, msg):
pass
def _on_disconnect(self, client, userdata, rc):
logging.error('[Disconnected] Lost connection to broker')
if self._socket_watch is not None:
GLib.source_remove(self._socket_watch)
self._socket_watch = None
logging.info('[Disconnected] Set timer')
GLib.timeout_add(5000, exit_on_error, self._reconnect)
def _reconnect(self):
try:
logging.info('[Reconnect] start')
self._client.reconnect()
self._init_socket_handlers()
logging.info('[Reconnect] success')
return False
except socket.error as e:
logging.error('[Reconnect] failed' + traceback.format_exc())
if e.errno == errno.ECONNREFUSED:
return True
raise