Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored OPC-UA connector #1500

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions thingsboard_gateway/connectors/opcua/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, path, name, config, converter, converter_for_sub, logger):
'timeseries': [],
'attributes': []
}
self.nodes = []

self.load_values()

Expand Down
122 changes: 75 additions & 47 deletions thingsboard_gateway/connectors/opcua/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def close(self):
self.__connected = False
self.__log.info("Stopping OPC-UA Connector")

asyncio.run_coroutine_threadsafe(self.__disconnect(), self.__loop)
asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.__loop)

start_time = monotonic()
Expand All @@ -134,6 +135,13 @@ async def __cancel_all_tasks(self):
for task in asyncio.all_tasks(self.__loop):
task.cancel()

async def __disconnect(self):
try:
await self.__client.disconnect()
self.__log.info('%s has been disconnected from OPC-UA Server.', self.get_name())
except Exception as e:
self.__log.error('%s could not be disconnected from OPC-UA Server: %s', self.name, e)

async def __reset_node(self, node):
node['valid'] = False
if node.get('sub_on', False):
Expand Down Expand Up @@ -213,9 +221,11 @@ async def start_client(self):
self.__log.error("Error on loading type definitions:\n %s", e)

scan_period = self.__server_conf.get('scanPeriodInMillis', 5000) / 1000

await self.__scan_device_nodes()

while not self.__stopped:
if monotonic() - self.__last_poll >= scan_period:
await self.__scan_device_nodes()
await self.__poll_nodes()
self.__last_poll = monotonic()

Expand Down Expand Up @@ -332,40 +342,6 @@ async def find_node_name_space_index(self, path):
unresolved = path[resolved_level:]
return await self.__find_nodes(unresolved, current_parent_node=parent_node, nodes=resolved)

async def __scan_device_nodes(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))
self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _get_device_info_by_pattern(self, pattern, get_first=False):
result = []

Expand Down Expand Up @@ -411,9 +387,48 @@ def __convert_sub_data(self):
self.__data_to_send.put(*converter_data)
device.converter_for_sub.clear_data()
else:
sleep(.2)
sleep(.05)

async def __poll_nodes(self):
async def __scan_device_nodes(self):
await self._create_new_devices()
await self._load_devices_nodes()

async def _create_new_devices(self):
existing_devices = list(map(lambda dev: dev.name, self.__device_nodes))

scanned_devices = []
for device in self.__config.get('mapping', []):
nodes = await self.find_nodes(device['deviceNodePattern'])
self.__log.debug('Found devices: %s', nodes)

device_names = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceNameExpression'))

for device_name in device_names:
scanned_devices.append(device_name)
if device_name not in existing_devices:
for node in nodes:
converter = self.__load_converter(device)
device_type = await self._get_device_info_by_pattern(
device.get('deviceInfo', {}).get('deviceProfileExpression', 'default'),
get_first=True)
device_config = {**device, 'device_name': device_name, 'device_type': device_type}
self.__device_nodes.append(
Device(path=node, name=device_name, config=device_config,
converter=converter(device_config, self.__log),
converter_for_sub=converter(device_config, self.__log) if not self.__server_conf.get(
'disableSubscriptions',
False) else None, logger=self.__log))

self.__log.info('Added device node: %s', device_name)

for device_name in existing_devices:
if device_name not in scanned_devices:
await self.__reset_nodes(device_name)

self.__log.debug('Device nodes: %s', self.__device_nodes)

async def _load_devices_nodes(self):
for device in self.__device_nodes:
for section in ('attributes', 'timeseries'):
for node in device.values.get(section, []):
Expand All @@ -426,24 +441,25 @@ async def __poll_nodes(self):
qualified_path = await self.find_node_name_space_index(path)
if len(qualified_path) == 0:
if node.get('valid', True):
self.__log.warning('Node not found; device: %s, key: %s, path: %s', device.name,
self.__log.warning('Node not found; device: %s, key: %s, path: %s',
device.name,
node['key'], node['path'])
await self.__reset_node(node)
continue
elif len(qualified_path) > 1:
self.__log.warning(
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s', device.name,
'Multiple matching nodes found; device: %s, key: %s, path: %s; %s',
device.name,
node['key'], node['path'], qualified_path)
node['qualified_path'] = qualified_path[0]
path = qualified_path[0]

var = await self.__client.nodes.root.get_child(path)

device.nodes.append({'var': var, 'key': node['key'], 'section': section})

if (node.get('valid') is None or
(node.get('valid') and self.__server_conf.get('disableSubscriptions', False))):
value = await var.read_data_value()
device.converter.convert(config={'section': section, 'key': node['key']}, val=value)

if (not self.__server_conf.get('disableSubscriptions', False)
and not node.get('sub_on', False)
and not self.__stopped):
Expand All @@ -454,14 +470,16 @@ async def __poll_nodes(self):
node['subscription'] = handle
node['sub_on'] = True
node['id'] = var.nodeid.to_string()
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s", device.name,
self.__log.info("Subscribed on data change; device: %s, key: %s, path: %s",
device.name,
node['key'], node['id'])

node['valid'] = True
except ConnectionError:
raise
except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError, UaStatusCodeErrors,
BadCommunicationError, BadOutOfService, BadNoMatch, BadUnexpectedError,
UaStatusCodeErrors,
BadWaitingForInitialData):
if node.get('valid', True):
self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name,
Expand All @@ -476,23 +494,33 @@ async def __poll_nodes(self):
self.__log.exception(e)
await self.__reset_node(node)

async def __poll_nodes(self):
values = await self.__client.read_values([node_config['var'] for device in self.__device_nodes for node_config in device.nodes])

converted_nodes_count = 0
for device in self.__device_nodes:
nodes_count = len(device.nodes)
device_values = values[converted_nodes_count:converted_nodes_count + nodes_count]
converted_nodes_count += nodes_count
device.converter.convert(device.nodes, device_values)
converter_data = device.converter.get_data()
if converter_data:
self.__data_to_send.put(*converter_data)

device.converter.clear_data()

self.__log.debug('Converted nodes values count: %s', converted_nodes_count)

def __send_data(self):
while not self.__stopped:
if not self.__data_to_send.empty():
data = self.__data_to_send.get()
self.statistics['MessagesReceived'] = self.statistics['MessagesReceived'] + 1
self.__log.debug(data)
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics['MessagesSent'] = self.statistics['MessagesSent'] + 1
self.__log.debug('Data to ThingsBoard %s', data)
self.__log.debug('Count data packs to ThingsBoard: %s', self.statistics['MessagesSent'])
else:
sleep(.2)
sleep(.05)

async def get_shared_attr_node_id(self, path, result={}):
try:
Expand Down
56 changes: 6 additions & 50 deletions thingsboard_gateway/connectors/opcua/opcua_uplink_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,54 +59,10 @@ def get_data(self):

return None

def convert(self, config, val):
if not val:
return
def convert(self, configs, values):
for (val, config) in zip(values, configs):
if not val:
continue

data = val.Value.Value

if data is not None:
if isinstance(data, LocalizedText):
data = data.Text
elif val.Value.VariantType == VariantType.ExtensionObject:
data = str(data)
elif val.Value.VariantType == VariantType.DateTime:
if data.tzinfo is None:
data = data.replace(tzinfo=timezone.utc)
data = data.isoformat()
elif val.Value.VariantType == VariantType.StatusCode:
data = data.name
elif (val.Value.VariantType == VariantType.QualifiedName
or val.Value.VariantType == VariantType.NodeId
or val.Value.VariantType == VariantType.ExpandedNodeId):
data = data.to_string()
elif val.Value.VariantType == VariantType.ByteString:
data = data.hex()
elif val.Value.VariantType == VariantType.XmlElement:
data = data.decode('utf-8')
elif val.Value.VariantType == VariantType.Guid:
data = str(data)
elif val.Value.VariantType == VariantType.DiagnosticInfo:
data = data.to_string()
elif val.Value.VariantType == VariantType.Null:
data = None



if config['section'] == 'timeseries':
if val.SourceTimestamp and int(val.SourceTimestamp.replace(
tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp:
timestamp = int(val.SourceTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
self._last_node_timestamp = timestamp
elif val.ServerTimestamp and int(val.ServerTimestamp.replace(
tzinfo=timezone.utc).timestamp() * 1000) != self._last_node_timestamp:
timestamp = int(val.ServerTimestamp.replace(tzinfo=timezone.utc).timestamp() * 1000)
self._last_node_timestamp = timestamp
else:
timestamp = int(time() * 1000)

self.data[DATA_TYPES[config['section']]].append({'ts': timestamp, 'values': {config['key']: data}})
else:
self.data[DATA_TYPES[config['section']]].append({config['key']: data})

self._log.debug('Converted data: %s', self.data)
if val is not None:
self.data[DATA_TYPES[config['section']]].append({config['key']: val})
Loading