Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added handling rpc methods to connectors on the new UI #1452

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def server_side_rpc_handler(self, server_rpc_request):
if connector_type == self._connector_type:
rpc_method = rpc_method_name
server_rpc_request['device'] = server_rpc_request['params'].split(' ')[0].split('=')[-1]
except (IndexError, ValueError):
except (IndexError, ValueError, AttributeError):
pass

if server_rpc_request.get(DEVICE_SECTION_PARAMETER) is not None:
Expand Down Expand Up @@ -679,17 +679,24 @@ def server_side_rpc_handler(self, server_rpc_request):
break
else:
self.__log.error("Received rpc request, but method %s not found in config for %s.",
rpc_method,
self.get_name())
rpc_method,
self.get_name())
self.__gateway.send_rpc_reply(server_rpc_request[DEVICE_SECTION_PARAMETER],
server_rpc_request[DATA_PARAMETER][RPC_ID_PARAMETER],
{rpc_method: "METHOD NOT FOUND!"})
else:
self.__log.debug("Received RPC to connector: %r", server_rpc_request)
results = []
for device in self.__slaves:
server_rpc_request[DEVICE_SECTION_PARAMETER] = device.device_name
results.append(self.__process_request(server_rpc_request, server_rpc_request['params'], return_result=True))

return results

except Exception as e:
self.__log.exception(e)

def __process_request(self, content, rpc_command_config, request_type='RPC'):
def __process_request(self, content, rpc_command_config, request_type='RPC', return_result=False):
self.__log.debug('Processing %s request', request_type)
if rpc_command_config is not None:
device = ModbusConnector.__get_device_by_name(content[DEVICE_SECTION_PARAMETER], self.__slaves)
Expand Down Expand Up @@ -745,16 +752,33 @@ def __process_request(self, content, rpc_command_config, request_type='RPC'):
if content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None
and content[DATA_PARAMETER].get(RPC_ID_PARAMETER)) is not None:
if isinstance(response, Exception) or isinstance(response, ExceptionResponse):
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
success_sent=False)
if not return_result:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content={
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
success_sent=False)
else:
return {
'device': content[DEVICE_SECTION_PARAMETER],
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
'content': {
content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response)
},
'success_sent': False
}
else:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
if not return_result:
self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER],
req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
content=response)
else:
return {
'device': content[DEVICE_SECTION_PARAMETER],
'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER),
'content': response
}

self.__log.debug("%r", response)

Expand Down
43 changes: 23 additions & 20 deletions thingsboard_gateway/connectors/mqtt/mqtt_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ def __process_rpc_request(self, content, rpc_config):
.replace("${methodName}", str(content['data']['method'])) \
.replace("${requestId}", str(content["data"]["id"]))

if content['device']:
if content.get('device'):
request_topic = request_topic.replace("${deviceName}", str(content["device"]))

request_topic = TBUtility.replace_params_tags(request_topic, content)
Expand Down Expand Up @@ -882,7 +882,7 @@ def __process_rpc_request(self, content, rpc_config):
return
if not expects_response or not defines_timeout:
self.__log.info("One-way RPC: sending ack to ThingsBoard immediately")
self.__gateway.send_rpc_reply(device=content["device"], req_id=content["data"]["id"],
self.__gateway.send_rpc_reply(device=content.get('device', ''), req_id=content["data"]["id"],
success_sent=True)

# Everything went out smoothly: RPC is served
Expand All @@ -907,28 +907,31 @@ def server_side_rpc_handler(self, content):
except ValueError:
pass

# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue
if content.get('device'):
# check if RPC method is reserved get/set
if rpc_method == 'get' or rpc_method == 'set':
params = {}
for param in content['data']['params'].split(';'):
try:
(key, value) = param.split('=')
except ValueError:
continue

if key and value:
params[key] = value
if key and value:
params[key] = value

return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], rpc_method) is not None:
return self.__process_rpc_request(content, params)
else:
# Check whether one of my RPC handlers can handle this request
for rpc_config in self.__server_side_rpc:
if search(rpc_config["deviceNameFilter"], content["device"]) \
and search(rpc_config["methodFilter"], rpc_method) is not None:

return self.__process_rpc_request(content, rpc_config)
return self.__process_rpc_request(content, rpc_config)

self.__log.error("RPC not handled: %s", content)
self.__log.error("RPC not handled: %s", content)
else:
return self.__process_rpc_request(content, content['data']['params'])

@CustomCollectStatistics(start_stat_type='allBytesSentToDevices')
def _publish(self, request_topic, data_to_send, retain):
Expand Down
152 changes: 89 additions & 63 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ def on_attributes_update(self, content):
def server_side_rpc_handler(self, content):
try:
if content.get('data') is None:
content['data'] = {'params': content['params'], 'method': content['method']}
content['data'] = {'params': content['params'], 'method': content['method'], 'id': content['id']}

rpc_method = content["data"].get("method")

Expand All @@ -528,76 +528,102 @@ def server_side_rpc_handler(self, content):
content['device'] = content['params'].split(' ')[0].split('=')[-1]
except (ValueError, IndexError):
self.__log.error('Invalid RPC method name: %s', rpc_method)
except AttributeError:
pass

# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')
if content.get('device'):
# firstly check if a method is not service
if rpc_method == 'set' or rpc_method == 'get':
full_path = ''
args_list = []
device = content.get('device')

try:
args_list = content['data']['params'].split(';')
try:
args_list = content['data']['params'].split(';')

if 'ns' in content['data']['params']:
full_path = ';'.join(
[item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
else:
full_path = args_list[0].split('=')[-1]
except IndexError:
self.__log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})

result = {}
if rpc_method == 'get':
task = self.__loop.create_task(self.__read_value(full_path, result))

while not task.done():
sleep(.2)
elif rpc_method == 'set':
value = args_list[2].split('=')[-1]
task = self.__loop.create_task(self.__write_value(full_path, value, result))

while not task.done():
sleep(.2)

if 'ns' in content['data']['params']:
full_path = ';'.join([item for item in (args_list[0:-1] if rpc_method == 'set' else args_list)])
else:
full_path = args_list[0].split('=')[-1]
except IndexError:
self.__log.error('Not enough arguments. Expected min 2.')
self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data'][
'method']: 'Not enough arguments. Expected min 2.',
'code': 400})

result = {}
if rpc_method == 'get':
task = self.__loop.create_task(self.__read_value(full_path, result))

while not task.done():
sleep(.2)
elif rpc_method == 'set':
value = args_list[2].split('=')[-1]
task = self.__loop.create_task(self.__write_value(full_path, value, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(device=device,
req_id=content['data'].get('id'),
content={content['data']['method']: result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

for rpc in device.config['rpc_methods']:
if rpc['method'] == content["data"]['method']:
arguments_from_config = rpc["arguments"]
arguments = content["data"].get("params") if content["data"].get(
"params") is not None else arguments_from_config
method_name = content['data']['method']

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, method_name, arguments, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(content["device"],
content["data"]["id"],
{content["data"]["method"]: result, "code": 200})
self.__log.debug("method %s result is: %s", rpc['method'], result)
except Exception as e:
self.__log.exception(e)
content={content['data']['method']: result})
else:
device = tuple(filter(lambda i: i.name == content['device'], self.__device_nodes))[0]

for rpc in device.config['rpc_methods']:
if rpc['method'] == content["data"]['method']:
arguments_from_config = rpc["arguments"]
arguments = content["data"].get("params") if content["data"].get(
"params") is not None else arguments_from_config
method_name = content['data']['method']

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, method_name, arguments, result))

while not task.done():
sleep(.2)

self.__gateway.send_rpc_reply(content["device"],
content["data"]["id"],
{content["data"]["method"]: result, "code": 200})
self.__log.debug("method %s result is: %s", rpc['method'], result)
except Exception as e:
self.__log.exception(e)
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": str(e), "code": 500})
else:
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": str(e), "code": 500})
else:
self.__log.error("Method %s not found for device %s", rpc_method, content["device"])
{"error": "%s - Method not found" % rpc_method,
"code": 404})
else:
results = []
for device in self.__device_nodes:
content['device'] = device.name

arguments = content['data']['params']["arguments"]

try:
result = {}
task = self.__loop.create_task(
self.__call_method(device.path, rpc_method, arguments, result))

while not task.done():
sleep(.2)

results.append(result)
self.__log.debug("method %s result is: %s", rpc_method, result)
except Exception as e:
self.__log.exception(e)
self.__gateway.send_rpc_reply(content["device"], content["data"]["id"],
{"error": "%s - Method not found" % rpc_method,
"code": 404})
{"error": str(e), "code": 500})

return results
except Exception as e:
self.__log.exception(e)

Expand Down
2 changes: 1 addition & 1 deletion thingsboard_gateway/gateway/tb_gateway_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ def _rpc_request_handler(self, request_id, content):
connector_name)
content['id'] = request_id
result = self.available_connectors_by_name[connector_name].server_side_rpc_handler(content) # noqa E501
elif module == 'gateway' or module in self.__remote_shell.shell_commands:
elif module == 'gateway' or (self.__remote_shell and module in self.__remote_shell.shell_commands):
result = self.__rpc_gateway_processing(request_id, content)
else:
log.error("Connector \"%s\" not found", module)
Expand Down
Loading