Skip to content

Commit

Permalink
Merge pull request #1465 from palexandridis/mqtt-buffering
Browse files Browse the repository at this point in the history
MQTT connector buffering
  • Loading branch information
imbeacon authored Jul 18, 2024
2 parents b684e46 + 5d59fd2 commit c9df1cb
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
from thingsboard_gateway.tb_utility.tb_logger import init_logger

try:
from paho.mqtt.client import Client
from paho.mqtt.client import Client, Properties
from paho.mqtt.packettypes import PacketTypes
except ImportError:
print("paho-mqtt library not found")
TBUtility.install_package("tb-paho-mqtt-client")
from paho.mqtt.client import Client
from paho.mqtt.client import Client, Properties
from paho.mqtt.packettypes import PacketTypes

from paho.mqtt.client import MQTTv31, MQTTv311, MQTTv5

Expand Down Expand Up @@ -182,9 +184,16 @@ def __init__(self, gateway, config, connector_type):
# Set up external MQTT broker connection -----------------------------------------------------------------------
client_id = self.__broker.get("clientId", ''.join(random.choice(string.ascii_lowercase) for _ in range(23)))

self._cleanSession = self.__broker.get("cleanSession",True)
self._cleanStart = self.__broker.get("cleanStart",True)
self._sessionExpiryInterval = self.__broker.get("sessionExpiryInterval", 0)

self._mqtt_version = self.__broker.get('version', 5)
try:
self._client = Client(client_id, protocol=MQTT_VERSIONS[self._mqtt_version])
if self._mqtt_version != 5:
self._client = Client(client_id, clean_session=self._cleanSession, protocol=MQTT_VERSIONS[self._mqtt_version])
else:
self._client = Client(client_id, protocol=MQTT_VERSIONS[self._mqtt_version])
except KeyError:
self.__log.error('Unknown MQTT version. Starting up on version 5...')
self._client = Client(client_id, protocol=MQTTv5)
Expand Down Expand Up @@ -322,8 +331,16 @@ def run(self):
def __connect(self):
while not self._connected and not self.__stopped:
try:
self._client.connect(self.__broker['host'],
if self._mqtt_version != 5:
self._client.connect(self.__broker['host'],
self.__broker.get('port', 1883))
else:
properties=Properties(PacketTypes.CONNECT)
properties.SessionExpiryInterval = self._sessionExpiryInterval
self._client.connect(self.__broker['host'],
self.__broker.get('port', 1883),
clean_start = self._cleanStart,
properties = properties)
self._client.loop_start()
if not self._connected:
sleep(1)
Expand Down

0 comments on commit c9df1cb

Please sign in to comment.