From 5d59fd29f4b0d14d56946b480c0fb4228b3b105f Mon Sep 17 00:00:00 2001 From: Petros Date: Wed, 17 Jul 2024 15:51:41 +0000 Subject: [PATCH] Added support for clean_session and clean_start in order to be able to receive retained messages --- .../connectors/mqtt/mqtt_connector.py | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py index 74a0b7a55..5189e1657 100644 --- a/thingsboard_gateway/connectors/mqtt/mqtt_connector.py +++ b/thingsboard_gateway/connectors/mqtt/mqtt_connector.py @@ -35,11 +35,13 @@ from thingsboard_gateway.tb_utility.tb_logger import init_logger try: - from paho.mqtt.client import Client + from paho.mqtt.client import Client, Properties + from paho.mqtt.packettypes import PacketTypes except ImportError: print("paho-mqtt library not found") TBUtility.install_package("tb-paho-mqtt-client") - from paho.mqtt.client import Client + from paho.mqtt.client import Client, Properties + from paho.mqtt.packettypes import PacketTypes from paho.mqtt.client import MQTTv31, MQTTv311, MQTTv5 @@ -182,9 +184,16 @@ def __init__(self, gateway, config, connector_type): # Set up external MQTT broker connection ----------------------------------------------------------------------- client_id = self.__broker.get("clientId", ''.join(random.choice(string.ascii_lowercase) for _ in range(23))) + self._cleanSession = self.__broker.get("cleanSession",True) + self._cleanStart = self.__broker.get("cleanStart",True) + self._sessionExpiryInterval = self.__broker.get("sessionExpiryInterval", 0) + self._mqtt_version = self.__broker.get('version', 5) try: - self._client = Client(client_id, protocol=MQTT_VERSIONS[self._mqtt_version]) + if self._mqtt_version != 5: + self._client = Client(client_id, clean_session=self._cleanSession, protocol=MQTT_VERSIONS[self._mqtt_version]) + else: + self._client = Client(client_id, protocol=MQTT_VERSIONS[self._mqtt_version]) except KeyError: self.__log.error('Unknown MQTT version. Starting up on version 5...') self._client = Client(client_id, protocol=MQTTv5) @@ -322,8 +331,16 @@ def run(self): def __connect(self): while not self._connected and not self.__stopped: try: - self._client.connect(self.__broker['host'], + if self._mqtt_version != 5: + self._client.connect(self.__broker['host'], self.__broker.get('port', 1883)) + else: + properties=Properties(PacketTypes.CONNECT) + properties.SessionExpiryInterval = self._sessionExpiryInterval + self._client.connect(self.__broker['host'], + self.__broker.get('port', 1883), + clean_start = self._cleanStart, + properties = properties) self._client.loop_start() if not self._connected: sleep(1)