-
Notifications
You must be signed in to change notification settings - Fork 161
/
lv_utils.py
163 lines (136 loc) · 4.96 KB
/
lv_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
##############################################################################
# Event Loop module: advancing tick count and scheduling lvgl task handler.
# Import after lvgl module.
# This should be imported and used by display driver.
# Display driver should first check if an event loop is already running.
#
# Usage example with SDL:
#
# SDL.init(auto_refresh=False)
# # Register SDL display driver.
# # Register SDL mouse driver
# event_loop = lv_utils.event_loop()
#
#
# uasyncio example with SDL:
#
# SDL.init(auto_refresh=False)
# # Register SDL display driver.
# # Register SDL mouse driver
# event_loop = lv_utils.event_loop(asynchronous=True)
# uasyncio.Loop.run_forever()
#
# uasyncio example with ili9341:
#
# event_loop = lv_utils.event_loop(asynchronous=True) # Optional!
# self.disp = ili9341(asynchronous=True)
# uasyncio.Loop.run_forever()
#
# MIT license; Copyright (c) 2021 Amir Gonnen
#
##############################################################################
import lvgl as lv
import micropython
import usys
# Try standard machine.Timer, or custom timer from lv_timer, if available.
# Only ImportError is handled: any other failure while importing is a real
# error and should not be mistaken for "module not present".
try:
    from machine import Timer
except ImportError:
    try:
        from lv_timer import Timer
    except ImportError:
        raise RuntimeError("Missing machine.Timer implementation!")

# Try to determine default timer id.
# stm32 ('pyboard') and rp2 only support SW timer -1; everything else uses 0.
default_timer_id = -1 if usys.platform in ('pyboard', 'rp2') else 0

# Try importing uasyncio, if available.
# uasyncio_available is checked by event_loop before starting in
# asynchronous mode.
try:
    import uasyncio
    uasyncio_available = True
except ImportError:
    uasyncio_available = False
##############################################################################
class event_loop():
    """Advance the lvgl tick count and run the lvgl task handler periodically.

    Two modes are supported:

    * Timer mode (default): a periodic Timer calls timer_cb, which advances
      the lvgl tick and uses micropython.schedule to run task_handler.
    * Asynchronous mode (asynchronous=True): two uasyncio tasks are created,
      async_timer (tick + wake-up) and async_refresh (task handler).

    Only one instance may be running at a time; it is tracked in the class
    attribute _current_instance.
    """

    # The running event loop instance, or None when no loop is running.
    _current_instance = None

    def __init__(self, freq=25, timer_id=default_timer_id, max_scheduled=2, refresh_cb=None, asynchronous=False, exception_sink=None):
        """Create and start the event loop.

        freq           -- refresh frequency in Hz; the period is 1000 // freq ms.
        timer_id       -- id passed to Timer() in timer mode.
        max_scheduled  -- upper bound on task_handler calls pending in the
                          micropython.schedule queue.
        refresh_cb     -- optional callable invoked after each lv.task_handler().
        asynchronous   -- use uasyncio tasks instead of a Timer.
        exception_sink -- callable receiving exceptions raised while refreshing;
                          defaults to default_exception_sink.

        Raises RuntimeError if a loop is already running, or if asynchronous
        mode is requested while uasyncio is not available.
        """
        if self.is_running():
            raise RuntimeError("Event loop is already running!")

        # Validate before registering the instance, so that a failed
        # construction does not block every later attempt with
        # "Event loop is already running!".
        if asynchronous and not uasyncio_available:
            raise RuntimeError("Cannot run asynchronous event loop. uasyncio is not available!")

        if not lv.is_initialized():
            lv.init()

        self.delay = 1000 // freq
        self.refresh_cb = refresh_cb
        self.exception_sink = exception_sink if exception_sink else self.default_exception_sink
        self.asynchronous = asynchronous

        # Must exist before the timer is started: timer_cb reads both, and
        # disable()/enable() use them regardless of the mode.
        self.max_scheduled = max_scheduled
        self.scheduled = 0

        event_loop._current_instance = self
        try:
            if self.asynchronous:
                self.refresh_event = uasyncio.Event()
                self.refresh_task = uasyncio.create_task(self.async_refresh())
                self.timer_task = uasyncio.create_task(self.async_timer())
            else:
                self.timer = Timer(timer_id)
                self.task_handler_ref = self.task_handler  # Allocation occurs here
                self.timer.init(mode=Timer.PERIODIC, period=self.delay, callback=self.timer_cb)
        except Exception:
            # Starting failed: undo the registration and report the error.
            event_loop._current_instance = None
            raise

    def deinit(self):
        """Stop the event loop and unregister it as the current instance."""
        if self.asynchronous:
            # Cancel the tick source first, so the loop stops even when the
            # refresh task cannot be cancelled. cancel() raises RuntimeError
            # when called from inside the task being cancelled (for example
            # when the exception sink runs within async_refresh); in that
            # case async_refresh exits by itself once it sees that this
            # instance is no longer registered.
            for task in (self.timer_task, self.refresh_task):
                try:
                    task.cancel()
                except RuntimeError:
                    pass
        else:
            self.timer.deinit()
        # Do not clobber the registration of a different, running instance.
        if event_loop._current_instance is self:
            event_loop._current_instance = None

    def disable(self):
        """Suspend scheduling of task_handler from timer_cb.

        Raises the pending counter by max_scheduled so that the
        'scheduled < max_scheduled' test in timer_cb fails.
        """
        self.scheduled += self.max_scheduled

    def enable(self):
        """Undo a previous disable()."""
        self.scheduled -= self.max_scheduled

    @staticmethod
    def is_running():
        """Return True if an event loop instance is currently registered."""
        return event_loop._current_instance is not None

    @staticmethod
    def current_instance():
        """Return the registered event loop instance, or None."""
        return event_loop._current_instance

    def task_handler(self, _):
        """Run one lvgl refresh; invoked through micropython.schedule.

        The refresh is skipped while lv._nesting.value is non-zero.
        Exceptions are passed to exception_sink.
        """
        try:
            if lv._nesting.value == 0:
                lv.task_handler()
                if self.refresh_cb:
                    self.refresh_cb()
        except Exception as e:
            if self.exception_sink:
                self.exception_sink(e)
        finally:
            # Always release the slot taken by timer_cb. Otherwise every
            # exception leaks one slot and scheduling stalls for good after
            # max_scheduled errors.
            self.scheduled -= 1

    def timer_cb(self, t):
        """Timer callback: advance the lvgl tick and schedule task_handler."""
        # Can be called in Interrupt context
        # Use task_handler_ref since passing self.task_handler would cause allocation.
        lv.tick_inc(self.delay)
        if self.scheduled < self.max_scheduled:
            try:
                micropython.schedule(self.task_handler_ref, 0)
                self.scheduled += 1
            except RuntimeError:
                # The schedule queue is full: skip this refresh, the next
                # timer tick tries again.
                pass

    async def async_refresh(self):
        """uasyncio task: run the lvgl task handler whenever refresh_event is set."""
        while True:
            await self.refresh_event.wait()
            if lv._nesting.value == 0:
                self.refresh_event.clear()
                try:
                    lv.task_handler()
                    # Kept inside the try so that a failing refresh_cb is
                    # reported through exception_sink, as in task_handler.
                    if self.refresh_cb:
                        self.refresh_cb()
                except Exception as e:
                    if self.exception_sink:
                        self.exception_sink(e)
            # deinit() may have been called from within this task (by the
            # exception sink or by a callback), where the task cannot cancel
            # itself; stop here instead.
            if event_loop._current_instance is not self:
                return

    async def async_timer(self):
        """uasyncio task: advance the lvgl tick every self.delay ms and wake async_refresh."""
        while True:
            await uasyncio.sleep_ms(self.delay)
            lv.tick_inc(self.delay)
            self.refresh_event.set()

    def default_exception_sink(self, e):
        """Print the exception and stop this event loop."""
        usys.print_exception(e)
        # Use self rather than current_instance(), which is None once the
        # loop has been unregistered.
        self.deinit()