-
Notifications
You must be signed in to change notification settings - Fork 0
/
YeeBulb.py
332 lines (302 loc) · 12.3 KB
/
YeeBulb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import socket
import re
#Bulb class
class YeeBulb:
    """
    Client for one Yeelight bulb, driven through its JSON-over-TCP control protocol.

    All command methods return a tuple: result_tup(x, y)
    x - True|False depending whether the function executed successfully
    y - List of requested params|"ok"|error message

    Exception: set_music() only flips the class-wide flags below and returns None.
    """
    DISPLAY_MSG = True  # Turn on/off YeeBulb messages
    HANDLE_RESPONSE = True  # Turn on/off handling response messages
    supported_properties = ["power", "bright", "ct", "rgb", "hue", "sat", "color_mode", "flowing", "delayoff", "flow_params", "music_on", "name"]

    def __init__(self, bulb_id, bulb_ip, bulb_port, model, name, methods):
        """
        Store the bulb description.

        Args:
            bulb_id: identifier of the bulb.
            bulb_ip: IP address used for the TCP connection.
            bulb_port: TCP port (int or numeric string; converted with int() on use).
            model: model name of the bulb.
            name: user-defined name of the bulb.
            methods: collection of method names the bulb supports.
        """
        self.id = bulb_id
        self.ip = bulb_ip
        self.port = bulb_port
        self.model = model
        self.name = name  # Could be used instead of id to represent the bulb
        self.methods = methods
        self.cmd_id = 0  # Last request id handed out by next_id()

    @classmethod
    def display(cls, msg):
        """Print msg unless messages are switched off through YeeBulb.DISPLAY_MSG."""
        if YeeBulb.DISPLAY_MSG:
            print(msg)

    @staticmethod
    def _quote(value):
        """
        Return value as a double-quoted JSON string.

        Text that is already wrapped in double quotes is passed through unchanged so
        callers that pre-quoted their arguments keep working.
        """
        text = str(value)
        if len(text) >= 2 and text.startswith('"') and text.endswith('"'):
            return text
        return '"' + text.replace('\\', '\\\\').replace('"', '\\"') + '"'

    @staticmethod
    def _scene_arg(arg):
        """Render one set_scene argument: integers stay bare, anything else is quoted."""
        try:
            return str(int(arg))
        except (TypeError, ValueError):
            return YeeBulb._quote(arg)

    def supports_method(self, method):
        """Return True when method is listed in the bulb's supported methods."""
        return method in self.methods

    def next_id(self):
        """Creates an Id to help request sender to correlate request and response"""
        self.cmd_id += 1
        return self.cmd_id

    def info(self):
        """
        Returns bulb information as a multi-line string.

        The text contains the locally stored id/ip/port/model, the current value of
        every entry of supported_properties (queried from the bulb), and the list of
        supported methods. When the state query fails, a single
        "State unavailable: <reason>" line replaces the property lines.
        """
        # Including local info
        lines = [
            "Id = " + str(self.id),
            "IP = " + str(self.ip),
            "Port = " + str(self.port),
            "Model = " + str(self.model),
        ]
        # Collecting current states; only trust the payload when the query succeeded
        response = self.get_state(YeeBulb.supported_properties)
        if response and response[0]:
            for prop, state in zip(YeeBulb.supported_properties, response[1]):
                lines.append(prop + " = " + str(state))
        else:
            reason = response[1] if response else "no response"
            lines.append("State unavailable: " + str(reason))
        # Adding supported methods
        text = "\n".join(lines) + "\nMethods =\n"
        text += "".join("\t" + str(method) + "\n" for method in self.methods)
        return text

    @staticmethod
    def get_val(data, param):
        """
        Search data with the regular expression param.

        Returns the first capture group of the first match, or "" when nothing matches.
        """
        match = re.search(param, data)
        return match.group(1) if match is not None else ""

    @staticmethod
    def handle_result_message(method, data):
        """
        Method to handle the bulb's response to operation request.

        Args:
            method: name of the method that was requested.
            data: decoded response text received from the bulb.
        Returns:
            (False, error message) when the response contains an "error" member,
            (True, list of values) for "get_prop",
            (True, "ok") when the response contains "ok",
            (False, description) for anything else.
        """
        unknown = "Unknown error.\n Received data:\n" + data
        if '"error"' in data:
            # Regex instead of a JSON parser: the bulb may answer with text such as
            # {"id":(null), ...}, which is not valid JSON.
            message = YeeBulb.get_val(data, r'"message":\s*"([^"]*)"')
            return (False, message or unknown)
        if method == "get_prop":
            # Retrieve the result and remove quotes
            result = YeeBulb.get_val(data, r'"result":\[([ -~]*)\]\}').replace('"', '')
            return (True, result.split(','))
        if '"ok"' in data:
            return (True, "ok")
        return (False, unknown)

    def operate(self, method, params):
        """
        Send one request to the bulb and, if enabled, interpret its response.

        Input data 'params' must be compiled into one string holding the
        comma-separated JSON values of the "params" array.
        E.g. params='1'; params='"smooth"'; params='1,"smooth",80'
        E.x. { "id": 1, "method": "set_power", "params":["on", "smooth", 500]}

        Returns:
            (False, "Method is not supported.") when method is not in self.methods,
            (True, "") when HANDLE_RESPONSE is off,
            the tuple built by handle_result_message() otherwise,
            (False, error text) when connecting, sending or receiving fails.
        """
        YeeBulb.display("\nOperating")
        if not self.supports_method(method):
            YeeBulb.display("ERROR\nMethod is not supported.")
            return (False, "Method is not supported.")
        try:
            port = int(self.port)  # Validate before any socket is created
            YeeBulb.display("connecting " + str(self.ip) + " " + str(self.port) + "...")
            # The with-block closes the socket on every exit path
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as tcp_socket:
                tcp_socket.connect((self.ip, port))
                msg = "{\"id\":" + str(self.next_id()) + ",\"method\":\""
                msg += method + "\",\"params\":[" + params + "]}\r\n"
                tcp_socket.sendall(msg.encode())
                if not YeeBulb.HANDLE_RESPONSE:
                    return (True, "")
                YeeBulb.display("Handling response")
                data = tcp_socket.recv(2048).decode()  # Decode bytes->str
            return YeeBulb.handle_result_message(method, data)
        except (OSError, ValueError, TypeError) as e:
            YeeBulb.display("Unexpected error:" + str(e))
            return (False, str(e))

    def get_state(self, req_params):
        """
        Method to retrieve current state of specified bulb parameters

        Args:
            req_params: sequence of property names to query.
        """
        params = ",".join('"' + str(prop) + '"' for prop in req_params)
        return self.operate("get_prop", params)

    def set_ct(self, ct_value, effect = "sudden", duration = 30):
        """
        Method to change the color temperature of the bulb
        ct_value - targeted color temperature (1700 <= ct_value <= 6500 (k))
        effect - sudden/smooth
        duration - total time of gradual change if smooth mode is selected (duration >= 30 (ms))
        """
        if 1700 <= int(ct_value) <= 6500 and int(duration) >= 30:
            params = str(ct_value) + ",\"" + str(effect) + "\"," + str(duration)
            return self.operate("set_ct_abx", params)
        return (False, "Parameters out of range")

    def set_rgb(self, rgb_value, effect = "sudden", duration = 30):
        """
        Method to change the color of the bulb
        rgb_value - the target color (decimal int; 0 <= rgb_value <= 16777215)
        effect - sudden/smooth
        duration - total time of gradual change (duration >= 30 (ms))
        """
        if 0 <= int(rgb_value) <= 16777215 and int(duration) >= 30:
            params = str(rgb_value) + ",\"" + str(effect) + "\"," + str(duration)
            return self.operate("set_rgb", params)
        return (False, "Parameters out of range")

    def set_hue(self, hue, sat = 0, effect = "sudden", duration = 30):
        """
        Method to change hue and saturation of the bulb (sent as "set_hsv")
        hue - target hue value (decimal int; 0 <= hue <= 359)
        sat - target saturation value (int; 0 <= sat <= 100)
        effect - sudden/smooth
        duration - total time of gradual change (duration >= 30 (ms))
        """
        if 0 <= int(hue) <= 359 and 0 <= int(sat) <= 100 and int(duration) >= 30:
            params = str(hue) + "," + str(sat) + ",\"" + str(effect) + "\"," + str(duration)
            return self.operate("set_hsv", params)
        return (False, "Parameters out of range")

    def set_bright(self, bright, effect = "sudden", duration = 30):
        """
        Method to set the brightness of the bulb
        bright - target brightness (1 <= bright <= 100)
        effect - sudden/smooth
        duration - total time of gradual change (duration >= 30 (ms))
        """
        if (1 <= int(bright) <= 100) and (int(duration) >= 30):
            params = str(bright) + ",\"" + str(effect) + "\"," + str(duration)
            return self.operate("set_bright", params)
        return (False, "Parameters out of range")

    def turn_on(self, effect = "sudden", duration = 30):
        """ Method to turn on the bulb. """
        params = "\"on\"" + ",\"" + str(effect) + "\"," + str(duration)
        return self.operate("set_power", params)

    def turn_off(self, effect = "sudden", duration = 30):
        """ Method to turn off the bulb. """
        params = "\"off\"" + ",\"" + str(effect) + "\"," + str(duration)
        return self.operate("set_power", params)

    def toggle(self):
        """ Toggles on/off. """
        return self.operate("toggle", "")

    def set_default(self):
        """Sets current bulb state as default. """
        return self.operate("set_default", "")

    #NOT TESTED
    def start_cf(self, count, action, *flow_expressions):
        """
        This method is used to start a color flow. Color flow is a series of smart
        LED visible state changing. It can be brightness changing, color changing or color
        temperature changing. This is the most powerful command. All recommended scenes,
        e.g. Sunrise/Sunset effect is implemented using this method. With the flow expression, user
        can actually "program" the light effect.
        Args:
            count: total number of visible state changing. 0 means infinite loop.
            action: action taken after the flow is stopped.
                0 means smart LED recover to the state before the color flow started.
                1 means smart LED stay at the state when the flow is stopped.
                2 means turn off the smart LED after the flow is stopped.
            flow_expressions: the state changing series, passed as a flat sequence of
                values, four per state change: duration, mode, value, brightness.
                duration: Gradual change time or sleep time in milliseconds, minimum value 50.
                mode: 1 - color, 2 - color temperature, 7 - sleep.
                value: RGB value when mode is 1, CT value when mode is 2, ignored when mode is 7.
                brightness: Brightness value, 1 ~ 100. Ignored when mode is 7.
        Returns:
            (False, "Incorect parameters") when no values are given, when their number
            is not a multiple of four, or when a non-zero count differs from the number
            of state changes described; otherwise the result of the request.
        Request Example:
            {"id":1,"method":"start_cf","params":[ 4, 2, "1000, 2, 2700, 100, 500, 1,255, 10, 5000, 7, 0,0, 500, 2, 5000, 1"]
        """
        total = len(flow_expressions)
        # Check that "flow_expressions" correlates with "count"; count 0 (infinite
        # loop) is accepted with any whole number of state changes.
        matches_count = int(count) == 0 or total // 4 == int(count)
        if total == 0 or total % 4 != 0 or not matches_count:
            return (False, "Incorect parameters")
        flow = ", ".join(str(expression) for expression in flow_expressions)
        params = str(count) + "," + str(action) + "," + '"' + flow + '"'
        return self.operate("start_cf", params)

    def stop_cf(self):
        """ Method to stop the color flow """
        return self.operate("stop_cf", "")

    def set_scene(self, class_type, *args):
        """
        This method is used to set the smart LED directly to specified state.
        If the smart LED is off, then it will turn on the smart LED firstly and then apply the specified command.
        Args:
            class_type: "color", "hsv", "ct", "cf", "auto_delay_off".
                "color": change the smart LED to specified color and brightness.
                "hsv": change the smart LED to specified color and brightness.
                "ct": change the smart LED to specified ct and brightness.
                "cf": start a color flow in specified fashion.
                "auto_delay_off": turn on the smart LED to specified brightness and start a sleep timer to turn off the light after the specified time
            args: class specific. Integer-like values are sent bare; any other value
                (e.g. the flow expression string of "cf") is sent as a quoted string.
        Returns:
            (False, "Parameters out of range") for an unknown class_type, otherwise
            the result of the request.
        """
        #TODO check if arg is in range for specific class_type
        class_list = ["color", "hsv", "ct", "cf", "auto_delay_off"]
        if class_type not in class_list:
            return (False, "Parameters out of range")
        parts = [YeeBulb._quote(class_type)]
        parts.extend(YeeBulb._scene_arg(arg) for arg in args)
        return self.operate("set_scene", ",".join(parts))

    def cron_add(self, value, mode = 0):
        """
        This method is used to start a timer job on the smart LED.
        Args:
            value: is the length of the timer (in minutes).
            mode: currently can only be 0. (means power off)
        """
        params = str(mode) + ',' + str(value)
        return self.operate("cron_add", params)

    def cron_get(self, mode = 0):
        """
        This method is used to retrieve the setting of the current cron job of the specified type.
        Args:
            mode: type of the cron job. (currently only support 0).
        """
        return self.operate("cron_get", str(mode))

    def cron_del(self, mode = 0):
        """
        This method is used to stop the specified cron job.
        Args:
            mode: the type of the cron job. (currently only support 0).
        """
        return self.operate("cron_del", str(mode))

    def set_adjust(self, prop, action = "circle"):
        """
        This method is used to change brightness, CT or color of a smart LED without knowing the current value.
        Args:
            action: direction of the adjustment. The valid values:
                "increase": increase the specified property
                "decrease": decrease the specified property
                "circle": increase the specified property, after it reaches the max value, go back to minimum value.
            prop: property to adjust. The valid values:
                "bright": adjust brightness.
                "ct": adjust color temperature.
                "color": adjust color. (When "prop" is "color", the "action" can only be "circle", otherwise, it will be deemed as invalid request.)
        """
        # NOTE(review): this request used to be answered with
        # {"id":(null), "error":{"code":-1, "message":"invalid command"}}.
        # Both values were sent without quotes, which is not valid JSON; they are
        # quoted now. Presumably that was the cause - confirm on a real device.
        params = YeeBulb._quote(action) + ',' + YeeBulb._quote(prop)
        return self.operate("set_adjust", params)

    def set_music(self, action, host, port):
        """
        Switch the class-wide message/response handling for music mode.

        NOTE: in its current state this method does NOT send a "set_music" request
        to the bulb and does not use host or port; it only flips the
        YeeBulb.DISPLAY_MSG and YeeBulb.HANDLE_RESPONSE flags, which affects every
        YeeBulb instance. It returns None.

        Args:
            action: action of set_music command. The valid values:
                0: turn off music mode (flags set to True).
                1: turn on music mode (flags set to False).
            host: IP address of the music server (currently unused).
            port: TCP port music application is listening on (currently unused).
        Background (protocol behaviour the implementation is meant to cover):
            Under music mode, no property will be reported and no message quota is checked.
            When control device wants to start music mode, it needs start a TCP
            server firstly and then call "set_music" command to let the device know the IP and Port of the
            TCP listen socket. After the command is received, LED device will try to connect the specified
            peer address. If the TCP connection is established successfully, then control device can
            send all supported commands through this channel without any limits to simulate any music effect.
            The control device can stop music mode by explicitly sending a stop command or by closing the socket.
        TEMP Notes:
            *possibly create a new thread pointing to music_mode()
            *split set_music() into music_mode_start() and music_mode_stop()
        """
        # Messages and response handling are off while music mode is on
        music_on = int(action) == 1
        YeeBulb.DISPLAY_MSG = not music_on
        YeeBulb.HANDLE_RESPONSE = not music_on

    def set_name(self, name):
        """
        This method is used to name the device. The name will be stored on the device and reported in discovering response.
        User can also read the name through "get_prop" method.
        Args:
            name: new name of the bulb (sent as a quoted JSON string; a value that
                is already wrapped in double quotes is sent unchanged).
        """
        return self.operate("set_name", YeeBulb._quote(name))