-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDDS8558_pymodbus.py
87 lines (74 loc) · 4.15 KB
/
DDS8558_pymodbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from pymodbus.client.sync import ModbusSerialClient as ModbusClient
import logging
# initialize a serial RTU client instance
from pymodbus.payload import BinaryPayloadDecoder
from pymodbus.constants import Endian
class DDS8558_Modbus_pymodbus:
"""Simple method for reading DDS8558 over serial Modbus interface"""
def __init__(self, serial_device='/dev/ttyUSB0', modbus_address=0x001, baudrate=9600, debug=False):
self.voltage = 0x0000
self.current = 0x0008
self.activePower = 0x0012
self.reactivePower = 0x001A # TODO: figure out why I can't read reactive power
self.PowerFactor = 0x002A
self.Frequency = 0x0036
self.totalActiveEnergy = 0x0100
self.totalReactiveEnergy = 0x0400 # TODO: again, no reactive energy
self.modbus_address = modbus_address
self.client = ModbusClient(method='rtu', port=serial_device, baudrate=baudrate, retries=1000, timeout=0.1,
parity='E', stopbits=1)
self.client.connect()
if debug:
logging.basicConfig()
log = logging.getLogger()
log.setLevel(logging.DEBUG)
def read_voltage(self):
"""Read voltages from meter"""
result = self.client.read_input_registers(address=self.voltage, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value
def read_current(self):
"""Read smart meter current"""
result = self.client.read_input_registers(address=self.current, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value
# reactive power is supposed to be there but based on testing it returns real power.
def read_reactive_power(self):
"""Data register Address"""
result = self.client.read_input_registers(address=self.reactivePower, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value
#even though there is a register on pdf supplied this function does not work ?!
#def read_active_power(self):
# result = self.client.read_input_registers(address=self.activePower, count=2, unit=self.modbus_address)
# decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
# value = decoder.decode_32bit_float()
# return value
#power factor works ?!
def read_power_factor(self):
result = self.client.read_input_registers(address=self.PowerFactor, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value
def read_total_active_energy(self):
"""Read total active energy"""
result = self.client.read_input_registers(address=self.totalActiveEnergy, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value
# same as reactive power there is nothing in this register
#def read_total_reactive_energy(self):
# """Read total active energy"""
# result = self.client.read_input_registers(address=self.totalReactiveEnergy, count=2, unit=self.modbus_address)
# decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
# value = decoder.decode_32bit_float()
# return value
def read_frequency(self):
"""Read system frequency (not sure why would you need that though) """
result = self.client.read_input_registers(address=self.Frequency, count=2, unit=self.modbus_address)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big, wordorder=Endian.Big)
value = decoder.decode_32bit_float()
return value