forked from JohnMarzulli/categorical-sectional
-
Notifications
You must be signed in to change notification settings - Fork 0
/
controller.py
270 lines (204 loc) · 6.73 KB
/
controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
# Live Sectional Map controller
# Dylan Rush 2017
# Additional modifications:
# 2018-2020, John Marzulli
# dylanhrush.com
# Uses RPi.GPIO library: https://sourceforge.net/p/raspberry-gpio-python/wiki/BasicUsage/
# Free for personal use. Prohibited for commercial without consent
#
# pip install Adafruit-GPIO
# pip install RPi-GPIO
# pip install pytest
# pip install Adafruit_WS2801
#
# Raspberry Pi
# Run 'raspi-config' and enable the SPI bus under Advanced
#
# Wiring the WS2801 :
# https://learn.adafruit.com/12mm-led-pixels/wiring
# https://tutorials-raspberrypi.com/how-to-control-a-raspberry-pi-ws2801-rgb-led-strip/
# Blue -> 5V Minus AND Pi GND (Physical Pi 25)
# Red -> 5V Plus
# Yellow -> Pin 19(Physical)/SPI MOSI
# Green -> Pin 23(Physical)/SCLK/SPI
#
import threading
import time
from datetime import datetime
import lib.colors as colors_lib
import lib.local_debug as local_debug
import renderer
from configuration import configuration, configuration_server
from data_sources import weather
from lib import colors as colors_lib
from lib import logger, safe_logging
from lib.recurring_task import RecurringTask
from visualizers import visualizers
thread_lock_object = threading.Lock()
if not local_debug.is_debug():
import RPi.GPIO as GPIO
try:
GPIO.setmode(GPIO.BOARD)
except Exception:
# ws281x causes an exception
# when you try to set the board type
pass
stations = configuration.get_airport_configs()
rgb_colors = colors_lib.get_colors()
renderer = renderer.get_renderer()
def update_weather_for_all_stations():
"""
Updates the weather for all of the stations.
This does not update the conditions or category.
"""
weather.get_metars(stations.keys())
def __get_dimmed_color__(
starting_color: list
) -> list:
"""
Given a starting color, get the version that is dimmed.
Arguments:
starting_color {list} -- The starting color that will be dimmed.
Returns:
list -- The color with the dimming adjustment.
"""
dimmed_color = []
brightness_adjustment = configuration.get_brightness_proportion()
for color in starting_color:
reduced_color = float(color) * brightness_adjustment
# Some colors are floats, some are integers.
# Make sure we keep everything the same.
if isinstance(color, int):
reduced_color = int(reduced_color)
dimmed_color.append(reduced_color)
return dimmed_color
def all_stations(
color: list
):
"""
Sets all of the airports to the given color
Arguments:
color {triple} -- Three integer tuple(triple?) of the RGB values
of the color to set for ALL airports.
"""
[renderer.set_leds(stations[station], rgb_colors[color])
for station in stations]
renderer.show()
def __all_leds_to_color__(
color: list
):
renderer.set_all(color)
def get_station_by_led(
index: int
) -> str:
"""
Given an LED, find the station it is representing.
Args:
index (int): [description]
Returns:
str: The identifier of the station.
"""
for station_identifier in stations.keys():
if index in stations[station_identifier]:
return station_identifier
return "UNK"
def render_thread():
"""
Main logic loop for rendering the lights.
"""
safe_logging.safe_log("Starting rendering thread")
tic = time.perf_counter()
toc = time.perf_counter()
debug_pixels_timer = None
loaded_visualizers = visualizers.VisualizerManager.initialize_visualizers(
renderer,
stations)
last_visualizer = 0
while True:
try:
delta_time = toc - tic
tic = time.perf_counter()
visualizer_index = configuration.get_visualizer_index(
loaded_visualizers)
if visualizer_index != last_visualizer:
renderer.clear()
last_visualizer = visualizer_index
loaded_visualizers[visualizer_index].update(delta_time)
show_debug_pixels = debug_pixels_timer is None or (
datetime.utcnow() - debug_pixels_timer).total_seconds() > 60.0
if show_debug_pixels:
for index in range(renderer.pixel_count):
station = get_station_by_led(index)
safe_logging.safe_log('[{}/{}]={}'.format(
station,
index,
renderer.pixels[index]))
debug_pixels_timer = datetime.utcnow()
toc = time.perf_counter()
except KeyboardInterrupt:
quit()
except Exception as ex:
safe_logging.safe_log(ex)
def wait_for_all_stations():
"""
Waits for all of the airports to have been given a chance to initialize.
If an airport had an error, then that still counts.
"""
for airport in stations:
try:
weather.get_metar(airport)
except Exception as ex:
safe_logging.safe_log_warning(
"Error while initializing with airport={}, EX={}".format(airport, ex))
return True
def __get_test_cycle_colors__() -> list:
base_colors_test = [
colors_lib.MAGENTA,
colors_lib.RED,
colors_lib.BLUE,
colors_lib.GREEN,
colors_lib.YELLOW,
colors_lib.WHITE,
colors_lib.GRAY,
colors_lib.DARK_YELLOW
]
colors_to_init = []
for color in base_colors_test:
is_global_dimming = configuration.get_brightness_proportion() < 1.0
color_to_cycle = rgb_colors[color]
colors_to_init.append(color_to_cycle)
if is_global_dimming:
colors_to_init.append(__get_dimmed_color__(color_to_cycle))
colors_to_init.append(rgb_colors[colors_lib.OFF])
return colors_to_init
def __test_all_leds__():
"""
Test all of the LEDs, independent of the configuration
to make sure the wiring is correct and that none have failed.
"""
for color in __get_test_cycle_colors__():
safe_logging.safe_log("Setting to {}".format(color))
__all_leds_to_color__(color)
time.sleep(0.5)
if __name__ == '__main__':
# Start loading the METARs in the background
# while going through the self-test
safe_logging.safe_log("Initialize weather for all airports")
weather.get_metars(stations.keys())
__test_all_leds__()
web_server = configuration_server.WeatherMapServer()
all_stations(weather.OFF)
RecurringTask(
"rest_host",
0.1,
web_server.run,
logger.LOGGER,
True)
wait_for_all_stations()
while True:
try:
render_thread()
except KeyboardInterrupt:
break
if not local_debug.is_debug():
GPIO.cleanup()