This repository has been archived by the owner on Feb 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt-light.py
238 lines (211 loc) · 7.74 KB
/
mqtt-light.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
MQTTLight
Purpose: Raspberry Pi MQTT Light Controller for ws281x LEDs
Author: Kevin Ahr
Email: meowmeowahr@gmail.com
"""
import paho.mqtt.client as mqtt
import pyfiglet
import configparser
import logging
import json
import lighting
import threading
import time
import board
import sys
__author__ = "Kevin Ahr"
__version__ = "0.3.1"

# Startup banner: the project name rendered by pyfiglet, wrapped in the ANSI
# "bold on" / "reset" escape sequences, followed by the version number.
print("".join(("\x1b[1m", pyfiglet.figlet_format("MQTTLight"), "\x1b[0m")))
print("v{}".format(__version__))
# variables
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did parse, so an empty result means config.ini is missing.
if not config.read('config.ini'):
    raise SystemExit("config.ini could not be read")

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', filename='mqttlight.log', filemode='w')

# MQTT broker address and the topics this controller uses
host = config['MQTT']['Host']
port = config.getint('MQTT', 'Port')
state_topic = config['MQTT']['LEDStateTopic']
command_topic = config['MQTT']['LEDCommandTopic']
online_topic = config['MQTT']['OnlineTopic']

# Pick the LED backend from whichever section is present. led_type is read by
# the light loop: 0 for the [LED] section, 1 for the [DumbLED] section.
if config.has_section('LED'):
    led_pin = config.getint('LED', 'Pin')
    num_pix = config.getint('LED', 'NumPix')
    pix_order = config['LED']['PixOrder']
    led_type = 0
    lighting.start(led_pin, num_pix, order=pix_order)
elif config.has_section('DumbLED'):
    r_pin = config.getint('DumbLED', 'RedPin')
    g_pin = config.getint('DumbLED', 'GreenPin')
    b_pin = config.getint('DumbLED', 'BluePin')
    led_type = 1
    lighting.start_dumb(r_pin, g_pin, b_pin)
else:
    # Without a backend led_type would stay undefined and the light loop
    # would fail later with a NameError; stop now with a clear message.
    logging.critical("config.ini has neither an [LED] nor a [DumbLED] section")
    raise SystemExit("config.ini must contain an [LED] or a [DumbLED] section")
# Current light state. These module-level names are rebound by the MQTT
# message callback and read by the light loop.
RED, GREEN, BLUE = 255, 255, 255
BRIGHTNESS = 127
EFFECT = "None"
STATE = "OFF"

# Copy of the initial state under the old_* names.
old_red, old_green, old_blue = RED, GREEN, BLUE
old_brightness, old_effect, old_state = BRIGHTNESS, EFFECT, STATE
# mqtt client
def on_connect(client, userdata, flags, rc):
    """Subscribe and announce the current state once the broker accepts us.

    Args:
        client: The MQTT client whose connection attempt completed.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 means the connection was accepted.
    """
    logging.info("Connected with result code %s", str(rc))
    if rc != 0:
        # The broker refused the connection, so subscribing or publishing
        # now cannot succeed.
        logging.error("Connection refused by broker (result code %s)", rc)
        return

    # subscribe to topics
    client.subscribe(command_topic)
    client.publish(online_topic, "online")

    # Build the state message once and reuse it for both the publish and the
    # log line, so the state is not constructed (and sent) repeatedly.
    state_message = construct_message()
    client.publish(state_topic, state_message)
    logging.debug("Published Message: %s, to %s", state_message, state_topic)
def on_message(client, userdata, msg):
    """Apply a JSON light command and publish the resulting state.

    The payload is expected to be a UTF-8 encoded JSON object. The keys
    ``state``, ``brightness``, ``color`` (an object with ``r``, ``g`` and
    ``b``) and ``effect`` update the matching module-level globals; a key
    that is absent leaves its global unchanged. Payloads that cannot be
    decoded, or that are not a JSON object, are logged and ignored so that a
    bad message cannot raise inside the MQTT network loop.

    Args:
        client: The MQTT client that received the message; also used to
            publish the updated state.
        userdata: Unused.
        msg: The received message; only ``msg.payload`` is read.
    """
    global RED, GREEN, BLUE, BRIGHTNESS, EFFECT, STATE

    # decode message
    try:
        message = json.loads(msg.payload.decode('utf-8'))
    except ValueError as err:
        # UnicodeDecodeError and json.JSONDecodeError both derive from ValueError.
        logging.warning("Ignoring message that is not valid JSON: %s", err)
        return
    if not isinstance(message, dict):
        logging.warning("Ignoring message that is not a JSON object: %r", message)
        return

    # log
    logging.debug("Recieved Message: {}".format(message))

    if 'state' in message:
        STATE = message['state']
    if STATE == "OFF":
        # Blank the LEDs right away.
        lighting.pixels.fill((0, 0, 0))
        lighting.pixels.show()

    if 'brightness' in message:
        BRIGHTNESS = message['brightness']

    # A channel missing from the colour object keeps its previous value.
    color = message.get('color')
    if isinstance(color, dict):
        RED = color.get('r', RED)
        GREEN = color.get('g', GREEN)
        BLUE = color.get('b', BLUE)

    if 'effect' in message:
        EFFECT = message['effect']

    # publish state
    client.publish(state_topic, construct_message())
# Flag polled by loop(); light_loop() sets it to True when it stops.
stop = False

# Create the MQTT client, attach both callbacks, then connect to the broker
# with a 60 second keepalive.
client = mqtt.Client()
client.on_message = on_message
client.on_connect = on_connect
client.connect(host, port, keepalive=60)
def loop():
    """Drive the MQTT network loop until ``stop`` is set, then exit.

    ``client.loop()`` does not reconnect by itself. When it reports an error
    this function asks the client to reconnect and pauses briefly, so that a
    dropped connection neither goes unnoticed nor turns into a busy loop.

    Raises:
        SystemExit: Once the module-level ``stop`` flag is true.
    """
    while True:
        rc = client.loop()
        if stop:
            sys.exit()
        if rc != mqtt.MQTT_ERR_SUCCESS:
            logging.warning("MQTT loop returned error code %s, reconnecting", rc)
            try:
                client.reconnect()
            except OSError as err:
                logging.error("MQTT reconnect failed: %s", err)
            # Back off before the next attempt.
            time.sleep(1)
def round_tuple(t):
    """Return a tuple holding each element of *t* rounded with ``round()``.

    Args:
        t: An iterable of numbers.

    Returns:
        A tuple of the rounded values, in the same order as *t*.
    """
    # round is passed directly; wrapping it in a lambda adds nothing.
    return tuple(map(round, t))
# Effects that are only run when led_type is 0. Each entry maps the effect
# name to: the name of the lighting function that renders one step, whether
# that function also takes the (r, g, b) colour before the brightness, and
# the pause in seconds after each step (0 means no pause).
_STRIP_ONLY_EFFECTS = {
    "ColoredLights": ("classic_colors", False, 0),
    "WipeRedToGreen": ("wipe_red_to_green", False, 0.05),
    "Rainbow": ("rainbow_cycle", False, 0.01),
    "Magic": ("magic_cycle", False, 0.05),
    "Fire": ("fire_cycle", False, 0.05),
    "FadeColorToWhite": ("fade_classic_to_white", False, 0.05),
    "FlashColorToWhite": ("flash_classic_to_white", False, 1),
    "Random": ("randomc", True, 0.3),
    "RandomColor": ("random_color", False, 0.3),
}


def _current_color():
    """Return the colour read back from the LEDs as an (r, g, b) sequence."""
    if led_type == 0:
        return lighting.pixels[0]
    return (lighting.pixels.red, lighting.pixels.green, lighting.pixels.blue)


def _target_color():
    """Return the requested colour scaled by BRIGHTNESS / 255 (unrounded)."""
    scale = BRIGHTNESS / 255
    return (RED * scale, GREEN * scale, BLUE * scale)


def light_loop():
    """Render the selected effect until an unsupported effect is requested.

    Each iteration looks at the module-level EFFECT / STATE / colour globals
    and performs one step of the matching effect. If a strip-only effect is
    selected while ``led_type`` is not 0, the problem is logged, the loop
    ends, and the module-level ``stop`` flag is set so the MQTT loop exits
    too. An effect name that is not recognised is ignored and the loop idles
    briefly instead of spinning.
    """
    global stop

    while True:
        # Read EFFECT once so a change made by the MQTT callback in the
        # middle of an iteration cannot make every comparison miss.
        effect = EFFECT

        if effect == "None":
            if STATE == "OFF":
                lighting.fade_all(_current_color(), (0, 0, 0))
            else:
                lighting.fade_all(round_tuple(_current_color()), round_tuple(_target_color()))
        elif effect == "Breathing":
            lighting.breath((RED, GREEN, BLUE), BRIGHTNESS)
            time.sleep(0.01)
        elif effect == "Colorloop":
            lighting.colorloop(BRIGHTNESS)
            time.sleep(0.03)
        elif effect in _STRIP_ONLY_EFFECTS:
            if led_type != 0:
                logging.critical("{} is not supported for led mode {}".format(effect, led_type))
                break
            func_name, wants_color, pause = _STRIP_ONLY_EFFECTS[effect]
            # Resolve the function at call time, as a direct attribute call would.
            step = getattr(lighting, func_name)
            if wants_color:
                step((RED, GREEN, BLUE), BRIGHTNESS)
            else:
                step(BRIGHTNESS)
            if pause:
                time.sleep(pause)
        else:
            # Unknown effect name: nothing to draw, so wait instead of
            # looping at full speed until a known effect is selected.
            time.sleep(0.05)

    # Reached only via break: ask the MQTT loop to shut the program down.
    stop = True
def construct_message():
    """Return the current light state as a JSON string.

    The message is serialised with ``json.dumps`` so that quotes or other
    special characters in EFFECT or STATE cannot produce invalid JSON. This
    function only builds the message; the callers publish the returned
    string themselves, so publishing here as well would send it twice.

    Returns:
        A JSON object string with the keys ``brightness``, ``color_mode``,
        ``color``, ``effect``, ``state`` and ``transition``.
    """
    return json.dumps({
        "brightness": BRIGHTNESS,
        "color_mode": "rgb",
        "color": {"r": RED, "g": GREEN, "b": BLUE},
        "effect": EFFECT,
        "state": STATE,
        "transition": 2,
    })
if __name__ == "__main__":
    try:
        # start light loop in the background
        light_thread = threading.Thread(target=light_loop, daemon=True)
        light_thread.start()
        # start mqtt loop; returns only by raising (e.g. SystemExit)
        loop()
    except Exception as e:
        logging.error("Program experienced Exception: %s", e)
    finally:
        # SystemExit (raised by loop()) and KeyboardInterrupt are not
        # subclasses of Exception, so the offline announcement lives in
        # `finally` to make sure it is attempted on every way out.
        try:
            client.publish(online_topic, "offline")
            client.disconnect()
        except Exception as e:
            logging.error("Program experienced Exception %s, while disconnecting", e)