From 7d433cd4e5b81ded4012ed83dc59934273196d83 Mon Sep 17 00:00:00 2001 From: samson0v Date: Wed, 14 Aug 2024 15:28:55 +0300 Subject: [PATCH 01/33] Refactored OPC-UA connector --- .../connectors/opcua/device.py | 1 + .../connectors/opcua/opcua_connector.py | 122 ++-- .../opcua/opcua_uplink_converter.py | 56 +- .../opcua_asyncio/opcua_connector.py | 600 ------------------ .../gateway/tb_gateway_service.py | 2 +- 5 files changed, 83 insertions(+), 698 deletions(-) delete mode 100644 thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py diff --git a/thingsboard_gateway/connectors/opcua/device.py b/thingsboard_gateway/connectors/opcua/device.py index b39267c3f..88c65dcfd 100644 --- a/thingsboard_gateway/connectors/opcua/device.py +++ b/thingsboard_gateway/connectors/opcua/device.py @@ -27,6 +27,7 @@ def __init__(self, path, name, config, converter, converter_for_sub, logger): 'timeseries': [], 'attributes': [] } + self.nodes = [] self.load_values() diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 67dbdd2c8..45d10d329 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -117,6 +117,7 @@ def close(self): self.__connected = False self.__log.info("Stopping OPC-UA Connector") + asyncio.run_coroutine_threadsafe(self.__disconnect(), self.__loop) asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.__loop) start_time = monotonic() @@ -134,6 +135,13 @@ async def __cancel_all_tasks(self): for task in asyncio.all_tasks(self.__loop): task.cancel() + async def __disconnect(self): + try: + await self.__client.disconnect() + self.__log.info('%s has been disconnected from OPC-UA Server.', self.get_name()) + except Exception as e: + self.__log.error('%s could not be disconnected from OPC-UA Server: %s', self.name, e) + async def __reset_node(self, node): node['valid'] = False if node.get('sub_on', False): @@ -213,9 +221,11 @@ async def start_client(self): self.__log.error("Error on loading type definitions:\n %s", e) scan_period = self.__server_conf.get('scanPeriodInMillis', 5000) / 1000 + + await self.__scan_device_nodes() + while not self.__stopped: if monotonic() - self.__last_poll >= scan_period: - await self.__scan_device_nodes() await self.__poll_nodes() self.__last_poll = monotonic() @@ -332,40 +342,6 @@ async def find_node_name_space_index(self, path): unresolved = path[resolved_level:] return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved) - async def __scan_device_nodes(self): - existing_devices = list(map(lambda dev: dev.name, self.__device_nodes)) - - scanned_devices = [] - for device in self.__config.get('mapping', []): - nodes = await self.find_nodes(device['deviceNodePattern']) - self.__log.debug('Found devices: %s', nodes) - - device_names = await self._get_device_info_by_pattern( - device.get('deviceInfo', {}).get('deviceNameExpression')) - - for device_name in device_names: - scanned_devices.append(device_name) - if device_name not in existing_devices: - for node in nodes: - converter = self.__load_converter(device) - device_type = await self._get_device_info_by_pattern( - device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'), - get_first=True) - device_config = {**device, 'device_name': device_name, 'device_type': device_type} - self.__device_nodes.append( - Device(path=node, name=device_name, config=device_config, - converter=converter(device_config, self.__log), - converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get( - 'disableSubscriptions', - False) else None, logger=self.__log)) - self.__log.info('Added device node: %s', device_name) - - for device_name in existing_devices: - if device_name not in scanned_devices: - await self.__reset_nodes(device_name) - - self.__log.debug('Device nodes: %s', self.__device_nodes) - async def _get_device_info_by_pattern(self, pattern, get_first=False): result = [] @@ -411,9 +387,48 @@ def __convert_sub_data(self): self.__data_to_send.put(*converter_data) device.converter_for_sub.clear_data() else: - sleep(.2) + sleep(.05) - async def __poll_nodes(self): + async def __scan_device_nodes(self): + await self._create_new_devices() + await self._load_devices_nodes() + + async def _create_new_devices(self): + existing_devices = list(map(lambda dev: dev.name, self.__device_nodes)) + + scanned_devices = [] + for device in self.__config.get('mapping', []): + nodes = await self.find_nodes(device['deviceNodePattern']) + self.__log.debug('Found devices: %s', nodes) + + device_names = await self._get_device_info_by_pattern( + device.get('deviceInfo', {}).get('deviceNameExpression')) + + for device_name in device_names: + scanned_devices.append(device_name) + if device_name not in existing_devices: + for node in nodes: + converter = self.__load_converter(device) + device_type = await self._get_device_info_by_pattern( + device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'), + get_first=True) + device_config = {**device, 'device_name': device_name, 'device_type': device_type} + self.__device_nodes.append( + Device(path=node, name=device_name, config=device_config, + converter=converter(device_config, self.__log), + converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get( + 'disableSubscriptions', + False) else None, logger=self.__log)) + + self.__log.info('Added device node: %s', device_name) + + for device_name in existing_devices: + if device_name not in scanned_devices: + await self.__reset_nodes(device_name) + + self.__log.debug('Device nodes: %s', self.__device_nodes) + + async def _load_devices_nodes(self): for device in self.__device_nodes: for section in ('attributes', 'timeseries'): for node in device.values.get(section, []): @@ -426,24 +441,25 @@ async def __poll_nodes(self): qualified_path = await self.find_node_name_space_index(path) if len(qualified_path) == 0: if node.get('valid', True): - self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name, + self.__log.warning('Node not found; device: %s, key: %s, path: %s', + device.name, node['key'], node['path']) await self.__reset_node(node) continue elif len(qualified_path) > 1: self.__log.warning( - 'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name, + 'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', + device.name, node['key'], node['path'], qualified_path) node['qualified_path'] = qualified_path[0] path = qualified_path[0] var = await self.__client.nodes.root.get_child(path) + device.nodes.append({'var': var, 'key': node['key'], 'section': section}) + if (node.get('valid') is None or (node.get('valid') and self.__server_conf.get('disableSubscriptions', False))): - value = await var.read_data_value() - device.converter.convert(config={'section': section, 'key': node['key']}, val=value) - if (not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on', False) and not self.__stopped): @@ -454,14 +470,16 @@ async def __poll_nodes(self): node['subscription'] = handle node['sub_on'] = True node['id'] = var.nodeid.to_string() - self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name, + self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", + device.name, node['key'], node['id']) node['valid'] = True except ConnectionError: raise except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid, - BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError, UaStatusCodeErrors, + BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError, + UaStatusCodeErrors, BadWaitingForInitialData): if node.get('valid', True): self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name, @@ -476,23 +494,33 @@ async def __poll_nodes(self): self.__log.exception(e) await self.__reset_node(node) + async def __poll_nodes(self): + values = await self.__client.read_values([node_config['var'] for device in self.__device_nodes for node_config in device.nodes]) + + converted_nodes_count = 0 + for device in self.__device_nodes: + nodes_count = len(device.nodes) + device_values = values[converted_nodes_count:converted_nodes_count + nodes_count] + converted_nodes_count += nodes_count + device.converter.convert(device.nodes, device_values) converter_data = device.converter.get_data() if converter_data: self.__data_to_send.put(*converter_data) device.converter.clear_data() + self.__log.debug('Converted nodes values count: %s', converted_nodes_count) + def __send_data(self): while not self.__stopped: if not self.__data_to_send.empty(): data = self.__data_to_send.get() self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 - self.__log.debug(data) self.__gateway.send_to_storage(self.get_name(), self.get_id(), data) self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - self.__log.debug('Data to ThingsBoard %s', data) + self.__log.debug('Count data packs to ThingsBoard: %s', self.statistics['MessagesSent']) else: - sleep(.2) + sleep(.05) async def get_shared_attr_node_id(self, path, result={}): try: diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index facd1aa69..61b96eb7d 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -59,54 +59,10 @@ def get_data(self): return None - def convert(self, config, val): - if not val: - return + def convert(self, configs, values): + for (val, config) in zip(values, configs): + if not val: + continue - data = val.Value.Value - - if data is not None: - if isinstance(data, LocalizedText): - data = data.Text - elif val.Value.VariantType == VariantType.ExtensionObject: - data = str(data) - elif val.Value.VariantType == VariantType.DateTime: - if data.tzinfo is None: - data = data.replace(tzinfo=timezone.utc) - data = data.isoformat() - elif val.Value.VariantType == VariantType.StatusCode: - data = data.name - elif (val.Value.VariantType == VariantType.QualifiedName - or val.Value.VariantType == VariantType.NodeId - or val.Value.VariantType == VariantType.ExpandedNodeId): - data = data.to_string() - elif val.Value.VariantType == VariantType.ByteString: - data = data.hex() - elif val.Value.VariantType == VariantType.XmlElement: - data = data.decode('utf-8') - elif val.Value.VariantType == VariantType.Guid: - data = str(data) - elif val.Value.VariantType == VariantType.DiagnosticInfo: - data = data.to_string() - elif val.Value.VariantType == VariantType.Null: - data = None - - - - if config['section'] == 'timeseries': - if val.SourceTimestamp and int(val.SourceTimestamp.replace( - tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: - timestamp = int(val.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) - self._last_node_timestamp = timestamp - elif val.ServerTimestamp and int(val.ServerTimestamp.replace( - tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp: - timestamp = int(val.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000) - self._last_node_timestamp = timestamp - else: - timestamp = int(time() * 1000) - - self.data[DATA_TYPES[config['section']]].append({'ts': timestamp, 'values': {config['key']: data}}) - else: - self.data[DATA_TYPES[config['section']]].append({config['key']: data}) - - self._log.debug('Converted data: %s', self.data) + if val is not None: + self.data[DATA_TYPES[config['section']]].append({config['key']: val}) diff --git a/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py b/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py deleted file mode 100644 index 867d90014..000000000 --- a/thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py +++ /dev/null @@ -1,600 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -import re -from random import choice -from string import ascii_lowercase -from threading import Thread -from time import sleep, time -from queue import Queue - -from thingsboard_gateway.connectors.connector import Connector -from thingsboard_gateway.connectors.opcua_asyncio.backward_compatibility_adapter import BackwardCompatibilityAdapter -from thingsboard_gateway.connectors.opcua_asyncio.device import Device -from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader -from thingsboard_gateway.tb_utility.tb_utility import TBUtility -from thingsboard_gateway.tb_utility.tb_logger import init_logger - -try: - import asyncua -except ImportError: - print("OPC-UA library not found") - TBUtility.install_package("asyncua") - import asyncua - -from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256, SecurityPolicyBasic256, \ - SecurityPolicyBasic128Rsa15 -from asyncua.ua.uaerrors import UaStatusCodeError, BadNodeIdUnknown, BadConnectionClosed, \ - BadInvalidState, BadSessionClosed, BadAttributeIdInvalid, BadCommunicationError, BadOutOfService - -DEFAULT_UPLINK_CONVERTER = 'OpcUaUplinkConverter' - -SECURITY_POLICIES = { - "Basic128Rsa15": SecurityPolicyBasic128Rsa15, - "Basic256": SecurityPolicyBasic256, - "Basic256Sha256": SecurityPolicyBasic256Sha256, -} - -MESSAGE_SECURITY_MODES = { - "None": asyncua.ua.MessageSecurityMode.None_, - "Sign": asyncua.ua.MessageSecurityMode.Sign, - "SignAndEncrypt": asyncua.ua.MessageSecurityMode.SignAndEncrypt -} - - -class OpcUaConnectorAsyncIO(Connector, Thread): - def __init__(self, gateway, config, connector_type): - self.statistics = {'MessagesReceived': 0, - 'MessagesSent': 0} - super().__init__() - self._connector_type = connector_type - self.__gateway = gateway - self.__config = config - - # check if config is in old format and convert it to new format - if len(tuple(filter(lambda node_config: not node_config.get('deviceInfo'), - self.__config.get('server', {}).get('mapping', [])))): - backward_compatibility_adapter = BackwardCompatibilityAdapter(self.__config) - self.__config = backward_compatibility_adapter.convert() - - self.__id = self.__config.get('id') - self.__server_conf = self.__config.get('server', {}) - self.name = self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) - self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) - - if "opc.tcp" not in self.__server_conf.get("url"): - self.__opcua_url = "opc.tcp://" + self.__server_conf.get("url") - else: - self.__opcua_url = self.__server_conf.get("url") - - self.__data_to_send = Queue(-1) - self.__sub_data_to_convert = Queue(-1) - - self.__loop = asyncio.new_event_loop() - - self.__client = None - self.__subscription = None - - self.__connected = False - self.__stopped = False - self.daemon = True - - self.__device_nodes = [] - self.__last_poll = 0 - - def open(self): - self.__stopped = False - self.start() - self.__log.info("Starting OPC-UA Connector (Async IO)") - - def get_type(self): - return self._connector_type - - def close(self): - task = self.__loop.create_task(self.__reset_nodes()) - - while not task.done(): - sleep(.2) - - self.__stopped = True - self.__connected = False - self.__log.info('%s has been stopped.', self.get_name()) - self.__log.stop() - - async def __reset_node(self, node): - node['valid'] = False - if node.get('sub_on', False): - try: - if self.__subscription: - await self.__subscription.unsubscribe(node['subscription']) - except: - pass - node['subscription'] = None - node['sub_on'] = False - - async def __reset_nodes(self, device_name=None): - for device in self.__device_nodes: - if device_name is None or device.name == device_name: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - await self.__reset_node(node) - - if device_name is None and self.__subscription: - try: - await self.__subscription.delete() - except: - pass - self.__subscription = None - - def get_id(self): - return self.__id - - def get_name(self): - return self.name - - def is_connected(self): - return self.__connected - - def is_stopped(self): - return self.__stopped - - def get_config(self): - return self.__server_conf - - def run(self): - data_send_thread = Thread(name='Send Data Thread', target=self.__send_data, daemon=True) - data_send_thread.start() - - if not self.__server_conf.get('disableSubscriptions', False): - sub_data_convert_thread = Thread(name='Sub Data Convert Thread', target=self.__convert_sub_data, - daemon=True) - sub_data_convert_thread.start() - - self.__loop.run_until_complete(self.start_client()) - - async def start_client(self): - while not self.__stopped: - self.__client = asyncua.Client(url=self.__opcua_url, - timeout=self.__server_conf.get('timeoutInMillis', 4000) / 1000) - - if self.__server_conf["identity"].get("type") == "cert.PEM": - await self.__set_auth_settings_by_cert() - if self.__server_conf["identity"].get("username"): - self.__set_auth_settings_by_username() - - try: - async with self.__client: - self.__connected = True - - try: - await self.__client.load_data_type_definitions() - except Exception as e: - self.__log.error("Error on loading type definitions:\n %s", e) - - while not self.__stopped: - if time() - self.__last_poll >= self.__server_conf.get('scanPeriodInMillis', 5000) / 1000: - await self.__scan_device_nodes() - await self.__poll_nodes() - self.__last_poll = time() - - await asyncio.sleep(.2) - except (ConnectionError, BadSessionClosed): - self.__log.warning('Connection lost for %s', self.get_name()) - except asyncio.exceptions.TimeoutError: - self.__log.warning('Failed to connect %s', self.get_name()) - except Exception as e: - self.__log.exception(e) - finally: - await self.__reset_nodes() - self.__connected = False - await asyncio.sleep(1) - - async def __set_auth_settings_by_cert(self): - try: - ca_cert = self.__server_conf["identity"].get("caCert") - private_key = self.__server_conf["identity"].get("privateKey") - cert = self.__server_conf["identity"].get("cert") - policy = self.__server_conf["security"] - mode = self.__server_conf["identity"].get("mode", "SignAndEncrypt") - - if cert is None or private_key is None: - self.__log.exception("Error in ssl configuration - cert or privateKey parameter not found") - raise RuntimeError("Error in ssl configuration - cert or privateKey parameter not found") - - await self.__client.set_security( - SECURITY_POLICIES[policy], - certificate=cert, - private_key=private_key, - server_certificate=ca_cert, - mode=MESSAGE_SECURITY_MODES[mode] - ) - except Exception as e: - self.__log.exception(e) - - def __set_auth_settings_by_username(self): - self.__client.set_user(self.__server_conf["identity"].get("username")) - if self.__server_conf["identity"].get("password"): - self.__client.set_password(self.__server_conf["identity"].get("password")) - - def __load_converter(self, device): - converter_class_name = device.get('converter', DEFAULT_UPLINK_CONVERTER) - module = TBModuleLoader.import_module(self._connector_type, converter_class_name) - - if module: - self.__log.debug('Converter %s for device %s - found!', converter_class_name, self.name) - return module - - self.__log.error("Cannot find converter for %s device", self.name) - return None - - @staticmethod - def is_regex_pattern(pattern): - return not re.fullmatch(pattern, pattern) - - async def __find_nodes(self, node_list, current_parent_node, nodes): - assert len(node_list) > 0 - final = [] - - children = await current_parent_node.get_children() - for node in children: - child_node = await node.read_browse_name() - - if re.fullmatch(re.escape(node_list[0]), child_node.Name): - new_nodes = [*nodes, f'{child_node.NamespaceIndex}:{child_node.Name}'] - if len(node_list) == 1: - final.append(new_nodes) - else: - final.extend(await self.__find_nodes(node_list[1:], current_parent_node=node, nodes=new_nodes)) - - return final - - async def find_nodes(self, node_pattern, current_parent_node=None, nodes=[]): - node_list = node_pattern.split('\\.') - - if current_parent_node is None: - current_parent_node = self.__client.nodes.root - - if len(node_list) > 0 and node_list[0].lower() == 'root': - node_list = node_list[1:] - - return await self.__find_nodes(node_list, current_parent_node, nodes) - - async def find_node_name_space_index(self, path): - if isinstance(path, str): - path = path.split('\\.') - - # find unresolved nodes - u_node_count = len(tuple(filter(lambda u_node: len(u_node.split(':')) < 2, path))) - - resolved = path[:-u_node_count] - resolved_level = len(path) - u_node_count - parent_node = await self.__client.nodes.root.get_child(resolved) - - unresolved = path[resolved_level:] - return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved) - - async def __scan_device_nodes(self): - existing_devices = list(map(lambda dev: dev.name, self.__device_nodes)) - - scanned_devices = [] - for device in self.__config.get('mapping', []): - nodes = await self.find_nodes(device['deviceNodePattern']) - self.__log.debug('Found devices: %s', nodes) - - device_names = [] - device_name_pattern = device.get('deviceInfo', {}).get('deviceNameExpression') - if re.match(r"\${([A-Za-z.:\d]*)}", device_name_pattern): - device_name_nodes = await self.find_nodes(device_name_pattern) - self.__log.debug('Found device name nodes: %s', device_name_nodes) - - for node in device_name_nodes: - try: - var = await self.__client.nodes.root.get_child(node) - value = await var.read_value() - device_names.append(value) - except Exception as e: - self.__log.exception(e) - continue - else: - device_names.append(device_name_pattern) - - for device_name in device_names: - scanned_devices.append(device_name) - if device_name not in existing_devices: - for node in nodes: - converter = self.__load_converter(device) - device_config = {**device, 'device_name': device_name} - self.__device_nodes.append( - Device(path=node, name=device_name, config=device_config, - converter=converter(device_config, self.__log), - converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get( - 'disableSubscriptions', - False) else None, logger=self.__log)) - self.__log.info('Added device node: %s', device_name) - - for device_name in existing_devices: - if device_name not in scanned_devices: - await self.__reset_nodes(device_name) - - self.__log.debug('Device nodes: %s', self.__device_nodes) - - def __convert_sub_data(self): - while not self.__stopped: - if not self.__sub_data_to_convert.empty(): - sub_node, data = self.__sub_data_to_convert.get() - - for device in self.__device_nodes: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - if node.get('id') == sub_node.__str__(): - device.converter_for_sub.convert(config={'section': section, 'key': node['key']}, - val=data.monitored_item.Value) - converter_data = device.converter_for_sub.get_data() - - if converter_data: - self.__data_to_send.put(*converter_data) - device.converter_for_sub.clear_data() - else: - sleep(.2) - - async def __poll_nodes(self): - for device in self.__device_nodes: - for section in ('attributes', 'timeseries'): - for node in device.values.get(section, []): - try: - path = node.get('qualified_path', node['path']) - if isinstance(path, str) and re.match(r"(ns=\d+;[isgb]=[^}]+)", path): - var = self.__client.get_node(path) - else: - if len(path[-1].split(':')) != 2: - qualified_path = await self.find_node_name_space_index(path) - if len(qualified_path) == 0: - if node.get('valid', True): - self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name, - node['key'], node['path']) - await self.__reset_node(node) - continue - elif len(qualified_path) > 1: - self.__log.warning( - 'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name, - node['key'], node['path'], qualified_path) - node['qualified_path'] = qualified_path[0] - path = qualified_path[0] - - var = await self.__client.nodes.root.get_child(path) - - if not node.get('valid', False) or self.__server_conf.get('disableSubscriptions', False): - value = await var.read_data_value() - device.converter.convert(config={'section': section, 'key': node['key']}, val=value) - - if not self.__server_conf.get('disableSubscriptions', False) and not node.get('sub_on', - False): - if self.__subscription is None: - self.__subscription = await self.__client.create_subscription(1, SubHandler( - self.__sub_data_to_convert, self.__log)) - handle = await self.__subscription.subscribe_data_change(var) - node['subscription'] = handle - node['sub_on'] = True - node['id'] = var.nodeid.to_string() - self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name, - node['key'], node['id']) - - node['valid'] = True - except ConnectionError: - raise - except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid, - BadCommunicationError, BadOutOfService): - if node.get('valid', True): - self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name, - node['key'], node['path']) - await self.__reset_node(node) - except UaStatusCodeError as uae: - if node.get('valid', True): - self.__log.exception('Node status code error: %s', uae) - await self.__reset_node(node) - except Exception as e: - if node.get('valid', True): - self.__log.exception(e) - await self.__reset_node(node) - - converter_data = device.converter.get_data() - if converter_data: - self.__data_to_send.put(*converter_data) - - device.converter.clear_data() - - def __send_data(self): - while not self.__stopped: - if not self.__data_to_send.empty(): - data = self.__data_to_send.get() - self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1 - self.__log.debug(data) - self.__gateway.send_to_storage(self.get_name(), self.get_id(), data) - self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1 - self.__log.debug('Data to ThingsBoard %s', data) - else: - sleep(.2) - - async def get_shared_attr_node_id(self, path, result={}): - try: - q_path = await self.find_node_name_space_index(path) - var = await self.__client.nodes.root.get_child(q_path[0]) - result['result'] = var - except Exception as e: - result['error'] = e.__str__() - - def on_attributes_update(self, content): - self.__log.debug(content) - try: - device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] - - for (key, value) in content['data'].items(): - for attr_update in device.config['attributes_updates']: - if attr_update['key'] == key: - result = {} - task = self.__loop.create_task( - self.get_shared_attr_node_id( - device.path + attr_update['value'].replace('\\', '').split('.'), result)) - - while not task.done(): - sleep(.1) - - if result.get('error'): - self.__log.error('Node not found! (%s)', result['error']) - return - - node_id = result['result'] - self.__loop.create_task(self.__write_value(node_id, value)) - return - except Exception as e: - self.__log.exception(e) - - def server_side_rpc_handler(self, content): - try: - if content.get('data') is None: - content['data'] = {'params': content['params'], 'method': content['method']} - - rpc_method = content["data"].get("method") - - # check if RPC type is connector RPC (can be only 'get' or 'set') - try: - (connector_type, rpc_method_name) = rpc_method.split('_') - if connector_type == self._connector_type: - rpc_method = rpc_method_name - content['device'] = content['params'].split(' ')[0].split('=')[-1] - except (ValueError, IndexError): - pass - - # firstly check if a method is not service - if rpc_method == 'set' or rpc_method == 'get': - full_path = '' - args_list = [] - device = content.get('device') - - try: - args_list = content['data']['params'].split(';') - - if 'ns' in content['data']['params']: - full_path = ';'.join([item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)]) - else: - full_path = args_list[0].split('=')[-1] - except IndexError: - self.__log.error('Not enough arguments. Expected min 2.') - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={content['data'][ - 'method']: 'Not enough arguments. Expected min 2.', - 'code': 400}) - - result = {} - if rpc_method == 'get': - task = self.__loop.create_task(self.__read_value(full_path, result)) - - while not task.done(): - sleep(.2) - elif rpc_method == 'set': - value = args_list[2].split('=')[-1] - task = self.__loop.create_task(self.__write_value(full_path, value, result)) - - while not task.done(): - sleep(.2) - - self.__gateway.send_rpc_reply(device=device, - req_id=content['data'].get('id'), - content={content['data']['method']: result}) - else: - device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0] - - for rpc in device.config['rpc_methods']: - if rpc['method'] == content["data"]['method']: - arguments_from_config = rpc["arguments"] - arguments = content["data"].get("params") if content["data"].get( - "params") is not None else arguments_from_config - - try: - result = {} - task = self.__loop.create_task(self.__call_method(device.path, arguments, result)) - - while not task.done(): - sleep(.2) - - self.__gateway.send_rpc_reply(content["device"], - content["data"]["id"], - {content["data"]["method"]: result, "code": 200}) - self.__log.debug("method %s result is: %s", rpc['method'], result) - except Exception as e: - self.__log.exception(e) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": str(e), "code": 500}) - else: - self.__log.error("Method %s not found for device %s", rpc_method, content["device"]) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], - {"error": "%s - Method not found" % rpc_method, - "code": 404}) - - except Exception as e: - self.__log.exception(e) - - async def __write_value(self, path, value, result={}): - try: - var = path - if isinstance(path, str): - var = self.__client.get_node(path.replace('\\.', '.')) - - await var.write_value(value) - except Exception as e: - result['error'] = e.__str__() - - async def __read_value(self, path, result={}): - try: - var = self.__client.get_node(path) - result['value'] = await var.read_value() - except Exception as e: - result['error'] = e.__str__() - - async def __call_method(self, path, arguments, result={}): - try: - var = self.__client.get_node(path) - result['result'] = await var.call_method(*arguments) - except Exception as e: - result['error'] = e.__str__() - - def update_converter_config(self, converter_name, config): - self.__log.debug('Received remote converter configuration update for %s with configuration %s', converter_name, - config) - for device in self.__device_nodes: - if device.converter.__class__.__name__ == converter_name: - device.config.update(config) - device.load_values() - self.__log.info('Updated converter configuration for: %s with configuration %s', - converter_name, device.config) - - for node_config in self.__config['mapping']: - if node_config['deviceNodePattern'] == device.config['deviceNodePattern']: - node_config.update(config) - - self.__gateway.update_connector_config_file(self.name, {'server': self.__server_conf, - 'mapping': self.__config.get('mapping', [])}) - - -class SubHandler: - def __init__(self, queue, logger): - self.__log = logger - self.__queue = queue - - def datachange_notification(self, node, _, data): - self.__log.debug("New data change event %s %s", node, data) - self.__queue.put((node, data)) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index a3903f1da..49e61809e 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -166,6 +166,7 @@ def __init__(self, config_file=None): self.__modify_main_config() log.info("Gateway starting...") + self.__duplicate_detector = DuplicateDetector(self.available_connectors_by_name) self.__updater = TBUpdater() self.version = self.__updater.get_version() log.info("ThingsBoard IoT gateway version: %s", self.version["current_version"]) @@ -250,7 +251,6 @@ def __init__(self, config_file=None): except Exception as e: log.exception("Error while connecting to connectors, please update configuration: %s", e) - self.__duplicate_detector = DuplicateDetector(self.available_connectors_by_name) self.__load_persistent_devices() if self.__config['thingsboard'].get('managerEnabled', False): From e9683a0a74fb8e01585ee20042a8a936bec62a01 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Wed, 14 Aug 2024 17:35:48 +0300 Subject: [PATCH 02/33] Added handling for non primitive types --- .../opcua/opcua_uplink_converter.py | 35 +++++++++++++++++-- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 61b96eb7d..dcdc8e113 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -61,8 +61,37 @@ def get_data(self): def convert(self, configs, values): for (val, config) in zip(values, configs): - if not val: + if not val or val is None: continue - if val is not None: - self.data[DATA_TYPES[config['section']]].append({config['key']: val}) + data = val + + if not isinstance(data, (int, float, str, bool, dict, list, type(None), LocalizedText)): + self._log.info(f"Non primitive data type: {type(data)}") + data = val.Value.Value + if isinstance(data, LocalizedText): + data = data.Text + elif val.Value.VariantType == VariantType.ExtensionObject: + data = str(data) + elif val.Value.VariantType == VariantType.DateTime: + if data.tzinfo is None: + data = data.replace(tzinfo=timezone.utc) + data = data.isoformat() + elif val.Value.VariantType == VariantType.StatusCode: + data = data.name + elif (val.Value.VariantType == VariantType.QualifiedName + or val.Value.VariantType == VariantType.NodeId + or val.Value.VariantType == VariantType.ExpandedNodeId): + data = data.to_string() + elif val.Value.VariantType == VariantType.ByteString: + data = data.hex() + elif val.Value.VariantType == VariantType.XmlElement: + data = data.decode('utf-8') + elif val.Value.VariantType == VariantType.Guid: + data = str(data) + elif val.Value.VariantType == VariantType.DiagnosticInfo: + data = data.to_string() + elif val.Value.VariantType == VariantType.Null: + data = None + + self.data[DATA_TYPES[config['section']]].append({config['key']: data}) From 91cb991993a1e1b0da16954ec3caa3e8d703a3fa Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 15 Aug 2024 07:20:55 +0300 Subject: [PATCH 03/33] Fix for getting connector configuration from main configuration file --- .../tb_utility/tb_gateway_remote_configurator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index 935116e51..11a30bfcc 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -193,7 +193,7 @@ def _load_connectors_configuration(self): for connector in connector_list: for general_connector_config in self.connectors_configuration: if general_connector_config['name'] == connector['name']: - config = connector.pop('config')[general_connector_config['configuration']] + config = connector.get('config')[general_connector_config['configuration']] general_connector_config.update(connector) general_connector_config['configurationJson'] = config From 2fd84ed319b3405baaabf25be90e75b79ea5712d Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 15 Aug 2024 08:41:19 +0300 Subject: [PATCH 04/33] Adjusted non-primitive parsing --- .../opcua/opcua_uplink_converter.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index dcdc8e113..e48c77aad 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -68,30 +68,32 @@ def convert(self, configs, values): if not isinstance(data, (int, float, str, bool, dict, list, type(None), LocalizedText)): self._log.info(f"Non primitive data type: {type(data)}") - data = val.Value.Value if isinstance(data, LocalizedText): data = data.Text - elif val.Value.VariantType == VariantType.ExtensionObject: + elif val.VariantType == VariantType.ExtensionObject: data = str(data) - elif val.Value.VariantType == VariantType.DateTime: + elif val.VariantType == VariantType.DateTime: if data.tzinfo is None: data = data.replace(tzinfo=timezone.utc) data = data.isoformat() - elif val.Value.VariantType == VariantType.StatusCode: + elif val.VariantType == VariantType.StatusCode: data = data.name - elif (val.Value.VariantType == VariantType.QualifiedName - or val.Value.VariantType == VariantType.NodeId - or val.Value.VariantType == VariantType.ExpandedNodeId): + elif (val.VariantType == VariantType.QualifiedName + or val.VariantType == VariantType.NodeId + or val.VariantType == VariantType.ExpandedNodeId): data = data.to_string() - elif val.Value.VariantType == VariantType.ByteString: + elif val.VariantType == VariantType.ByteString: data = data.hex() - elif val.Value.VariantType == VariantType.XmlElement: + elif val.VariantType == VariantType.XmlElement: data = data.decode('utf-8') - elif val.Value.VariantType == VariantType.Guid: + elif val.VariantType == VariantType.Guid: data = str(data) - elif val.Value.VariantType == VariantType.DiagnosticInfo: + elif val.VariantType == VariantType.DiagnosticInfo: data = data.to_string() - elif val.Value.VariantType == VariantType.Null: + elif val.VariantType == VariantType.Null: data = None + else: + self._log.info(f"Unsupported data type: {val.VariantType}, will be processed as a string.") + data = str(data) self.data[DATA_TYPES[config['section']]].append({config['key']: data}) From 9d18a0ae402e8b7d8af880b3d84f3d7c649bbca8 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 14 Aug 2024 19:23:32 +0200 Subject: [PATCH 05/33] build docker script added. requirement for asyncua added --- build_docker.sh | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100755 build_docker.sh diff --git a/build_docker.sh b/build_docker.sh new file mode 100755 index 000000000..2e1cadd70 --- /dev/null +++ b/build_docker.sh @@ -0,0 +1,35 @@ +#!/bin/sh +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -e # exit on any error + +# Fetch the current branch name and latest commit ID +BRANCH_NAME=$(git rev-parse --abbrev-ref HEAD | sed 's/[\/]/-/g') +COMMIT_ID=$(git rev-parse --short HEAD) + +# Combine them to create a version tag +VERSION_TAG="${BRANCH_NAME}-${COMMIT_ID}" + +echo "$(date) Building project with version tag $VERSION_TAG ..." +set -x + +#docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type=registry + +# multi arch + DOCKER_CLI_EXPERIMENTAL=enabled \ + docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry + +set +x +echo "$(date) Done." From 4f8b1f404c13d548c86b3e184594e7571e5bd925 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 10:09:27 +0300 Subject: [PATCH 06/33] Adjusted batch reading, changed method of reading to get correct processing of non primitive types --- .../connectors/opcua/opcua_connector.py | 5 ++- .../opcua/opcua_uplink_converter.py | 38 +++++++++++-------- 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 45d10d329..684fd3600 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -20,6 +20,8 @@ from time import sleep, monotonic from queue import Queue +from asyncua import ua + from thingsboard_gateway.connectors.connector import Connector from thingsboard_gateway.connectors.opcua.backward_compatibility_adapter import BackwardCompatibilityAdapter from thingsboard_gateway.connectors.opcua.device import Device @@ -495,7 +497,8 @@ async def _load_devices_nodes(self): await self.__reset_node(node) async def __poll_nodes(self): - values = await self.__client.read_values([node_config['var'] for device in self.__device_nodes for node_config in device.nodes]) + values = await self.__client.read_attributes( + [node_config['var'] for device in self.__device_nodes for node_config in device.nodes]) converted_nodes_count = 0 for device in self.__device_nodes: diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index e48c77aad..6d34b28aa 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -60,40 +60,46 @@ def get_data(self): return None def convert(self, configs, values): + if not isinstance(configs, list): + configs = [configs] + if not isinstance(list, values): + values = [values] for (val, config) in zip(values, configs): if not val or val is None: continue - data = val + data = val.Value.Value - if not isinstance(data, (int, float, str, bool, dict, list, type(None), LocalizedText)): - self._log.info(f"Non primitive data type: {type(data)}") + if data is not None: if isinstance(data, LocalizedText): data = data.Text - elif val.VariantType == VariantType.ExtensionObject: + elif val.Value.VariantType == VariantType.ExtensionObject: data = str(data) - elif val.VariantType == VariantType.DateTime: + elif val.Value.VariantType == VariantType.DateTime: if data.tzinfo is None: data = data.replace(tzinfo=timezone.utc) data = data.isoformat() - elif val.VariantType == VariantType.StatusCode: + elif val.Value.VariantType == VariantType.StatusCode: data = data.name - elif (val.VariantType == VariantType.QualifiedName - or val.VariantType == VariantType.NodeId - or val.VariantType == VariantType.ExpandedNodeId): + elif (val.Value.VariantType == VariantType.QualifiedName + or val.Value.VariantType == VariantType.NodeId + or val.Value.VariantType == VariantType.ExpandedNodeId): data = data.to_string() - elif val.VariantType == VariantType.ByteString: + elif val.Value.VariantType == VariantType.ByteString: data = data.hex() - elif val.VariantType == VariantType.XmlElement: + elif val.Value.VariantType == VariantType.XmlElement: data = data.decode('utf-8') - elif val.VariantType == VariantType.Guid: + elif val.Value.VariantType == VariantType.Guid: data = str(data) - elif val.VariantType == VariantType.DiagnosticInfo: + elif val.Value.VariantType == VariantType.DiagnosticInfo: data = data.to_string() - elif val.VariantType == VariantType.Null: + elif val.Value.VariantType == VariantType.Null: data = None else: - self._log.info(f"Unsupported data type: {val.VariantType}, will be processed as a string.") - data = str(data) + self._log.warning(f"Unsupported data type: {val.VariantType}, will be processed as a string.") + if hasattr(data, 'to_string'): + data = data.to_string() + else: + data = str(data) self.data[DATA_TYPES[config['section']]].append({config['key']: data}) From fe972e4c06c64e6e577abd4ab3c03ada04848703 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 15 Aug 2024 09:19:17 +0200 Subject: [PATCH 07/33] docker python12 and cleanup --- docker/Dockerfile | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index acde58aca..916be6a30 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$TARGETPLATFORM python:3.11-slim AS build +FROM --platform=$TARGETPLATFORM python:3.12-slim AS build ARG TARGETPLATFORM ARG BUILDPLATFORM @@ -29,8 +29,11 @@ RUN mkdir -p /default-config/config /default-config/extensions/ && \ echo "Unsupported platform detected. Trying to use default value...";; \ esac && \ curl https://sh.rustup.rs -sSf | sh -s -- -y --default-host=$DEFAULT_HOST --profile minimal && \ + apt-get remove --purge -y \ + gcc python3-dev build-essential libssl-dev libffi-dev zlib1g-dev pkg-config && \ + apt-get autoremove -y && \ apt-get clean && \ - rm -rf /var/lib/apt/lists/* && \ + rm -rf /var/lib/apt/lists/* /tmp/* && \ echo '#!/bin/sh\n\ # Main start script\n\ CONF_FOLDER="/thingsboard_gateway/config"\n\ From db7cc12c7172be5f13e40078db20aa9f693070e0 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 10:30:20 +0300 Subject: [PATCH 08/33] Fixed passing arguments --- thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 6d34b28aa..6281b96e8 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -62,7 +62,7 @@ def get_data(self): def convert(self, configs, values): if not isinstance(configs, list): configs = [configs] - if not isinstance(list, values): + if not isinstance(values, list): values = [values] for (val, config) in zip(values, configs): if not val or val is None: From 7ccf44517db574dcc268cf2f76a2a10efe6c7744 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 10:40:11 +0300 Subject: [PATCH 09/33] Fixed attribute error --- thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 6281b96e8..86979147a 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -96,7 +96,7 @@ def convert(self, configs, values): elif val.Value.VariantType == VariantType.Null: data = None else: - self._log.warning(f"Unsupported data type: {val.VariantType}, will be processed as a string.") + self._log.warning(f"Unsupported data type: {val.Value.VariantType}, will be processed as a string.") if hasattr(data, 'to_string'): data = data.to_string() else: From 84a6d029ac4ea25d66c725591123ce1f2b9be99a Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 10:54:50 +0300 Subject: [PATCH 10/33] Added processing primitive types --- thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 86979147a..9675ca346 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -70,7 +70,7 @@ def convert(self, configs, values): data = val.Value.Value - if data is not None: + if data is not None and not isinstance(data, (int, float, str, bool, dict, list, type(None))): if isinstance(data, LocalizedText): data = data.Text elif val.Value.VariantType == VariantType.ExtensionObject: From ed0fe1ea7d6424d894d2aea0a244b5d5df3f7a56 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 11:38:41 +0300 Subject: [PATCH 11/33] Added additional log --- thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py | 1 + 1 file changed, 1 insertion(+) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 9675ca346..72387c2e9 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -71,6 +71,7 @@ def convert(self, configs, values): data = val.Value.Value if data is not None and not isinstance(data, (int, float, str, bool, dict, list, type(None))): + self._log.error("Unsupported data: %s | and data type: %s", data, type(data)) if isinstance(data, LocalizedText): data = data.Text elif val.Value.VariantType == VariantType.ExtensionObject: From 1442a2ae87f33cab48a08295e8e9b7fb6220305a Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 13:14:57 +0300 Subject: [PATCH 12/33] Added error handling --- .../connectors/opcua/opcua_uplink_converter.py | 1 - .../gateway/tb_gateway_service.py | 18 +++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 72387c2e9..9675ca346 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -71,7 +71,6 @@ def convert(self, configs, values): data = val.Value.Value if data is not None and not isinstance(data, (int, float, str, bool, dict, list, type(None))): - self._log.error("Unsupported data: %s | and data type: %s", data, type(data)) if isinstance(data, LocalizedText): data = data.Text elif val.Value.VariantType == VariantType.ExtensionObject: diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 49e61809e..51e53430a 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1100,7 +1100,7 @@ def __send_to_storage(self): else: sleep(0.2) except Exception as e: - log.error(e) + log.exception(e) @staticmethod def __get_data_size(data: dict): @@ -1131,12 +1131,16 @@ def __convert_telemetry_to_ts(data): return data def __send_data_pack_to_storage(self, data, connector_name, connector_id=None): - json_data = dumps(data) - save_result = self._event_storage.put(json_data) - if not save_result: - log.error('%rData from the device "%s" cannot be saved, connector name is %s.', - "[" + connector_id + "] " if connector_id is not None else "", - data["deviceName"], connector_name) + try: + json_data = dumps(data) + save_result = self._event_storage.put(json_data) + if not save_result: + log.error('%rData from the device "%s" cannot be saved, connector name is %s.', + "[" + connector_id + "] " if connector_id is not None else "", + data["deviceName"], connector_name) + except Exception as e: + log.error('CONVERTING ERROR | %s', data) + log.exception(e) def check_size(self, devices_data_in_event_pack): if (self.__get_data_size(devices_data_in_event_pack) From 27772d2767309136454e5899f326cfb7e127a680 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 13:45:50 +0300 Subject: [PATCH 13/33] Fixed list data converting --- .../connectors/opcua/opcua_uplink_converter.py | 4 +++- .../gateway/tb_gateway_service.py | 16 ++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py index 9675ca346..6ed3ca37a 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py +++ b/thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py @@ -70,7 +70,9 @@ def convert(self, configs, values): data = val.Value.Value - if data is not None and not isinstance(data, (int, float, str, bool, dict, list, type(None))): + if isinstance(data, list): + data = [str(item) for item in data] + elif data is not None and not isinstance(data, (int, float, str, bool, dict, type(None))): if isinstance(data, LocalizedText): data = data.Text elif val.Value.VariantType == VariantType.ExtensionObject: diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 51e53430a..f7ea20a6d 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1131,16 +1131,12 @@ def __convert_telemetry_to_ts(data): return data def __send_data_pack_to_storage(self, data, connector_name, connector_id=None): - try: - json_data = dumps(data) - save_result = self._event_storage.put(json_data) - if not save_result: - log.error('%rData from the device "%s" cannot be saved, connector name is %s.', - "[" + connector_id + "] " if connector_id is not None else "", - data["deviceName"], connector_name) - except Exception as e: - log.error('CONVERTING ERROR | %s', data) - log.exception(e) + json_data = dumps(data) + save_result = self._event_storage.put(json_data) + if not save_result: + log.error('%rData from the device "%s" cannot be saved, connector name is %s.', + "[" + connector_id + "] " if connector_id is not None else "", + data["deviceName"], connector_name) def check_size(self, devices_data_in_event_pack): if (self.__get_data_size(devices_data_in_event_pack) From ca4b0aeed9c2e9e7f1371234d61b4f86ac0f7553 Mon Sep 17 00:00:00 2001 From: samson0v Date: Thu, 15 Aug 2024 14:19:12 +0300 Subject: [PATCH 14/33] Fixed BadNothingToDo error --- .../connectors/opcua/opcua_connector.py | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index f489e59f3..8712f87cf 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -497,22 +497,26 @@ async def _load_devices_nodes(self): await self.__reset_node(node) async def __poll_nodes(self): - values = await self.__client.read_attributes( - [node_config['var'] for device in self.__device_nodes for node_config in device.nodes]) + all_nodes = [node_config['var'] for device in self.__device_nodes for node_config in device.nodes] - converted_nodes_count = 0 - for device in self.__device_nodes: - nodes_count = len(device.nodes) - device_values = values[converted_nodes_count:converted_nodes_count + nodes_count] - converted_nodes_count += nodes_count - device.converter.convert(device.nodes, device_values) - converter_data = device.converter.get_data() - if converter_data: - self.__data_to_send.put(*converter_data) + if len(all_nodes) > 0: + values = await self.__client.read_attributes(all_nodes) + + converted_nodes_count = 0 + for device in self.__device_nodes: + nodes_count = len(device.nodes) + device_values = values[converted_nodes_count:converted_nodes_count + nodes_count] + converted_nodes_count += nodes_count + device.converter.convert(device.nodes, device_values) + converter_data = device.converter.get_data() + if converter_data: + self.__data_to_send.put(*converter_data) - device.converter.clear_data() + device.converter.clear_data() - self.__log.debug('Converted nodes values count: %s', converted_nodes_count) + self.__log.debug('Converted nodes values count: %s', converted_nodes_count) + else: + self.__log.info('No nodes to poll') def __send_data(self): while not self.__stopped: From 3a476292f821923efbf7e506c948611293f20774 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 15 Aug 2024 17:09:09 +0300 Subject: [PATCH 15/33] Hardcoded queue parameters --- thingsboard_gateway/gateway/tb_gateway_service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index f7ea20a6d..96881c001 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -222,9 +222,9 @@ def __init__(self, config_file=None): self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)) - self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 200) + self.__min_pack_send_delay_ms = 10#self.__config['thingsboard'].get('minPackSendDelayMS', 200) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 - self.__min_pack_size_to_send = self.__config['thingsboard'].get('minPackSizeToSend', 50) + self.__min_pack_size_to_send = 1000 #self.__config['thingsboard'].get('minPackSizeToSend', 50) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") @@ -1042,7 +1042,7 @@ def __send_to_storage(self): data = self.__convert_telemetry_to_ts(data) - max_data_size = self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + max_data_size = 1000000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) if self.__get_data_size(data) >= max_data_size: # Data is too large, so we will attempt to send in pieces adopted_data = {"deviceName": data['deviceName'], From d61788dbb4787e5a7686bed51683f4a80bd2987b Mon Sep 17 00:00:00 2001 From: imbeacon Date: Thu, 15 Aug 2024 17:25:02 +0300 Subject: [PATCH 16/33] Changed polling delay logic --- .../connectors/opcua/opcua_connector.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 8712f87cf..6eeedece9 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -104,7 +104,7 @@ def __init__(self, gateway, config, connector_type): self.daemon = True self.__device_nodes = [] - self.__last_poll = 0 + self.__next_poll = 0 def open(self): self.__stopped = False @@ -227,14 +227,13 @@ async def start_client(self): await self.__scan_device_nodes() while not self.__stopped: - if monotonic() - self.__last_poll >= scan_period: + if monotonic() >= self.__next_poll: + self.__next_poll = monotonic() + scan_period await self.__poll_nodes() - self.__last_poll = monotonic() - if not scan_period < 0.2: - await asyncio.sleep(0.2) - else: - await asyncio.sleep(scan_period) + time_to_sleep = self.__next_poll - scan_period - monotonic() + if time_to_sleep > 0: + await asyncio.sleep(time_to_sleep) except (ConnectionError, BadSessionClosed): self.__log.warning('Connection lost for %s', self.get_name()) except asyncio.exceptions.TimeoutError: From 05dfc5bed51ff824cc43c801d49777c9dff7fc3e Mon Sep 17 00:00:00 2001 From: samson0v Date: Fri, 16 Aug 2024 10:19:25 +0300 Subject: [PATCH 17/33] Added pollPeriodInMillis parameter, renamed scanPeriodInSec parameter, optimized polling and scanning methods calling --- .../connectors/opcua/opcua_connector.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 6eeedece9..06f98fdae 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -105,6 +105,7 @@ def __init__(self, gateway, config, connector_type): self.__device_nodes = [] self.__next_poll = 0 + self.__next_scan = 0 def open(self): self.__stopped = False @@ -222,16 +223,20 @@ async def start_client(self): except Exception as e: self.__log.error("Error on loading type definitions:\n %s", e) - scan_period = self.__server_conf.get('scanPeriodInMillis', 5000) / 1000 - - await self.__scan_device_nodes() + poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000) + scan_period = self.__server_conf.get('scanPeriodInSec', 3600) while not self.__stopped: + if monotonic() >= self.__next_scan: + self.__next_scan = monotonic() + scan_period + await self.__scan_device_nodes() + if monotonic() >= self.__next_poll: - self.__next_poll = monotonic() + scan_period + self.__next_poll = monotonic() + poll_period await self.__poll_nodes() - time_to_sleep = self.__next_poll - scan_period - monotonic() + current_time = monotonic() + time_to_sleep = min(self.__next_poll - current_time, self.__next_scan - current_time) if time_to_sleep > 0: await asyncio.sleep(time_to_sleep) except (ConnectionError, BadSessionClosed): From ef8acad52733314ef3108e493c41957edd041c24 Mon Sep 17 00:00:00 2001 From: samson0v Date: Fri, 16 Aug 2024 10:56:44 +0300 Subject: [PATCH 18/33] Updated default values in tb_gateway.json --- thingsboard_gateway/config/tb_gateway.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thingsboard_gateway/config/tb_gateway.json b/thingsboard_gateway/config/tb_gateway.json index 0e52afb82..241419697 100644 --- a/thingsboard_gateway/config/tb_gateway.json +++ b/thingsboard_gateway/config/tb_gateway.json @@ -12,8 +12,8 @@ "enable": false, "filterFile": "list.json" }, - "maxPayloadSizeBytes": 1024, - "minPackSendDelayMS": 200, + "maxPayloadSizeBytes": 8196, + "minPackSendDelayMS": 50, "minPackSizeToSend": 500, "checkConnectorsConfigurationInSeconds": 60, "handleDeviceRenaming": true, From 58fbf711f0d68f5d2b58cc9c9fddf40e9ce219d7 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Fri, 16 Aug 2024 14:18:27 +0300 Subject: [PATCH 19/33] Fix for asyncua library installation at runtime --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 06f98fdae..8281c2926 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -20,8 +20,6 @@ from time import sleep, monotonic from queue import Queue -from asyncua import ua - from thingsboard_gateway.connectors.connector import Connector from thingsboard_gateway.connectors.opcua.backward_compatibility_adapter import BackwardCompatibilityAdapter from thingsboard_gateway.connectors.opcua.device import Device From 4ed867e8b11f4caf58a55abfa8abf9af68fd2666 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Fri, 16 Aug 2024 18:10:43 +0300 Subject: [PATCH 20/33] Fix for loosing mapping array --- .../connectors/opcua/backward_compatibility_adapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py b/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py index de95ad503..a15489e04 100644 --- a/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py +++ b/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py @@ -60,7 +60,7 @@ def convert(self): self._log.error('Error during conversion: ', e) self._log.info('Config: ', node_config) - mapping = self._config.get('server', {}).pop('mapping', []) + mapping = self._config.get('server', {}).get('mapping', []) self._config['mapping'] = mapping return self._config From 983d5704ccdc42909220a118d4c55979c68d9613 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Fri, 16 Aug 2024 18:13:14 +0300 Subject: [PATCH 21/33] Reduced sleeping and refactored max payload size processing configuration parameter, hardcoded for now to be 1MB --- thingsboard_gateway/gateway/tb_gateway_service.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 96881c001..50a671898 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -225,6 +225,7 @@ def __init__(self, config_file=None): self.__min_pack_send_delay_ms = 10#self.__config['thingsboard'].get('minPackSendDelayMS', 200) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 self.__min_pack_size_to_send = 1000 #self.__config['thingsboard'].get('minPackSizeToSend', 50) + self.__max_payload_size_in_bytes = 1_000_000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") @@ -502,7 +503,7 @@ def _watchers(self): self.__rpc_requests_in_progress = new_rpc_request_in_progress else: try: - sleep(0.2) + sleep(0.02) except Exception as e: log.exception(e) break @@ -1098,7 +1099,7 @@ def __send_to_storage(self): self.__send_data_pack_to_storage(data, connector_name, connector_id) else: - sleep(0.2) + sleep(0.02) except Exception as e: log.exception(e) @@ -1140,7 +1141,7 @@ def __send_data_pack_to_storage(self, data, connector_name, connector_id=None): def check_size(self, devices_data_in_event_pack): if (self.__get_data_size(devices_data_in_event_pack) - >= self.__config["thingsboard"].get("maxPayloadSizeBytes", 400)): + >= self.__max_payload_size_in_bytes): self.__send_data(devices_data_in_event_pack) for device in devices_data_in_event_pack: devices_data_in_event_pack[device]["telemetry"] = [] @@ -1690,7 +1691,7 @@ def ping(self): return self.name def get_max_payload_size_bytes(self): - return self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + return self.__max_payload_size_in_bytes # ---------------------------- # Storage -------------------- From 2f79b611f052240075fbd656eb0f138b7afd3743 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Sat, 17 Aug 2024 12:38:34 +0300 Subject: [PATCH 22/33] Added session timeout for OCPUA client --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 1 + 1 file changed, 1 insertion(+) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 8281c2926..6d51eb143 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -204,6 +204,7 @@ async def start_client(self): try: self.__client = asyncua.Client(url=self.__opcua_url, timeout=self.__server_conf.get('timeoutInMillis', 4000) / 1000) + self.__client.session_timeout = self.__server_conf.get('sessionTimeoutInMillis', 3600000) if self.__server_conf["identity"].get("type") == "cert.PEM": await self.__set_auth_settings_by_cert() From 267128d3d70fad7de772266801f53a59fdc62c61 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Sat, 17 Aug 2024 13:39:54 +0300 Subject: [PATCH 23/33] Added conversion for scan period --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 6d51eb143..aacf07ff7 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -223,7 +223,7 @@ async def start_client(self): self.__log.error("Error on loading type definitions:\n %s", e) poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000) - scan_period = self.__server_conf.get('scanPeriodInSec', 3600) + scan_period = int(self.__server_conf.get('scanPeriodInSec', 3600)) while not self.__stopped: if monotonic() >= self.__next_scan: From 33b8ca2b56897d4c31fdecab334e11b9a3ccebe8 Mon Sep 17 00:00:00 2001 From: imbeacon Date: Sat, 17 Aug 2024 16:02:55 +0300 Subject: [PATCH 24/33] Changed configuration conversion, fix for opcua canncelled error handling (removed disconnection if connector is not stopped), minor fixes in logger --- .../opcua/backward_compatibility_adapter.py | 21 ++++++++++++------- .../connectors/opcua/opcua_connector.py | 16 ++++++++------ thingsboard_gateway/tb_utility/tb_logger.py | 8 +++---- 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py b/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py index a15489e04..6e84af6e4 100644 --- a/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py +++ b/thingsboard_gateway/connectors/opcua/backward_compatibility_adapter.py @@ -1,4 +1,4 @@ -from copy import copy +from copy import deepcopy import re @@ -11,13 +11,16 @@ class BackwardCompatibilityAdapter: } def __init__(self, config, logger): - self._config = copy(config) + self._config = deepcopy(config) self._log = logger def convert(self): - for node_config in self._config.get('server', {}).get('mapping', []): + mapping_configuration = deepcopy(self._config.get('server', {}).get('mapping', [])) + if not mapping_configuration: + return self._config + for node_config in mapping_configuration: try: - node_config['deviceNodeSource'] = self.get_value_source(node_config['deviceNodePattern']) + node_config['deviceNodeSource'] = self.get_value_source(node_config['deviceNodePattern'], False) device_type_pattern = node_config.pop('deviceTypePattern', 'default') device_name_pattern = node_config.pop('deviceNamePattern', None) @@ -60,16 +63,18 @@ def convert(self): self._log.error('Error during conversion: ', e) self._log.info('Config: ', node_config) - mapping = self._config.get('server', {}).get('mapping', []) - self._config['mapping'] = mapping + # Removing old mapping section + self._config['server'].pop('mapping') + # Adding new mapping section + self._config['mapping'] = mapping_configuration return self._config @staticmethod - def get_value_source(value): + def get_value_source(value, possible_constant=True): if re.search(r"(ns=\d+;[isgb]=[^}]+)", value): return 'identifier' - elif re.search(r"\${([A-Za-z.:\\\d]+)}", value): + elif re.search(r"\${([A-Za-z.:\\\d]+)}", value) or not possible_constant: return 'path' else: return 'constant' diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index aacf07ff7..71e34226c 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -19,6 +19,7 @@ from threading import Thread from time import sleep, monotonic from queue import Queue +from copy import deepcopy from thingsboard_gateway.connectors.connector import Connector from thingsboard_gateway.connectors.opcua.backward_compatibility_adapter import BackwardCompatibilityAdapter @@ -62,6 +63,7 @@ def __init__(self, gateway, config, connector_type): super().__init__() self._connector_type = connector_type self.__gateway = gateway + self.__original_config = deepcopy(config) self.__config = config self.__id = self.__config.get('id') self.name = self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) @@ -181,7 +183,7 @@ def is_stopped(self): return self.__stopped def get_config(self): - return self.__config + return self.__original_config def run(self): data_send_thread = Thread(name='Send Data Thread', target=self.__send_data, daemon=True) @@ -256,12 +258,14 @@ async def start_client(self): break except Exception as e: self.__log.exception("Error in main loop: %s", e) - break + if self.__stopped: + break finally: - if self.__connected: - await self.__client.disconnect() - self.__connected = False - await asyncio.sleep(1) + if self.__stopped: + if self.__connected: + await self.__client.disconnect() + self.__connected = False + await asyncio.sleep(.5) async def __set_auth_settings_by_cert(self): try: diff --git a/thingsboard_gateway/tb_utility/tb_logger.py b/thingsboard_gateway/tb_utility/tb_logger.py index 7d7d6a10b..aa0337195 100644 --- a/thingsboard_gateway/tb_utility/tb_logger.py +++ b/thingsboard_gateway/tb_utility/tb_logger.py @@ -104,7 +104,7 @@ def _send_errors(self): is_tb_client = hasattr(self._gateway, 'tb_client') sleep(1) - if not TbLogger.IS_ALL_ERRORS_COUNT_RESET and self._gateway.tb_client.is_connected(): + if not TbLogger.IS_ALL_ERRORS_COUNT_RESET and self._gateway.tb_client is not None and self._gateway.tb_client.is_connected(): self._gateway.tb_client.client.send_telemetry( {self.attr_name: 0, 'ALL_ERRORS_COUNT': 0}, quality_of_service=0) TbLogger.IS_ALL_ERRORS_COUNT_RESET = True @@ -148,6 +148,6 @@ def _send_error_count(self, error_attr_name=None): error_attr_name = error_attr_name + '_ERRORS_COUNT' else: error_attr_name = self.attr_name - - self._gateway.tb_client.client.send_telemetry( - {error_attr_name: self.errors, 'ALL_ERRORS_COUNT': TbLogger.ALL_ERRORS_COUNT}) + if self._gateway.tb_client is not None and self._gateway.tb_client.is_connected(): + self._gateway.tb_client.client.send_telemetry( + {error_attr_name: self.errors, 'ALL_ERRORS_COUNT': TbLogger.ALL_ERRORS_COUNT}) From b89c4b97e21286edeaaf483f015cbb5a835803df Mon Sep 17 00:00:00 2001 From: imbeacon Date: Sat, 17 Aug 2024 20:09:05 +0300 Subject: [PATCH 25/33] Fix for unintialized gateway main loop messages processing --- thingsboard_gateway/gateway/tb_gateway_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 50a671898..c5469f4e0 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1038,7 +1038,7 @@ def __send_to_storage(self): data["deviceName"] = "currentThingsBoardGateway" data['deviceType'] = "gateway" - if self.__check_devices_idle: + if hasattr(self, "__check_devices_idle") and self.__check_devices_idle: self.__connected_devices[data['deviceName']]['last_receiving_data'] = time() data = self.__convert_telemetry_to_ts(data) From fb2f036016bc187b0cc4679d4452b659c68f14c6 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 19 Aug 2024 10:31:19 +0300 Subject: [PATCH 26/33] Fixed getting scan period --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 71e34226c..a58135339 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -225,7 +225,9 @@ async def start_client(self): self.__log.error("Error on loading type definitions:\n %s", e) poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000) - scan_period = int(self.__server_conf.get('scanPeriodInSec', 3600)) + scan_period = int( + self.__server_conf.get('scanPeriodInSec', + self.__server_conf.get('scanPeriodInMillis', 3600000) / 1000)) while not self.__stopped: if monotonic() >= self.__next_scan: From a5c85347ed97a91eef1ae798d0e694f84975421d Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 19 Aug 2024 11:19:35 +0300 Subject: [PATCH 27/33] Renamed scanPeriodInSec to scanPeriodInMillis --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index a58135339..746d9a913 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -225,9 +225,7 @@ async def start_client(self): self.__log.error("Error on loading type definitions:\n %s", e) poll_period = int(self.__server_conf.get('pollPeriodInMillis', 5000) / 1000) - scan_period = int( - self.__server_conf.get('scanPeriodInSec', - self.__server_conf.get('scanPeriodInMillis', 3600000) / 1000)) + scan_period = int(self.__server_conf.get('scanPeriodInMillis', 3600000) / 1000) while not self.__stopped: if monotonic() >= self.__next_scan: From 0d6003750c2cf1282ab8c2df2fcc90ea5c60b8b7 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 19 Aug 2024 12:56:27 +0300 Subject: [PATCH 28/33] Fixed converting data from subscriptions --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 746d9a913..4b180e973 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -388,8 +388,8 @@ def __convert_sub_data(self): for section in ('attributes', 'timeseries'): for node in device.values.get(section, []): if node.get('id') == sub_node.__str__(): - device.converter_for_sub.convert(config={'section': section, 'key': node['key']}, - val=data.monitored_item.Value) + device.converter_for_sub.convert({'section': section, 'key': node['key']}, + data.monitored_item.Value) converter_data = device.converter_for_sub.get_data() if converter_data: From fa5a0052d0744e61a264dcd7c1a6f0814214c711 Mon Sep 17 00:00:00 2001 From: samson0v Date: Mon, 19 Aug 2024 13:42:56 +0300 Subject: [PATCH 29/33] Fixed error handling in the main loop (BadTooManySessions) --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 4b180e973..325ce1e8e 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -247,7 +247,6 @@ async def start_client(self): except asyncio.CancelledError as e: if self.__stopped: self.__log.debug('Task was cancelled due to connector stop: %s', e.__str__()) - break else: self.__log.exception('Task was cancelled: %s', e.__str__()) except UaStatusCodeError as e: @@ -255,11 +254,8 @@ async def start_client(self): if self.__connected: await self.__client.disconnect() self.__connected = False - break except Exception as e: self.__log.exception("Error in main loop: %s", e) - if self.__stopped: - break finally: if self.__stopped: if self.__connected: From d737c8bfb71de1359d58fee492f57d7f811666fa Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 20 Aug 2024 11:06:20 +0300 Subject: [PATCH 30/33] Minor performance improving --- thingsboard_gateway/gateway/tb_client.py | 2 +- thingsboard_gateway/gateway/tb_gateway_service.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_client.py b/thingsboard_gateway/gateway/tb_client.py index d79cabf0e..62657cdbe 100644 --- a/thingsboard_gateway/gateway/tb_client.py +++ b/thingsboard_gateway/gateway/tb_client.py @@ -273,7 +273,7 @@ def unsubscribe(self, subscription_id): self.client.gw_unsubscribe(subscription_id) self.client.unsubscribe_from_attribute(subscription_id) - def connect(self, min_reconnect_delay=10): + def connect(self, min_reconnect_delay=5): self.__paused = False self.__stopped = False self.__min_reconnect_delay = min_reconnect_delay diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index c5469f4e0..e302c14f8 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -1117,13 +1117,13 @@ def __convert_telemetry_to_ts(data): telemetry_with_ts = [] for item in data["telemetry"]: if item.get("ts") is None: - telemetry = {**telemetry, **item} + telemetry.update(item) else: if isinstance(item['ts'], int): - telemetry_with_ts.append({"ts": item["ts"], "values": {**item["values"]}}) + telemetry_with_ts.append({"ts": item["ts"], "values": item["values"]}) else: log.warning('Data has invalid TS (timestamp) format! Using generated TS instead.') - telemetry_with_ts.append({"ts": int(time() * 1000), "values": {**item["values"]}}) + telemetry_with_ts.append({"ts": int(time() * 1000), "values": item["values"]}) if telemetry_with_ts: data["telemetry"] = telemetry_with_ts From e32b93c5ef205cb01ce21d38b990b1bef42c77d3 Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 20 Aug 2024 15:51:34 +0300 Subject: [PATCH 31/33] Avoiding received connector config mutating --- thingsboard_gateway/connectors/opcua/opcua_connector.py | 4 +--- thingsboard_gateway/gateway/tb_gateway_service.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/thingsboard_gateway/connectors/opcua/opcua_connector.py b/thingsboard_gateway/connectors/opcua/opcua_connector.py index 325ce1e8e..63a352747 100644 --- a/thingsboard_gateway/connectors/opcua/opcua_connector.py +++ b/thingsboard_gateway/connectors/opcua/opcua_connector.py @@ -19,7 +19,6 @@ from threading import Thread from time import sleep, monotonic from queue import Queue -from copy import deepcopy from thingsboard_gateway.connectors.connector import Connector from thingsboard_gateway.connectors.opcua.backward_compatibility_adapter import BackwardCompatibilityAdapter @@ -63,7 +62,6 @@ def __init__(self, gateway, config, connector_type): super().__init__() self._connector_type = connector_type self.__gateway = gateway - self.__original_config = deepcopy(config) self.__config = config self.__id = self.__config.get('id') self.name = self.__config.get("name", 'OPC-UA Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) @@ -183,7 +181,7 @@ def is_stopped(self): return self.__stopped def get_config(self): - return self.__original_config + return self.__config def run(self): data_send_thread = Thread(name='Send Data Thread', target=self.__send_data, daemon=True) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index e302c14f8..46472c2ae 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -27,6 +27,7 @@ from sys import argv, executable, getsizeof from threading import RLock, Thread, main_thread, current_thread from time import sleep, time, monotonic +from copy import deepcopy from simplejson import JSONDecodeError, dumps, load, loads from yaml import safe_load @@ -910,7 +911,7 @@ def __connect_with_connectors(self): if available_connector is None or available_connector.is_stopped(): connector = self._implemented_connectors[connector_type](self, - connector_config["config"][config], # noqa + deepcopy(connector_config["config"][config]), # noqa connector_type) connector.name = connector_name self.available_connectors_by_id[connector_id] = connector From 8c480ff58c23dfda1be1d12b3a28a13a8608bace Mon Sep 17 00:00:00 2001 From: imbeacon Date: Wed, 21 Aug 2024 07:15:11 +0300 Subject: [PATCH 32/33] Minor improvements for initialization --- thingsboard_gateway/gateway/tb_gateway_service.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index 46472c2ae..b34d90f5d 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -167,6 +167,7 @@ def __init__(self, config_file=None): self.__modify_main_config() log.info("Gateway starting...") + self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"]) self.__duplicate_detector = DuplicateDetector(self.available_connectors_by_name) self.__updater = TBUpdater() self.version = self.__updater.get_version() @@ -210,7 +211,6 @@ def __init__(self, config_file=None): self.__rpc_to_devices_processing_thread = Thread(target=self.__rpc_to_devices_processing, daemon=True, name="RPC to devices processing thread") self.__rpc_to_devices_processing_thread.start() - self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"]) self.init_grpc_service(self.__config.get('grpc')) @@ -1487,12 +1487,8 @@ def add_device_async(self, data): return Status.FAILURE def add_device(self, device_name, content, device_type=None): - # if (device_name not in self.__added_devices - # or device_name not in self.__connected_devices - # or device_name not in self.__saved_devices - # or monotonic() - self.__added_devices[device_name]["last_send_ts"] > 60 - # or (self.__added_devices[device_name]["device_details"]["connectorName"] != content['connector'].get_name() # noqa E501 - # or self.__added_devices[device_name]["device_details"]["connectorType"] != content['connector'].get_type())): # noqa E501 + if self.tb_client is None or not self.tb_client.is_connected(): + return device_type = device_type if device_type is not None else 'default' self.__connected_devices[device_name] = {**content, DEVICE_TYPE_PARAMETER: device_type} @@ -1631,8 +1627,9 @@ def __load_persistent_devices(self): self.__renamed_devices[device_name] = new_device_name self.__connected_devices[device_name] = device_data_to_save for device in list(self.__connected_devices.keys()): - self.add_device(device, self.__connected_devices[device], self.__connected_devices[device][ - DEVICE_TYPE_PARAMETER]) + if device in self.__connected_devices: + self.add_device(device, self.__connected_devices[device], self.__connected_devices[device][ + DEVICE_TYPE_PARAMETER]) self.__saved_devices[device_name] = device_data_to_save except Exception as e: From a1283dedb887961c92e23d1628ad27d9fa259c5d Mon Sep 17 00:00:00 2001 From: samson0v Date: Wed, 21 Aug 2024 09:41:48 +0300 Subject: [PATCH 33/33] Small fixes --- build_docker.sh => build_latest_docker.sh | 4 +--- thingsboard_gateway/gateway/tb_gateway_service.py | 14 +++++++------- 2 files changed, 8 insertions(+), 10 deletions(-) rename build_docker.sh => build_latest_docker.sh (82%) diff --git a/build_docker.sh b/build_latest_docker.sh similarity index 82% rename from build_docker.sh rename to build_latest_docker.sh index 2e1cadd70..69a7e4f6d 100755 --- a/build_docker.sh +++ b/build_latest_docker.sh @@ -25,11 +25,9 @@ VERSION_TAG="${BRANCH_NAME}-${COMMIT_ID}" echo "$(date) Building project with version tag $VERSION_TAG ..." set -x -#docker build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile -o type=registry - # multi arch DOCKER_CLI_EXPERIMENTAL=enabled \ - docker buildx build . -t sevlamat/tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry + docker buildx build . -t tb-gateway:$VERSION_TAG -f docker/Dockerfile --platform=linux/amd64,linux/arm64 -o type=registry set +x echo "$(date) Done." diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index b34d90f5d..9068dc59e 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -209,7 +209,7 @@ def __init__(self, config_file=None): name="RPC processing thread") self.__rpc_processing_thread.start() self.__rpc_to_devices_processing_thread = Thread(target=self.__rpc_to_devices_processing, daemon=True, - name="RPC to devices processing thread") + name="RPC to devices processing thread") self.__rpc_to_devices_processing_thread.start() self.init_grpc_service(self.__config.get('grpc')) @@ -223,10 +223,10 @@ def __init__(self, config_file=None): self.init_statistics_service(self.__config['thingsboard'].get('statistics', DEFAULT_STATISTIC)) - self.__min_pack_send_delay_ms = 10#self.__config['thingsboard'].get('minPackSendDelayMS', 200) + self.__min_pack_send_delay_ms = self.__config['thingsboard'].get('minPackSendDelayMS', 50) self.__min_pack_send_delay_ms = self.__min_pack_send_delay_ms / 1000.0 - self.__min_pack_size_to_send = 1000 #self.__config['thingsboard'].get('minPackSizeToSend', 50) - self.__max_payload_size_in_bytes = 1_000_000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + self.__min_pack_size_to_send = self.__config['thingsboard'].get('minPackSizeToSend', 500) + self.__max_payload_size_in_bytes = self.__config["thingsboard"].get("maxPayloadSizeBytes", 8196) self._send_thread = Thread(target=self.__read_data_from_storage, daemon=True, name="Send data to Thingsboard Thread") @@ -674,7 +674,7 @@ def __process_deleted_gateway_devices(self, deleted_device_name: str): self.__duplicate_detector.delete_device(deleted_device_name) self.__save_persistent_devices() self.__load_persistent_devices() - return True + return {'success': True} def __process_renamed_gateway_devices(self, renamed_device: dict): if self.__config.get('handleDeviceRenaming', True): @@ -693,7 +693,7 @@ def __process_renamed_gateway_devices(self, renamed_device: dict): else: log.debug("Received renamed device notification %r, but device renaming handle is disabled", renamed_device) - return True + return {'success': True} def __process_remote_configuration(self, new_configuration): if new_configuration is not None and self.__remote_configurator is not None: @@ -1044,7 +1044,7 @@ def __send_to_storage(self): data = self.__convert_telemetry_to_ts(data) - max_data_size = 1000000 #self.__config["thingsboard"].get("maxPayloadSizeBytes", 400) + max_data_size = self.__max_payload_size_in_bytes if self.__get_data_size(data) >= max_data_size: # Data is too large, so we will attempt to send in pieces adopted_data = {"deviceName": data['deviceName'],