diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py
index a09e10b2..9d656c1d 100644
--- a/thingsboard_gateway/gateway/tb_gateway_service.py
+++ b/thingsboard_gateway/gateway/tb_gateway_service.py
@@ -111,6 +111,7 @@ class TBGRPCServerManager:
 
 CUSTOM_RPC_DIR = "/etc/thingsboard-gateway/rpc"
 
+
 def load_file(path_to_file):
     with open(path_to_file, 'r') as target_file:
         content = load(target_file)
@@ -187,8 +188,12 @@ def __init__(self, config_file=None):
         storage_log = logging.getLogger('storage')
         storage_log.setLevel('INFO')
         storage_log.addHandler(self.main_handler)
-        self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"], storage_log)
-        self._report_strategy_service = ReportStrategyService(self.__config['thingsboard'], self, self.__converted_data_queue, log)
+        self._event_storage = self._event_storage_types[self.__config["storage"]["type"]](self.__config["storage"],
+                                                                                          storage_log)
+        self._report_strategy_service = ReportStrategyService(self.__config['thingsboard'],
+                                                              self,
+                                                              self.__converted_data_queue,
+                                                              log)
         self.__updater = TBUpdater()
         self.version = self.__updater.get_version()
         log.info("ThingsBoard IoT gateway version: %s", self.version["current_version"])
@@ -539,7 +544,8 @@ def _watchers(self):
                         self.cancel_rpc_request(rpc_in_progress)
                         self.__rpc_requests_in_progress[rpc_in_progress] = "del"
                     new_rpc_request_in_progress = {key: value for key, value in
-                                                   self.__rpc_requests_in_progress.items() if value != 'del'}
+                                                   self.__rpc_requests_in_progress.items() if value != 'del'
+                                                   }
                 if not self.__rpc_register_queue.empty():
                     new_rpc_request_in_progress = self.__rpc_requests_in_progress
                     rpc_request_from_queue = self.__rpc_register_queue.get(False)
@@ -875,7 +881,7 @@ def _load_connectors(self, config=None):
                 if connector_type != "grpc":
                     connector_class = None
                     if connector_config_from_main.get('useGRPC', False):
-                        module_name = f'Grpc{self._default_connectors.get(connector_type, connector_config_from_main.get("class"))}'
+                        module_name = f'Grpc{self._default_connectors.get(connector_type, connector_config_from_main.get("class"))}'  # noqa
                         connector_class = TBModuleLoader.import_module(connector_type, module_name)
 
                     if self.__grpc_manager and self.__grpc_manager.is_alive() and connector_class:
@@ -957,8 +963,8 @@ def _load_connectors(self, config=None):
                                                           "config_updated": stat(connector_config_file_path),
                                                           "config_file_path": connector_config_file_path,
                                                           "grpc_key": connector_persistent_key}
-                        if isinstance(connector_conf_from_file, dict) and connector_conf_from_file.get(REPORT_STRATEGY_PARAMETER) is not None:
-                            connector_configuration_local[REPORT_STRATEGY_PARAMETER] = connector_conf_from_file[REPORT_STRATEGY_PARAMETER]
+                        if isinstance(connector_conf_from_file, dict) and connector_conf_from_file.get(REPORT_STRATEGY_PARAMETER) is not None:  # noqa
+                            connector_configuration_local[REPORT_STRATEGY_PARAMETER] = connector_conf_from_file[REPORT_STRATEGY_PARAMETER]  # noqa
                         self.connectors_configs[connector_type].append(connector_configuration_local)
             except Exception as e:
                 log.exception("Error on loading connector: %r", e)
@@ -1016,11 +1022,13 @@ def __connect_with_connectors(self):
                     self.available_connectors_by_id[connector_id] = connector
                     self.available_connectors_by_name[connector_name] = connector
                     try:
-                        report_strategy_config_connector = connector_config[CONFIG_SECTION_PARAMETER][config].pop(REPORT_STRATEGY_PARAMETER, None)
-                        connector_report_strategy = ReportStrategyConfig(report_strategy_config_connector)
-                        self._report_strategy_service.register_connector_report_strategy(connector_name, connector_id, connector_report_strategy)
+                        report_strategy_config_connector = connector_config[CONFIG_SECTION_PARAMETER][config].pop(REPORT_STRATEGY_PARAMETER, None)  # noqa
+                        connector_report_strategy = ReportStrategyConfig(report_strategy_config_connector)  # noqa
+                        self._report_strategy_service.register_connector_report_strategy(connector_name, connector_id, connector_report_strategy)  # noqa
                     except ValueError:
-                        log.info("Cannot find separated report strategy for connector %r. The main report strategy will be used as a connector report strategy.",
+                        log.info("Cannot find separated report strategy for connector %r. \
+                                 The main report strategy \
+                                 will be used as a connector report strategy.",
                                  connector_name)
                     self.__update_connector_devices(connector)
         self.__cleanup_connectors()
@@ -1089,7 +1097,7 @@ def check_connector_configuration_updates(self):
         if self.__remote_configurator is not None:
             self.__remote_configurator.send_current_configuration()
 
-    def send_to_storage(self, connector_name, connector_id, data: Union[dict, ConvertedData]=None):
+    def send_to_storage(self, connector_name, connector_id, data: Union[dict, ConvertedData] = None):
         if data is None:
             log.error("[%r]Data is empty from connector %r!", connector_id, connector_name)
         try:
@@ -1136,9 +1144,9 @@ def __send_to_storage(self):
                             log.debug("Data from %s connector was sent to storage: %r", connector_name, data_array)
                             current_time = int(time() * 1000)
                             if self.__latency_debug_mode and event.metadata.get("sendToStorageTs"):
-                                log.debug("Event was in queue for %r ms", current_time - event.metadata.get("sendToStorageTs"))
+                                log.debug("Event was in queue for %r ms", current_time - event.metadata.get("sendToStorageTs"))  # noqa
                             if self.__latency_debug_mode and event.metadata.get(DATA_RETRIEVING_STARTED):
-                                log.debug("Data retrieving and conversion took %r ms", current_time - event.metadata.get(DATA_RETRIEVING_STARTED))
+                                log.debug("Data retrieving and conversion took %r ms", current_time - event.metadata.get(DATA_RETRIEVING_STARTED))  # noqa
                         else:
                             self.__send_to_storage_old_formatted_data(connector_name, connector_id, data_array)
 
@@ -1162,17 +1170,17 @@ def __send_to_storage_new_formatted_data(self, connector_name, connector_id, dat
         if data.device_name in self.__renamed_devices:
            data.device_name = self.__renamed_devices[data.device_name]
         if self.tb_client.is_connected() and (data.device_name not in self.get_devices() or
-                                                  data.device_name not in self.__connected_devices):
-                if self.available_connectors_by_id.get(connector_id) is not None:
-                    self.add_device(data.device_name,
-                                    {CONNECTOR_PARAMETER: self.available_connectors_by_id[connector_id]},
-                                    device_type=data.device_type)
-                elif self.available_connectors_by_name.get(connector_name) is not None:
-                    self.add_device(data.device_name,
-                                    {CONNECTOR_PARAMETER: self.available_connectors_by_name[connector_name]},
-                                    device_type=data.device_type)
-                else:
-                    log.error("Connector %s is not available!", connector_name)
+                                              data.device_name not in self.__connected_devices):
+            if self.available_connectors_by_id.get(connector_id) is not None:
+                self.add_device(data.device_name,
+                                {CONNECTOR_PARAMETER: self.available_connectors_by_id[connector_id]},
+                                device_type=data.device_type)
+            elif self.available_connectors_by_name.get(connector_name) is not None:
+                self.add_device(data.device_name,
+                                {CONNECTOR_PARAMETER: self.available_connectors_by_name[connector_name]},
+                                device_type=data.device_type)
+            else:
+                log.error("Connector %s is not available!", connector_name)
 
         if not self.__connector_incoming_messages.get(connector_id):
             self.__connector_incoming_messages[connector_id] = 0
@@ -1185,17 +1193,19 @@ def __send_to_storage_new_formatted_data(self, connector_name, connector_id, dat
         adopted_data_max_entry_size = max_data_size - DEBUG_METADATA_TEMPLATE_SIZE - len(connector_name) \
             if self.__latency_debug_mode else max_data_size
         start_splitting = int(time() * 1000)
-        adopted_data: List[ConvertedData] = data.convert_to_objects_with_maximal_size(adopted_data_max_entry_size)
+        adopted_data: List[ConvertedData] = data.convert_to_objects_with_maximal_size(adopted_data_max_entry_size)  # noqa
         end_splitting = int(time() * 1000)
         if self.__latency_debug_mode:
             log.trace("Data splitting took %r ms, telemetry datapoints count: %r, attributes count: %r",
-                      end_splitting - start_splitting, data.telemetry_datapoints_count, data.attributes_datapoints_count)
+                      end_splitting - start_splitting,
+                      data.telemetry_datapoints_count,
+                      data.attributes_datapoints_count)
         if self.__latency_debug_mode and data.metadata.get("receivedTs"):
-            log.debug("Data processing before sending to storage took %r ms", end_splitting - data.metadata.get("receivedTs", 0))
+            log.debug("Data processing before sending to storage took %r ms",
+                      end_splitting - data.metadata.get("receivedTs", 0))
         for adopted_data_entry in adopted_data:
             self.__send_data_pack_to_storage(adopted_data_entry, connector_name, connector_id)
 
-
     def __send_to_storage_old_formatted_data(self, connector_name, connector_id, data_array):
         max_data_size = self.get_max_payload_size_bytes()
         for data in data_array:
@@ -1213,7 +1223,7 @@ def __send_to_storage_old_formatted_data(self, connector_name, connector_id, dat
             if data["deviceName"] in self.__renamed_devices:
                 data["deviceName"] = self.__renamed_devices[data["deviceName"]]
             if self.tb_client.is_connected() and (data["deviceName"] not in self.get_devices() or
-                                                      data["deviceName"] not in self.__connected_devices):
+                                                  data["deviceName"] not in self.__connected_devices):
                 if self.available_connectors_by_id.get(connector_id) is not None:
                     self.add_device(data["deviceName"],
                                     {CONNECTOR_PARAMETER: self.available_connectors_by_id[connector_id]},
@@ -1337,7 +1347,7 @@ def __send_data_pack_to_storage(self, data, connector_name, connector_id=None):
 
     def check_size(self, devices_data_in_event_pack, current_data_pack_size, item_size):
-        if current_data_pack_size + item_size >= self.get_max_payload_size_bytes() - max(100, self.get_max_payload_size_bytes()/10):
+        if current_data_pack_size + item_size >= self.get_max_payload_size_bytes() - max(100, self.get_max_payload_size_bytes()/10):  # noqa
             current_data_pack_size = TBUtility.get_data_size(devices_data_in_event_pack)
         else:
             current_data_pack_size += item_size
         return current_data_pack_size
@@ -1401,13 +1411,17 @@ def __read_data_from_storage(self):
                             for item in current_event["telemetry"]:
                                 if has_metadata and item.get('ts'):
                                     item.update({'metadata': current_event.get('metadata')})
-                                current_event_pack_data_size = self.check_size(devices_data_in_event_pack, current_event_pack_data_size, TBUtility.get_data_size(item))
+                                current_event_pack_data_size = self.check_size(devices_data_in_event_pack,
+                                                                               current_event_pack_data_size,
+                                                                               TBUtility.get_data_size(item))
                                 devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(item)  # noqa
                                 telemetry_dp_count += len(item.get('values', []))
                         else:
                             if has_metadata and current_event["telemetry"].get('ts'):
                                 current_event["telemetry"].update({'metadata': current_event.get('metadata')})
-                            current_event_pack_data_size = self.check_size(devices_data_in_event_pack, current_event_pack_data_size, TBUtility.get_data_size(current_event["telemetry"]))
+                            current_event_pack_data_size = self.check_size(devices_data_in_event_pack,
+                                                                           current_event_pack_data_size,
+                                                                           TBUtility.get_data_size(current_event["telemetry"]))  # noqa
                             devices_data_in_event_pack[current_event["deviceName"]]["telemetry"].append(current_event["telemetry"])  # noqa
                             telemetry_dp_count += len(current_event["telemetry"].get('values', []))
                         # log.debug("Processing telemetry in event took %r seconds.", time() - start_processing_telemetry_in_event)  # noqa
@@ -1415,11 +1429,15 @@ def __read_data_from_storage(self):
 
                     if current_event.get("attributes"):
                         if isinstance(current_event["attributes"], list):
                             for item in current_event["attributes"]:
-                                current_event_pack_data_size = self.check_size(devices_data_in_event_pack, current_event_pack_data_size, TBUtility.get_data_size(item))
+                                current_event_pack_data_size = self.check_size(devices_data_in_event_pack,
+                                                                               current_event_pack_data_size,
+                                                                               TBUtility.get_data_size(item))
                                 devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(item.items())  # noqa
                                 attribute_dp_count += 1
                         else:
-                            current_event_pack_data_size = self.check_size(devices_data_in_event_pack, current_event_pack_data_size, TBUtility.get_data_size(current_event["attributes"]))
+                            current_event_pack_data_size = self.check_size(devices_data_in_event_pack,
+                                                                           current_event_pack_data_size,
+                                                                           TBUtility.get_data_size(current_event["attributes"]))  # noqa
                             devices_data_in_event_pack[current_event["deviceName"]]["attributes"].update(current_event["attributes"].items())  # noqa
                             attribute_dp_count += 1
@@ -1433,9 +1451,9 @@ def __read_data_from_storage(self):
                     pack_processing_time = int((time() - start_pack_processing) * 1000)
                     average_event_processing_time = (pack_processing_time / events_len)
                     if average_event_processing_time < 1.0:
-                        average_event_processing_time_str = f"{average_event_processing_time * 1000:.2f} microseconds."
+                        average_event_processing_time_str = f"{average_event_processing_time * 1000:.2f} microseconds."  # noqa
                     else:
-                        average_event_processing_time_str = f"{average_event_processing_time:.2f} milliseconds."
+                        average_event_processing_time_str = f"{average_event_processing_time:.2f} milliseconds."  # noqa
                     log.debug("Sending data to ThingsBoard, pack size %i processing took %i ,milliseconds. Average event processing time is %s",  # noqa
                               events_len,
                               pack_processing_time,
@@ -1481,7 +1499,8 @@ def __handle_published_events(self):
 
         futures = []
         try:
-            if self.tb_client.is_connected() and (self.__remote_configurator is None or not self.__remote_configurator.in_process):
+            if self.tb_client.is_connected() and (self.__remote_configurator is None or
+                                                  not self.__remote_configurator.in_process):
                 qos = self.tb_client.client.quality_of_service
                 if qos == 1:
                     futures = list(self.__messages_confirmation_executor.map(self.__process_published_event, events))
@@ -1559,7 +1578,7 @@ def _rpc_request_handler(self, request_id, content):
                              connector_name)
                     content['id'] = request_id
                     result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content)  # noqa E501
-                elif module == 'gateway' or (self.__remote_shell and module in self.__remote_shell.shell_commands):
+                elif module == 'gateway' or (self.__remote_shell and module in self.__remote_shell.shell_commands):  # noqa
                     result = self.__rpc_gateway_processing(request_id, content)
                 else:
                     log.error("Connector \"%s\" not found", module)
@@ -1782,8 +1801,9 @@ def add_device(self, device_name, content, device_type=None):
         connector_type = content['connector'].get_type()
         connector_name = content['connector'].get_name()
         try:
-            if self.__added_devices.get(device_name) is None or (self.__added_devices[device_name]['device_details']['connectorType'] != connector_type
-                                                                 or self.__added_devices[device_name]['device_details']['connectorName'] != connector_name):
+            if (self.__added_devices.get(device_name) is None
+                    or (self.__added_devices[device_name]['device_details']['connectorType'] != connector_type
+                        or self.__added_devices[device_name]['device_details']['connectorName'] != connector_name)):
                 device_details = {
                     'connectorType': connector_type,
                     'connectorName': connector_name
@@ -1797,7 +1817,7 @@ def add_device(self, device_name, content, device_type=None):
 
         return True
 
-    def update_device(self, device_name, event, content):
+    def update_device(self, device_name, event, content: Connector):
         should_save = False
         if self.__connected_devices.get(device_name) is None:
             return
@@ -1808,9 +1828,10 @@ def update_device(self, device_name, event, content: Connector):
         if should_save:
             self.__save_persistent_devices()
         info_to_send = {
-            DatapointKey("connectorName", ReportStrategyConfig({"type": ReportStrategy.ON_RECEIVED.name})): content.get_name()
+            DatapointKey("connectorName", ReportStrategyConfig({"type": ReportStrategy.ON_RECEIVED.name})):
+                content.get_name()
         }
-        if device_name in self.__connected_devices: # TODO: check for possible race condition
+        if device_name in self.__connected_devices:  # TODO: check for possible race condition
             self.send_to_storage(connector_name=content.get_name(),
                                  connector_id=content.get_id(),
                                  data={"deviceName": device_name,
@@ -2101,6 +2122,7 @@ def import_custom_rpc_methods(self, module_name, module_path):
             # Add the method to the __gateway_rpc_methods dictionary
             self.__gateway_rpc_methods[attr_name.replace("__rpc_", "")] = attr.__get__(self)
 
+
 if __name__ == '__main__':
     TBGatewayService(
         path.dirname(path.dirname(path.abspath(__file__))) + '/config/tb_gateway.json'.replace('/', path.sep))
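
A note on the check_size hunk (old line 1337), since it is the densest expression in this patch: the guard reserves max(100 bytes, 10% of the maximum payload size) as headroom, and once adding an item would cross that threshold it re-measures the accumulated pack with TBUtility.get_data_size instead of trusting the running sum. The standalone Python sketch below illustrates only that policy; measure_size and MAX_PAYLOAD are hypothetical stand-ins, not the gateway's real helpers or its configured limit.

# Standalone sketch of the headroom policy behind check_size. measure_size
# and MAX_PAYLOAD are hypothetical stand-ins, not the gateway's real
# TBUtility.get_data_size helper or its configured payload limit.
import json

MAX_PAYLOAD = 8196  # assumed broker payload limit in bytes


def measure_size(pack):
    # Approximate serialized size the way a JSON-over-MQTT client would.
    return len(json.dumps(pack, separators=(',', ':')).encode('utf-8'))


def check_size(pack, pack_size, item_size, max_payload=MAX_PAYLOAD):
    # Reserve max(100 bytes, 10% of the limit) as headroom for envelope
    # overhead. Near the threshold, re-measure the accumulated pack rather
    # than trusting the running estimate, which drifts as items are merged.
    headroom = max(100, max_payload / 10)
    if pack_size + item_size >= max_payload - headroom:
        pack_size = measure_size(pack)
    else:
        pack_size += item_size
    return pack_size


if __name__ == '__main__':
    pack = {"sensor-1": {"telemetry": [{"ts": 0, "values": {"t": 21.5}}]}}
    size = measure_size(pack)
    print(check_size(pack, size, item_size=64))  # small item: fast path

With the assumed 8196-byte limit, the re-measure threshold lands at 8196 - max(100, 819.6), roughly 7376 bytes, leaving room for message envelope overhead before the broker limit is reached.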