From 746c5133198ee5720bf248f6f12965baaaabb357 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Tue, 17 Sep 2024 14:07:08 +0300 Subject: [PATCH] Implemented strategies for reporting --- .../connectors/modbus/constants.py | 13 ++ .../connectors/modbus/modbus_connector.py | 203 +++++++++--------- .../connectors/modbus/slave.py | 142 ++++++++++-- 3 files changed, 241 insertions(+), 117 deletions(-) diff --git a/thingsboard_gateway/connectors/modbus/constants.py b/thingsboard_gateway/connectors/modbus/constants.py index 546df1285..dc12ada2a 100644 --- a/thingsboard_gateway/connectors/modbus/constants.py +++ b/thingsboard_gateway/connectors/modbus/constants.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from enum import Enum from thingsboard_gateway.gateway.constants import * @@ -71,6 +72,18 @@ INPUT_REGISTERS = "input_registers" DISCRETE_INPUTS = "discrete_inputs" +# Report strategy parameters +REPORT_STRATEGY_PARAMETER = "dataReportStrategy" +REPORT_PERIOD_PARAMETER = "reportPeriod" +class ReportStrategy(Enum): + ON_REPORT_PERIOD = "ON_REPORT_PERIOD" + ON_CHANGE = "ON_CHANGE" + ON_CHANGE_OR_REPORT_PERIOD = "ON_CHANGE_OR_REPORT_PERIOD" + +class RequestType(Enum): + POLL = "POLL" + SEND_DATA = "SEND_DATA" + # Default values TIMEOUT = 30 diff --git a/thingsboard_gateway/connectors/modbus/modbus_connector.py b/thingsboard_gateway/connectors/modbus/modbus_connector.py index 10169ffa1..4aed76a5c 100644 --- a/thingsboard_gateway/connectors/modbus/modbus_connector.py +++ b/thingsboard_gateway/connectors/modbus/modbus_connector.py @@ -13,7 +13,7 @@ # limitations under the License. from copy import deepcopy from threading import Thread, Lock -from time import sleep +from time import sleep, monotonic from time import time as timestamp from time import monotonic as time from queue import Queue @@ -260,8 +260,8 @@ def __load_slaves(self): 'callback': ModbusConnector.callback})) @classmethod - def callback(cls, slave): - cls.process_requests.put(slave) + def callback(cls, slave: Slave, request_type: RequestType, data=None): + cls.process_requests.put((slave, request_type, data)) @property def connector_type(self): @@ -317,24 +317,13 @@ def __convert_data(self, params): ATTRIBUTES_PARAMETER: [] } - if current_device_config.get('sendDataOnlyOnChange'): - self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 - - for converted_data_section in CONVERTED_DATA_SECTIONS: - for current_section_dict in converted_data[converted_data_section]: - for key, value in current_section_dict.items(): - if device.config[LAST_PREFIX + converted_data_section].get(key) is None or \ - device.config[LAST_PREFIX + converted_data_section][key] != value: - device.config[LAST_PREFIX + converted_data_section][key] = value - to_send[converted_data_section].append({key: value}) - elif converted_data and current_device_config.get('sendDataOnlyOnChange') is None or \ - not current_device_config.get('sendDataOnlyOnChange'): - self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1 - - for converted_data_section in CONVERTED_DATA_SECTIONS: - device.config[LAST_PREFIX + converted_data_section] = converted_data[ - converted_data_section] - to_send[converted_data_section] = converted_data[converted_data_section] + # Check report strategy for each key in attributes and telemetry for device and send data only if it is necessary + for converted_data_section in CONVERTED_DATA_SECTIONS: + for current_section_dict in converted_data[converted_data_section]: + for key, value in current_section_dict.items(): + should_send = device.update_cached_data_and_check_is_data_should_be_send(converted_data_section, key, value) + if should_send: + to_send[converted_data_section].append({key: value}) if to_send.get(ATTRIBUTES_PARAMETER) or to_send.get(TELEMETRY_PARAMETER): return to_send @@ -383,88 +372,102 @@ def get_id(self): def __process_slaves(self): while not self.__stopped: if not self.__stopped and not ModbusConnector.process_requests.empty(): - device: Slave = ModbusConnector.process_requests.get() - device_connected = False - device_disconnected = False + (device, request_type, data) = ModbusConnector.process_requests.get() + if request_type == RequestType.POLL: + self.__poll_device(device) + elif request_type == RequestType.SEND_DATA: + self.__send_data_from_device_by_strategy(device, data) + sleep(.001) - self.__log.debug("Checking %s", device) - if device.config.get(TYPE_PARAMETER).lower() == 'serial': - self.lock.acquire() + def __send_data_from_device_by_strategy(self, device, data): + self.__gateway.send_to_storage(self.get_name(), self.get_id(), data) + self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 - device_responses = {'timeseries': {}, 'attributes': {}} - current_device_config = {} - try: - for config_section in device_responses: - if device.config.get(config_section) is not None and len(device.config.get(config_section)): - current_device_config = device.config - - if self.__connect_to_current_master(device): - if not device_connected and device.config['master'].is_socket_open(): - device_connected = True - self.__gateway.add_device(device.device_name, {CONNECTOR_PARAMETER: self}, - device_type=device.config.get(DEVICE_TYPE_PARAMETER)) - else: - device_disconnected = True - else: - if not device_disconnected: - device_disconnected = True - self.__gateway.del_device(device.device_name) - continue - - if (not device.config['master'].is_socket_open() - or not len(current_device_config[config_section])): - if not device.config['master'].is_socket_open(): - error = 'Socket is closed, connection is lost, for device %s with config %s' % ( - device.device_name, current_device_config) - else: - error = 'Config is invalid or empty for device %s, config %s' % ( - device.device_name, current_device_config) - self.__log.error(error) - self.__log.debug("Device %s is not connected, data will not be processed", - device.device_name) - continue - - # Reading data from device - for interested_data in range(len(current_device_config[config_section])): - current_data = deepcopy(current_device_config[config_section][interested_data]) - current_data[DEVICE_NAME_PARAMETER] = device.device_name - input_data = self.__function_to_device(device, current_data) - - # due to issue #1056 - if isinstance(input_data, ModbusIOException) or isinstance(input_data, ExceptionResponse): - device.config.pop('master', None) - self.__gateway.del_device(device.device_name) - self.__connect_to_current_master(device) - break - - device_responses[config_section][current_data[TAG_PARAMETER]] = { - "data_sent": current_data, - "input_data": input_data - } - - self.__log.debug("Checking %s for device %s", config_section, device) - self.__log.debug('Device response: ', device_responses) - - if device_responses.get('timeseries') or device_responses.get('attributes'): - self._convert_msg_queue.put((self.__convert_data, (device, current_device_config, { - **current_device_config, - BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, device.byte_order), - WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, device.word_order) - }, device_responses))) - - except ConnectionException: - self.__gateway.del_device(device.device_name) - sleep(5) - self.__log.error("Connection lost! Reconnecting...") - except Exception as e: - self.__gateway.del_device(device.device_name) - self.__log.exception(e) - - # Release mutex if "serial" type only - if device.config.get(TYPE_PARAMETER) == 'serial': - self.lock.release() - sleep(.001) + def __poll_device(self, device): + device_connected = device.last_connect_time != 0 and monotonic() - device.last_connect_time < 10 + device_disconnected = False + + self.__log.debug("Checking %s", device) + if device.config.get(TYPE_PARAMETER).lower() == 'serial': + self.lock.acquire() + + device_responses = {'timeseries': {}, 'attributes': {}} + current_device_config = {} + try: + for config_section in device_responses: + if device.config.get(config_section) is not None and len(device.config.get(config_section)): + current_device_config = device.config + connected_to_current_master = self.__connect_to_current_master(device) + if connected_to_current_master: + is_socket_open = device.config['master'].is_socket_open() + if not device_connected and is_socket_open: + device_connected = True + device.last_connect_time = monotonic() + self.__gateway.add_device(device.device_name, {CONNECTOR_PARAMETER: self}, + device_type=device.config.get(DEVICE_TYPE_PARAMETER)) + elif not is_socket_open: + device.last_connect_time = 0 + device_disconnected = True + else: + if not device_disconnected: + device.last_connect_time = 0 + device_disconnected = True + self.__gateway.del_device(device.device_name) + continue + + if (not device.config['master'].is_socket_open() + or not len(current_device_config[config_section])): + if not device.config['master'].is_socket_open(): + error = 'Socket is closed, connection is lost, for device %s with config %s' % ( + device.device_name, current_device_config) + else: + error = 'Config is invalid or empty for device %s, config %s' % ( + device.device_name, current_device_config) + self.__log.error(error) + self.__log.debug("Device %s is not connected, data will not be processed", + device.device_name) + continue + + # Reading data from device + for interested_data in range(len(current_device_config[config_section])): + current_data = deepcopy(current_device_config[config_section][interested_data]) + current_data[DEVICE_NAME_PARAMETER] = device.device_name + input_data = self.__function_to_device(device, current_data) + + # due to issue #1056 + if isinstance(input_data, ModbusIOException) or isinstance(input_data, ExceptionResponse): + device.config.pop('master', None) + self.__gateway.del_device(device.device_name) + self.__connect_to_current_master(device) + break + + device_responses[config_section][current_data[TAG_PARAMETER]] = { + "data_sent": current_data, + "input_data": input_data + } + + self.__log.debug("Checking %s for device %s", config_section, device) + self.__log.debug('Device response: ', device_responses) + + if device_responses.get('timeseries') or device_responses.get('attributes'): + self._convert_msg_queue.put((self.__convert_data, (device, current_device_config, { + **current_device_config, + BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, device.byte_order), + WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, device.word_order) + }, device_responses))) + + except ConnectionException: + self.__gateway.del_device(device.device_name) + sleep(5) + self.__log.error("Connection lost! Reconnecting...") + except Exception as e: + self.__gateway.del_device(device.device_name) + self.__log.exception(e) + finally: + # Release mutex if "serial" type only + if device.config.get(TYPE_PARAMETER) == 'serial': + self.lock.release() def __connect_to_current_master(self, device: Slave=None): connect_attempt_count = 5 diff --git a/thingsboard_gateway/connectors/modbus/slave.py b/thingsboard_gateway/connectors/modbus/slave.py index 3d31cf41b..8e4a2a81b 100644 --- a/thingsboard_gateway/connectors/modbus/slave.py +++ b/thingsboard_gateway/connectors/modbus/slave.py @@ -13,7 +13,7 @@ # limitations under the License. from threading import Thread -from time import time, sleep +from time import sleep, monotonic from pymodbus.constants import Defaults @@ -22,6 +22,10 @@ from thingsboard_gateway.connectors.modbus.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader +DEFAULT_REPORT_STRATEGY = { + 'type': ReportStrategy.ON_REPORT_PERIOD, + REPORT_PERIOD_PARAMETER: 60.0 +} class Slave(Thread): def __init__(self, **kwargs): @@ -31,6 +35,8 @@ def __init__(self, **kwargs): self._log = kwargs['logger'] self.poll_period = kwargs['pollPeriod'] / 1000 + self.last_connect_time = 0 + self.byte_order = kwargs.get('byteOrder', 'LITTLE') self.word_order = kwargs.get('wordOrder', 'LITTLE') self.config = { @@ -50,7 +56,6 @@ def __init__(self, **kwargs): 'retries': kwargs.get('retries', 3), 'connection_attempt': 0, 'last_connection_attempt_time': 0, - 'sendDataOnlyOnChange': kwargs.get('sendDataOnlyOnChange', False), 'waitAfterFailedAttemptsMs': kwargs.get('waitAfterFailedAttemptsMs', 0), 'connectAttemptTimeMs': kwargs.get('connectAttemptTimeMs', 0), 'retry_on_empty': kwargs.get('retryOnEmpty', False), @@ -60,18 +65,39 @@ def __init__(self, **kwargs): 'attributes': kwargs.get('attributes', []), 'timeseries': kwargs.get('timeseries', []), 'attributeUpdates': kwargs.get('attributeUpdates', []), - 'rpc': kwargs.get('rpc', []), - 'last_attributes': {}, - 'last_telemetry': {} + 'rpc': kwargs.get('rpc', []) } - self.__load_converters(kwargs['connector'], kwargs['gateway']) + self.__basic_device_report_strategy_config = self.__get_report_strategy_from_config(kwargs) + + self.__load_converters(kwargs['connector']) self.callback = kwargs['callback'] - self.last_polled_time = None + self.__last_polled_time = None + self.__last_checked_time = monotonic() self.daemon = True self.stop = False + # Cache for devices data for report strategy + # Two possible for keys options, depending on type of slave: + # 1. For tcp, tls, udp slaves: + # (host, port, unit_id, datatype, function_code, address, bit - Optional, byteOrder - Optional, wordOrder - Optional) + # 2. For serial slaves: + # (port, unit_id, datatype, function_code, address, bit - Optional, byteOrder - Optional, wordOrder - Optional) + # Value + # {last_telemetry:{key: {strategy, reportPeriodMs(Depends on strategy), next_send_monotonic_ms, previous_value, values_for_calculation(In next release), processing_function(In next release)}}, + # last_attributes:{...}} + + self.cached_data = { + LAST_PREFIX + TELEMETRY_PARAMETER: {}, + LAST_PREFIX + ATTRIBUTES_PARAMETER: {} + } + + # Data to send periodically, key is monotonic of time when data should be sent + + self.__data_to_send_periodically = {} + + self.__init_cache_for_data() self.name = "Modbus slave processor for unit " + str(self.config['unitId']) + " on host " + str( self.config['host']) + ":" + str(self.config['port']) @@ -79,13 +105,20 @@ def __init__(self, **kwargs): self.start() def timer(self): - self.callback(self) - self.last_polled_time = time() + self.callback(self, RequestType.POLL) + self.__last_polled_time = monotonic() while not self.stop: - if time() - self.last_polled_time >= self.poll_period: - self.callback(self) - self.last_polled_time = time() + try: + current_monotonic = monotonic() + if current_monotonic - self.__last_polled_time >= self.poll_period: + self.callback(self, RequestType.POLL) + self.__last_polled_time = current_monotonic + if current_monotonic - self.__last_checked_time >= 1: + self.__check_data_to_send_periodically(current_monotonic) + self.__last_checked_time = current_monotonic + except Exception as e: + self._log.exception("Error in slave timer: %s", e) sleep(0.001) @@ -98,7 +131,86 @@ def close(self): def get_name(self): return self.device_name - def __load_converters(self, connector, gateway): + def __check_data_to_send_periodically(self, current_monotonic): + telemetry_data_to_send = [] + attributes_data_to_send = [] + + self.__check_data_to_send_periodically_entry(TELEMETRY_PARAMETER, telemetry_data_to_send, current_monotonic) + self.__check_data_to_send_periodically_entry(ATTRIBUTES_PARAMETER, attributes_data_to_send, current_monotonic) + + if telemetry_data_to_send or attributes_data_to_send: + self.callback(self, RequestType.SEND_DATA, {DEVICE_NAME_PARAMETER: self.device_name, + DEVICE_TYPE_PARAMETER: self.config["deviceType"], + TELEMETRY_PARAMETER: telemetry_data_to_send, + ATTRIBUTES_PARAMETER: attributes_data_to_send}) + + def __check_data_to_send_periodically_entry(self, section, data_array: list, current_monotonic: float): + for key, report_data_config in self.cached_data[LAST_PREFIX + section].items(): + if (report_data_config['type'] in (ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD, ReportStrategy.ON_REPORT_PERIOD) + and report_data_config['previous_value'] is not None + and report_data_config['next_send_monotonic_ms'] <= current_monotonic): + data_array.append({key: report_data_config['previous_value']}) + report_data_config['next_send_monotonic_ms'] = current_monotonic + report_data_config[REPORT_PERIOD_PARAMETER] + + def __init_cache_for_data(self): + for index, key_config in enumerate(self.config[TIMESERIES_PARAMETER]): + key = key_config.get('key', key_config.get('tag')) + self.cached_data[LAST_PREFIX + TELEMETRY_PARAMETER][key] = { + **self.__get_report_strategy_from_config(key_config, self.__basic_device_report_strategy_config), + 'next_send_monotonic_ms': 0, + 'previous_value': None + } + for index, key_config in enumerate(self.config[ATTRIBUTES_PARAMETER]): + key = key_config.get('key', key_config.get('tag')) + self.cached_data[LAST_PREFIX + ATTRIBUTES_PARAMETER][key] = { + **self.__get_report_strategy_from_config(key_config, self.__basic_device_report_strategy_config), + 'next_send_monotonic_ms': 0, + 'previous_value': None + } + + def update_cached_data_and_check_is_data_should_be_send(self, configuration_section, key, value) -> bool: + section_with_prefix = LAST_PREFIX + configuration_section + previous_value = self.cached_data.get(section_with_prefix, {}).get(key, {}).get('previous_value') + if previous_value is None: + self.cached_data[section_with_prefix][key]['previous_value'] = value + if self.cached_data[section_with_prefix][key]['type'] in (ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD, ReportStrategy.ON_REPORT_PERIOD): + self.cached_data[section_with_prefix][key]['next_send_monotonic_ms'] = ( + monotonic() + self.cached_data[section_with_prefix][key][REPORT_PERIOD_PARAMETER]) + return True + if previous_value != value: + self.cached_data[section_with_prefix][key]['previous_value'] = value + if self.cached_data[section_with_prefix][key]['type'] == ReportStrategy.ON_CHANGE: + return True + elif self.cached_data[section_with_prefix][key]['type'] == ReportStrategy.ON_CHANGE_OR_REPORT_PERIOD: + self.cached_data[section_with_prefix][key]['next_send_monotonic_ms'] = ( + monotonic() + self.cached_data[section_with_prefix][key][REPORT_PERIOD_PARAMETER]) + return True + + return False + + def __get_report_strategy_from_config(self, config: dict, default_report_strategy_config=None): + if default_report_strategy_config is None: + default_report_strategy_config = DEFAULT_REPORT_STRATEGY + report_strategy_config = default_report_strategy_config + if not config: + return report_strategy_config + if config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER) is not None: + report_strategy_config = { + 'type': ReportStrategy.ON_CHANGE if config.get(SEND_DATA_ONLY_ON_CHANGE_PARAMETER) else ReportStrategy.ON_REPORT_PERIOD, + REPORT_PERIOD_PARAMETER: config.get(report_strategy_config.get(REPORT_PERIOD_PARAMETER, config.get("pollPeriod", 60000)), 60000) / 1000 + } + if config.get(REPORT_STRATEGY_PARAMETER) is not None: + try: + report_strategy_config = { + 'type': ReportStrategy[config[REPORT_STRATEGY_PARAMETER].get('type', ReportStrategy.ON_REPORT_PERIOD.name).upper()], + REPORT_PERIOD_PARAMETER: config[REPORT_STRATEGY_PARAMETER].get(REPORT_PERIOD_PARAMETER, 60000) / 1000 + } + except Exception: + self._log.error("Report strategy config is not valid. Using default report strategy for config: %r", config) + report_strategy_config = default_report_strategy_config + return report_strategy_config + + def __load_converters(self, connector): try: if self.config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: converter = TBModuleLoader.import_module(connector.connector_type, @@ -114,10 +226,6 @@ def __load_converters(self, connector, gateway): self.config[UPLINK_PREFIX + CONVERTER_PARAMETER] = converter self.config[DOWNLINK_PREFIX + CONVERTER_PARAMETER] = downlink_converter - - if self.device_name not in gateway.get_devices(): - gateway.add_device(self.device_name, {CONNECTOR_PARAMETER: connector}, - device_type=self.config.get(DEVICE_TYPE_PARAMETER)) except Exception as e: self._log.exception(e)