-
Notifications
You must be signed in to change notification settings - Fork 2
/
modbus_utils.py
81 lines (62 loc) · 2.72 KB
/
modbus_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from typing import Any, Dict, List, Optional, Tuple, Union
from pathlib import Path
import xml.etree.cElementTree as ET
from pyModbusTCP.client import ModbusClient as mbClient
class ModbusClient(mbClient):
    """pyModbusTCP client wrapper usable in a ``with`` statement.

    The constructor forwards its arguments to the base client together with
    ``auto_open=True``. Leaving the ``with`` block calls ``close()``.
    """

    def __init__(
        self,
        host: str,
        port: int,
        unit_id: Optional[int] = None,
        timeout: Optional[float] = None,
        debug: Optional[bool] = None,
    ):
        """Initialise the base client with the given connection settings.

        A ``ValueError`` raised by the base constructor is not propagated;
        a message is printed to stdout instead.
        """
        settings = dict(
            host=host,
            port=port,
            unit_id=unit_id,
            timeout=timeout,
            debug=debug,
        )
        try:
            super().__init__(auto_open=True, **settings)
        except ValueError:
            print("Error with host or port params")

    def __enter__(self):
        """Return the client itself as the context-manager target."""
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any):
        """Close the client on exit; returns None, so exceptions propagate."""
        self.close()
class InputEvent:
    """Input event holding a list of ``(address, flag)`` trigger pairs."""

    # parseXMLConfig stores True as the flag for a 'PositiveEdge' element
    # and False for any other trigger element.
    triggers: List[Tuple[int, bool]]

    def __init__(self) -> None:
        """Start with an empty trigger list owned by this instance."""
        self.triggers = list()
class OutputEvent:
    """Output event holding a list of ``(address, value)`` action pairs."""

    # parseXMLConfig stores True as the value for a 'Set' element and False
    # for any other action element.
    actions: List[Tuple[int, bool]]

    def __init__(self) -> None:
        """Start with an empty action list owned by this instance."""
        self.actions = list()
def _collect_address_flags(container: "ET.Element", true_tag: str) -> List[Tuple[int, bool]]:
    """Return an ``(address, flag)`` pair for every element nested in *container*.

    ``flag`` is True when the element's tag equals *true_tag*.
    """
    pairs: List[Tuple[int, bool]] = []
    for element in container.iter():
        # Element.iter() also yields the container itself, which carries no
        # address, so it is skipped.
        if element is container:
            continue
        raw_address = element.get('address')
        if raw_address is None:
            # Raised explicitly (not via `assert`) so the check survives
            # `python -O`; the exception type and message are unchanged.
            raise AssertionError('Expected address in XML')
        pairs.append((int(raw_address), element.tag == true_tag))
    return pairs


def parseXMLConfig(filepath: Union[Path, str]) -> "Tuple[Dict[str, InputEvent], Dict[str, OutputEvent]]":
    """
    Parse config for virtual Modbus. The XML can be exported from (tina/flexfact)

    Every ``EventConfiguration/Event`` element is read. An event whose
    ``iotype`` attribute is ``'input'`` becomes an ``InputEvent`` built from
    its ``Triggers`` element (flag is True for ``PositiveEdge`` elements).
    Any other event becomes an ``OutputEvent`` built from its ``Actions``
    element (value is True for ``Set`` elements). If two events of the same
    kind share a name, the later one replaces the earlier one.

    Args:
        filepath: Source handed to ``ET.parse``.

    Returns (inputs, outputs), two dicts keyed by event name.

    Raises:
        AssertionError: an event has no ``name``, no ``Triggers``/``Actions``
            element, or a nested element has no ``address`` attribute.
        ValueError: an ``address`` attribute is not a valid integer.
    """
    inputs: Dict[str, InputEvent] = {}
    outputs: Dict[str, OutputEvent] = {}
    root = ET.parse(filepath).getroot()
    for tag in root.findall('EventConfiguration/Event'):
        name = tag.get('name')
        if name is None:
            raise AssertionError('Expected name in XML')
        if tag.get('iotype') == 'input':
            raw_triggers = tag.find('Triggers')
            if raw_triggers is None:
                raise AssertionError('Expected Triggers in XML')
            input_event = InputEvent()
            # Rising edge or not
            input_event.triggers.extend(
                _collect_address_flags(raw_triggers, 'PositiveEdge'))
            inputs[name] = input_event
        else:
            raw_actions = tag.find('Actions')
            if raw_actions is None:
                raise AssertionError('Expected Actions in XML')
            output_event = OutputEvent()
            # Anything other than 'Set' (i.e. a clear) yields False
            output_event.actions.extend(
                _collect_address_flags(raw_actions, 'Set'))
            outputs[name] = output_event
    return inputs, outputs