Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
imbeacon authored Sep 17, 2024
2 parents 13e423d + 746c513 commit 767dff6
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 117 deletions.
13 changes: 13 additions & 0 deletions thingsboard_gateway/connectors/modbus/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum

from thingsboard_gateway.gateway.constants import *

Expand Down Expand Up @@ -71,6 +72,18 @@
INPUT_REGISTERS = "input_registers"
DISCRETE_INPUTS = "discrete_inputs"

# Report strategy parameters
REPORT_STRATEGY_PARAMETER = "dataReportStrategy"
REPORT_PERIOD_PARAMETER = "reportPeriod"
class ReportStrategy(Enum):
ON_REPORT_PERIOD = "ON_REPORT_PERIOD"
ON_CHANGE = "ON_CHANGE"
ON_CHANGE_OR_REPORT_PERIOD = "ON_CHANGE_OR_REPORT_PERIOD"

class RequestType(Enum):
POLL = "POLL"
SEND_DATA = "SEND_DATA"

# Default values

TIMEOUT = 30
203 changes: 103 additions & 100 deletions thingsboard_gateway/connectors/modbus/modbus_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
from copy import deepcopy
from threading import Thread, Lock
from time import sleep
from time import sleep, monotonic
from time import monotonic as time
from queue import Queue
from random import choice
Expand Down Expand Up @@ -258,8 +258,8 @@ def __load_slaves(self):
'callback': ModbusConnector.callback}))

@classmethod
def callback(cls, slave):
cls.process_requests.put(slave)
def callback(cls, slave: Slave, request_type: RequestType, data=None):
cls.process_requests.put((slave, request_type, data))

@property
def connector_type(self):
Expand Down Expand Up @@ -302,24 +302,13 @@ def __convert_data(self, params):
ATTRIBUTES_PARAMETER: []
}

if current_device_config.get('sendDataOnlyOnChange'):
self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1

for converted_data_section in CONVERTED_DATA_SECTIONS:
for current_section_dict in converted_data[converted_data_section]:
for key, value in current_section_dict.items():
if device.config[LAST_PREFIX + converted_data_section].get(key) is None or \
device.config[LAST_PREFIX + converted_data_section][key] != value:
device.config[LAST_PREFIX + converted_data_section][key] = value
to_send[converted_data_section].append({key: value})
elif converted_data and current_device_config.get('sendDataOnlyOnChange') is None or \
not current_device_config.get('sendDataOnlyOnChange'):
self.statistics[STATISTIC_MESSAGE_RECEIVED_PARAMETER] += 1

for converted_data_section in CONVERTED_DATA_SECTIONS:
device.config[LAST_PREFIX + converted_data_section] = converted_data[
converted_data_section]
to_send[converted_data_section] = converted_data[converted_data_section]
# Check report strategy for each key in attributes and telemetry for device and send data only if it is necessary
for converted_data_section in CONVERTED_DATA_SECTIONS:
for current_section_dict in converted_data[converted_data_section]:
for key, value in current_section_dict.items():
should_send = device.update_cached_data_and_check_is_data_should_be_send(converted_data_section, key, value)
if should_send:
to_send[converted_data_section].append({key: value})

if to_send.get(ATTRIBUTES_PARAMETER) or to_send.get(TELEMETRY_PARAMETER):
return to_send
Expand Down Expand Up @@ -364,88 +353,102 @@ def get_id(self):
def __process_slaves(self):
while not self.__stopped:
if not self.__stopped and not ModbusConnector.process_requests.empty():
device: Slave = ModbusConnector.process_requests.get()
device_connected = False
device_disconnected = False
(device, request_type, data) = ModbusConnector.process_requests.get()
if request_type == RequestType.POLL:
self.__poll_device(device)
elif request_type == RequestType.SEND_DATA:
self.__send_data_from_device_by_strategy(device, data)
sleep(.001)

self.__log.debug("Checking %s", device)
if device.config.get(TYPE_PARAMETER).lower() == 'serial':
self.lock.acquire()
def __send_data_from_device_by_strategy(self, device, data):
self.__gateway.send_to_storage(self.get_name(), self.get_id(), data)
self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1

device_responses = {'timeseries': {}, 'attributes': {}}
current_device_config = {}
try:
for config_section in device_responses:
if device.config.get(config_section) is not None and len(device.config.get(config_section)):
current_device_config = device.config

if self.__connect_to_current_master(device):
if not device_connected and device.config['master'].is_socket_open():
device_connected = True
self.__gateway.add_device(device.device_name, {CONNECTOR_PARAMETER: self},
device_type=device.config.get(DEVICE_TYPE_PARAMETER))
else:
device_disconnected = True
else:
if not device_disconnected:
device_disconnected = True
self.__gateway.del_device(device.device_name)
continue

if (not device.config['master'].is_socket_open()
or not len(current_device_config[config_section])):
if not device.config['master'].is_socket_open():
error = 'Socket is closed, connection is lost, for device %s with config %s' % (
device.device_name, current_device_config)
else:
error = 'Config is invalid or empty for device %s, config %s' % (
device.device_name, current_device_config)
self.__log.error(error)
self.__log.debug("Device %s is not connected, data will not be processed",
device.device_name)
continue

# Reading data from device
for interested_data in range(len(current_device_config[config_section])):
current_data = deepcopy(current_device_config[config_section][interested_data])
current_data[DEVICE_NAME_PARAMETER] = device.device_name
input_data = self.__function_to_device(device, current_data)

# due to issue #1056
if isinstance(input_data, ModbusIOException) or isinstance(input_data, ExceptionResponse):
device.config.pop('master', None)
self.__gateway.del_device(device.device_name)
self.__connect_to_current_master(device)
break

device_responses[config_section][current_data[TAG_PARAMETER]] = {
"data_sent": current_data,
"input_data": input_data
}

self.__log.debug("Checking %s for device %s", config_section, device)
self.__log.debug('Device response: ', device_responses)

if device_responses.get('timeseries') or device_responses.get('attributes'):
self._convert_msg_queue.put((self.__convert_data, (device, current_device_config, {
**current_device_config,
BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, device.byte_order),
WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, device.word_order)
}, device_responses)))

except ConnectionException:
self.__gateway.del_device(device.device_name)
sleep(5)
self.__log.error("Connection lost! Reconnecting...")
except Exception as e:
self.__gateway.del_device(device.device_name)
self.__log.exception(e)

# Release mutex if "serial" type only
if device.config.get(TYPE_PARAMETER) == 'serial':
self.lock.release()

sleep(.001)
def __poll_device(self, device):
device_connected = device.last_connect_time != 0 and monotonic() - device.last_connect_time < 10
device_disconnected = False

self.__log.debug("Checking %s", device)
if device.config.get(TYPE_PARAMETER).lower() == 'serial':
self.lock.acquire()

device_responses = {'timeseries': {}, 'attributes': {}}
current_device_config = {}
try:
for config_section in device_responses:
if device.config.get(config_section) is not None and len(device.config.get(config_section)):
current_device_config = device.config
connected_to_current_master = self.__connect_to_current_master(device)
if connected_to_current_master:
is_socket_open = device.config['master'].is_socket_open()
if not device_connected and is_socket_open:
device_connected = True
device.last_connect_time = monotonic()
self.__gateway.add_device(device.device_name, {CONNECTOR_PARAMETER: self},
device_type=device.config.get(DEVICE_TYPE_PARAMETER))
elif not is_socket_open:
device.last_connect_time = 0
device_disconnected = True
else:
if not device_disconnected:
device.last_connect_time = 0
device_disconnected = True
self.__gateway.del_device(device.device_name)
continue

if (not device.config['master'].is_socket_open()
or not len(current_device_config[config_section])):
if not device.config['master'].is_socket_open():
error = 'Socket is closed, connection is lost, for device %s with config %s' % (
device.device_name, current_device_config)
else:
error = 'Config is invalid or empty for device %s, config %s' % (
device.device_name, current_device_config)
self.__log.error(error)
self.__log.debug("Device %s is not connected, data will not be processed",
device.device_name)
continue

# Reading data from device
for interested_data in range(len(current_device_config[config_section])):
current_data = deepcopy(current_device_config[config_section][interested_data])
current_data[DEVICE_NAME_PARAMETER] = device.device_name
input_data = self.__function_to_device(device, current_data)

# due to issue #1056
if isinstance(input_data, ModbusIOException) or isinstance(input_data, ExceptionResponse):
device.config.pop('master', None)
self.__gateway.del_device(device.device_name)
self.__connect_to_current_master(device)
break

device_responses[config_section][current_data[TAG_PARAMETER]] = {
"data_sent": current_data,
"input_data": input_data
}

self.__log.debug("Checking %s for device %s", config_section, device)
self.__log.debug('Device response: ', device_responses)

if device_responses.get('timeseries') or device_responses.get('attributes'):
self._convert_msg_queue.put((self.__convert_data, (device, current_device_config, {
**current_device_config,
BYTE_ORDER_PARAMETER: current_device_config.get(BYTE_ORDER_PARAMETER, device.byte_order),
WORD_ORDER_PARAMETER: current_device_config.get(WORD_ORDER_PARAMETER, device.word_order)
}, device_responses)))

except ConnectionException:
self.__gateway.del_device(device.device_name)
sleep(5)
self.__log.error("Connection lost! Reconnecting...")
except Exception as e:
self.__gateway.del_device(device.device_name)
self.__log.exception(e)
finally:
# Release mutex if "serial" type only
if device.config.get(TYPE_PARAMETER) == 'serial':
self.lock.release()

def __connect_to_current_master(self, device: Slave=None):
connect_attempt_count = 5
Expand Down
Loading

0 comments on commit 767dff6

Please sign in to comment.