Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connectors/opcua/high load #1501

Merged
merged 37 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7d433cd
Refactored OPC-UA connector
samson0v Aug 14, 2024
ef3ce44
Merge pull request #1500 from samson0v/connectors/opcua/optimization
samson0v Aug 14, 2024
e9683a0
Added handling for non primitive types
imbeacon Aug 14, 2024
91cb991
Fix for getting connector configuration from main configuration file
imbeacon Aug 15, 2024
2fd84ed
Adjusted non-primitive parsing
imbeacon Aug 15, 2024
9d18a0a
build docker script added. requirement for asyncua added
smatvienko-tb Aug 14, 2024
23c3fc4
Merge pull request #1502 from smatvienko-tb/connectors/opcua/high-load
imbeacon Aug 15, 2024
4f8b1f4
Adjusted batch reading, changed method of reading to get correct proc…
samson0v Aug 15, 2024
fe972e4
docker python12 and cleanup
smatvienko-tb Aug 15, 2024
917860e
Merge pull request #1503 from smatvienko-tb/docker-python12-and-cleanup
imbeacon Aug 15, 2024
db7cc12
Fixed passing arguments
samson0v Aug 15, 2024
7ccf445
Fixed attribute error
samson0v Aug 15, 2024
1d2db94
Merge branch 'master' into connectors/opcua/high-load
samson0v Aug 15, 2024
84a6d02
Added processing primitive types
samson0v Aug 15, 2024
ed0fe1e
Added additional log
samson0v Aug 15, 2024
1442a2a
Added error handling
samson0v Aug 15, 2024
27772d2
Fixed list data converting
samson0v Aug 15, 2024
ca4b0ae
Fixed BadNothingToDo error
samson0v Aug 15, 2024
3a47629
Hardcoded queue parameters
imbeacon Aug 15, 2024
d61788d
Changed polling delay logic
imbeacon Aug 15, 2024
05dfc5b
Added pollPeriodInMillis parameter, renamed scanPeriodInSec parameter…
samson0v Aug 16, 2024
ef8acad
Updated default values in tb_gateway.json
samson0v Aug 16, 2024
58fbf71
Fix for asyncua library installation at runtime
imbeacon Aug 16, 2024
4ed867e
Fix for loosing mapping array
imbeacon Aug 16, 2024
983d570
Reduced sleeping and refactored max payload size processing configura…
imbeacon Aug 16, 2024
2f79b61
Added session timeout for OCPUA client
imbeacon Aug 17, 2024
267128d
Added conversion for scan period
imbeacon Aug 17, 2024
33b8ca2
Changed configuration conversion, fix for opcua canncelled error hand…
imbeacon Aug 17, 2024
b89c4b9
Fix for unintialized gateway main loop messages processing
imbeacon Aug 17, 2024
fb2f036
Fixed getting scan period
samson0v Aug 19, 2024
a5c8534
Renamed scanPeriodInSec to scanPeriodInMillis
samson0v Aug 19, 2024
0d60037
Fixed converting data from subscriptions
samson0v Aug 19, 2024
fa5a005
Fixed error handling in the main loop (BadTooManySessions)
samson0v Aug 19, 2024
d737c8b
Minor performance improving
samson0v Aug 20, 2024
e32b93c
Avoiding received connector config mutating
samson0v Aug 20, 2024
8c480ff
Minor improvements for initialization
imbeacon Aug 21, 2024
a1283de
Small fixes
samson0v Aug 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions thingsboard_gateway/connectors/opcua/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, path, name, config, converter, converter_for_sub, logger):
'timeseries': [],
'attributes': []
}
self.nodes = []

self.load_values()

Expand Down
122 changes: 75 additions & 47 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def close(self):
self.__connected = False
self.__log.info("Stopping OPC-UA Connector")

asyncio.run_coroutine_threadsafe(self.__disconnect(), self.__loop)
asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.__loop)

start_time = monotonic()
Expand All @@ -134,6 +135,13 @@ async def __cancel_all_tasks(self):
for task in asyncio.all_tasks(self.__loop):
task.cancel()

async def __disconnect(self):
try:
await self.__client.disconnect()
self.__log.info('%s has been disconnected from OPC-UA Server.', self.get_name())
except Exception as e:
self.__log.error('%s could not be disconnected from OPC-UA Server: %s', self.name, e)

async def __reset_node(self, node):
node['valid'] = False
if node.get('sub_on', False):
Expand Down Expand Up @@ -213,9 +221,11 @@ async def start_client(self):
self.__log.error("Error on loading type definitions:\n %s", e)

scan_period = self.__server_conf.get('scanPeriodInMillis', 5000) / 1000

await self.__scan_device_nodes()

while not self.__stopped:
if monotonic() - self.__last_poll >= scan_period:
await self.__scan_device_nodes()
await self.__poll_nodes()
self.__last_poll = monotonic()

Expand Down Expand Up @@ -332,40 +342,6 @@ async def find_node_name_space_index(self, path):
unresolved = path[resolved_level:]
return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved)

async def __scan_device_nodes(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))
self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _get_device_info_by_pattern(self, pattern, get_first=False):
result = []

Expand Down Expand Up @@ -411,9 +387,48 @@ def __convert_sub_data(self):
self.__data_to_send.put(*converter_data)
device.converter_for_sub.clear_data()
else:
sleep(.2)
sleep(.05)

async def __poll_nodes(self):
async def __scan_device_nodes(self):
await self._create_new_devices()
await self._load_devices_nodes()

async def _create_new_devices(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))

self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _load_devices_nodes(self):
for device in self.__device_nodes:
for section in ('attributes', 'timeseries'):
for node in device.values.get(section, []):
Expand All @@ -426,24 +441,25 @@ async def __poll_nodes(self):
qualified_path = await self.find_node_name_space_index(path)
if len(qualified_path) == 0:
if node.get('valid', True):
self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name,
self.__log.warning('Node not found; device: %s, key: %s, path: %s',
device.name,
node['key'], node['path'])
await self.__reset_node(node)
continue
elif len(qualified_path) > 1:
self.__log.warning(
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name,
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s',
device.name,
node['key'], node['path'], qualified_path)
node['qualified_path'] = qualified_path[0]
path = qualified_path[0]

var = await self.__client.nodes.root.get_child(path)

device.nodes.append({'var': var, 'key': node['key'], 'section': section})

if (node.get('valid') is None or
(node.get('valid') and self.__server_conf.get('disableSubscriptions', False))):
value = await var.read_data_value()
device.converter.convert(config={'section': section, 'key': node['key']}, val=value)

if (not self.__server_conf.get('disableSubscriptions', False)
and not node.get('sub_on', False)
and not self.__stopped):
Expand All @@ -454,14 +470,16 @@ async def __poll_nodes(self):
node['subscription'] = handle
node['sub_on'] = True
node['id'] = var.nodeid.to_string()
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name,
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s",
device.name,
node['key'], node['id'])

node['valid'] = True
except ConnectionError:
raise
except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError, UaStatusCodeErrors,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError,
UaStatusCodeErrors,
BadWaitingForInitialData):
if node.get('valid', True):
self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name,
Expand All @@ -476,23 +494,33 @@ async def __poll_nodes(self):
self.__log.exception(e)
await self.__reset_node(node)

async def __poll_nodes(self):
values = await self.__client.read_values([node_config['var'] for device in self.__device_nodes for node_config in device.nodes])

converted_nodes_count = 0
for device in self.__device_nodes:
nodes_count = len(device.nodes)
device_values = values[converted_nodes_count:converted_nodes_count + nodes_count]
converted_nodes_count += nodes_count
device.converter.convert(device.nodes, device_values)
converter_data = device.converter.get_data()
if converter_data:
self.__data_to_send.put(*converter_data)

device.converter.clear_data()

self.__log.debug('Converted nodes values count: %s', converted_nodes_count)

def __send_data(self):
while not self.__stopped:
if not self.__data_to_send.empty():
data = self.__data_to_send.get()
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.__log.debug(data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.debug('Data to ThingsBoard %s', data)
self.__log.debug('Count data packs to ThingsBoard: %s', self.statistics['MessagesSent'])
else:
sleep(.2)
sleep(.05)

async def get_shared_attr_node_id(self, path, result={}):
try:
Expand Down
56 changes: 6 additions & 50 deletions thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,54 +59,10 @@ def get_data(self):

return None

def convert(self, config, val):
if not val:
return
def convert(self, configs, values):
for (val, config) in zip(values, configs):
if not val:
continue

data = val.Value.Value

if data is not None:
if isinstance(data, LocalizedText):
data = data.Text
elif val.Value.VariantType == VariantType.ExtensionObject:
data = str(data)
elif val.Value.VariantType == VariantType.DateTime:
if data.tzinfo is None:
data = data.replace(tzinfo=timezone.utc)
data = data.isoformat()
elif val.Value.VariantType == VariantType.StatusCode:
data = data.name
elif (val.Value.VariantType == VariantType.QualifiedName
or val.Value.VariantType == VariantType.NodeId
or val.Value.VariantType == VariantType.ExpandedNodeId):
data = data.to_string()
elif val.Value.VariantType == VariantType.ByteString:
data = data.hex()
elif val.Value.VariantType == VariantType.XmlElement:
data = data.decode('utf-8')
elif val.Value.VariantType == VariantType.Guid:
data = str(data)
elif val.Value.VariantType == VariantType.DiagnosticInfo:
data = data.to_string()
elif val.Value.VariantType == VariantType.Null:
data = None



if config['section'] == 'timeseries':
if val.SourceTimestamp and int(val.SourceTimestamp.replace(
tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp:
timestamp = int(val.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
self._last_node_timestamp = timestamp
elif val.ServerTimestamp and int(val.ServerTimestamp.replace(
tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp:
timestamp = int(val.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
self._last_node_timestamp = timestamp
else:
timestamp = int(time() * 1000)

self.data[DATA_TYPES[config['section']]].append({'ts': timestamp, 'values': {config['key']: data}})
else:
self.data[DATA_TYPES[config['section']]].append({config['key']: data})

self._log.debug('Converted data: %s', self.data)
if val is not None:
self.data[DATA_TYPES[config['section']]].append({config['key']: val})
Loading
Loading