-
Notifications
You must be signed in to change notification settings - Fork 3
/
ddcci.py
154 lines (108 loc) · 3.85 KB
/
ddcci.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import smbus
import time
from functools import wraps
HOST_SLAVE_ADDRESS = 0x51
PROTOCOL_FLAG = 0x80
DDCCI_COMMAND_READ = 0x01
DDCCI_REPLY_READ = 0x02
DDCCI_COMMAND_WRITE = 0x03
DEFAULT_DDCCI_ADDR = 0x37
READ_DELAY = WRITE_DELAY = 0.06
def throttle(delay):
"""usage:
@throttle
def func( ... ): ... (defaults to WRITE_DELAY)
@throttle(3)
def func( ... ): ... (delay provided explicitly)"""
def throttle_deco(func):
@wraps(func)
def wrapped(*args, **kwargs):
if hasattr(func, 'last_execution') and \
time.time() - func.last_execution < delay:
time.sleep(delay - (time.time() - func.last_execution))
r = func(*args, **kwargs)
func.last_execution = time.time()
return r
return wrapped
if callable(delay): # @throttle invocation
func, delay = delay, WRITE_DELAY
return throttle_deco(func)
else: # @throttle(...) invocation
return throttle_deco
class ReadException(Exception):
pass
class DDCCIDevice(object):
def __init__(self, bus, address=DEFAULT_DDCCI_ADDR):
if isinstance(bus, smbus.SMBus):
self.bus = bus
else:
self.bus = smbus.SMBus(bus)
self.address = address
def write(self, ctrl, value):
payload = self.prepare_payload(
self.address,
[DDCCI_COMMAND_WRITE, ctrl, (value >> 8) & 255, value & 255]
)
self.write_payload(payload)
def read(self, ctrl, extended=False):
payload = self.prepare_payload(
self.address,
[DDCCI_COMMAND_READ, ctrl]
)
self.write_payload(payload)
time.sleep(READ_DELAY)
if self.bus.read_byte(self.address) != self.address << 1:
raise ReadException("ACK invalid")
data_length = self.bus.read_byte(self.address) & ~PROTOCOL_FLAG
data = [self.bus.read_byte(self.address) for n in xrange(data_length)]
checksum = self.bus.read_byte(self.address)
xor = (self.address << 1 | 1) ^ HOST_SLAVE_ADDRESS ^ (PROTOCOL_FLAG | len(data))
for n in data:
xor ^= n
if xor != checksum:
raise ReadException("Invalid checksum")
if data[0] != DDCCI_REPLY_READ:
raise ReadException("Invalid response type")
if data[2] != ctrl:
raise ReadException("Received data for unrequested control")
max_value = data[4] << 8 | data[5]
value = data[6] << 8 | data[7]
if extended:
return value, max_value
else:
return value
def control_property(ctrl):
"""helper for adding control properties (see demo)"""
return property(lambda s: s.read(ctrl),
lambda s, v: s.write(ctrl, v))
brightness = control_property(0x10)
contrast = control_property(0x12)
@throttle
def write_payload(self, payload):
self.bus.write_i2c_block_data(self.address, payload[0], payload[1:])
def prepare_payload(self, addr, data):
payload = [HOST_SLAVE_ADDRESS, PROTOCOL_FLAG | len(data)]
if data[0] == DDCCI_COMMAND_READ:
xor = addr << 1 | 1
else:
xor = addr << 1
payload.extend(data)
for x in payload:
xor ^= x
payload.append(xor)
return payload
if __name__ == '__main__':
# You can obtain your bus id using `i2cdetect -l` or `ddccontrol -p`
d = DDCCIDevice(8)
print('Demo 1 ...')
d.write(0x10, 42)
time.sleep(1)
print('Demo 2 ...')
d.brightness = 12
d.contrast = 34
time.sleep(1)
print('Demo 3 ...')
d.write(0x12, 69)
print('Brightness: %d, Contrast: %d' % (d.brightness, d.contrast))
print('Max brightness: %d, Max contrast: %d' % (
d.read(0x10, True)[1], d.read(0x12, True)[1]))