-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pairing.py
138 lines (121 loc) · 5.54 KB
/
pairing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# The MIT License (MIT)
# Copyright (c) 2023 Edgaras Janušauskas and Inovatorius MB (www.fildz.com)
################################################################################
# FILDZ CYBEROS PAIRING
#
# Pairing mechanism for cyberwares.
# TODO:
# 1. The pairing mechanism is unreliable: if one cyberware sends its last pairing request and the second cyberware
# sends its pairing request after that, the first cyberware receives a request while the second one does not.
import uasyncio as asyncio
from uasyncio import Event
import ubinascii
import fildz_cyberos as cyberos
# Pairing service for cyberwares.
#
# Creates four background tasks on construction:
#   _event_pairing_mode - watches the power button and opens/closes the pairing window;
#   _event_pair         - sends pairing requests while the pairing window is open;
#   _event_pairing      - stores the sender of a pairing request received while the window is open;
#   _push               - publishes the 'on_pairing' event in this cyberware's event table.
class Pairing:
    def __init__(self):
        self._on_pair = Event()     # Set while pairing mode is active.
        self._on_pairing = Event()  # Waited on by _event_pairing; published through _push().
        self._on_paired = Event()   # Set and immediately cleared after a cyberware has been stored.

        asyncio.create_task(self._event_pair())
        asyncio.create_task(self._event_pairing())
        asyncio.create_task(self._event_pairing_mode())

        # Events.
        asyncio.create_task(self._push())

    ################################################################################
    # Events
    #
    @property
    def on_pair(self):
        return self._on_pair

    @property
    def on_pairing(self):
        return self._on_pairing

    @property
    def on_paired(self):
        return self._on_paired

    ################################################################################
    # Tasks
    #
    # Pairing mode.
    #
    # Waits for the power button's on_click, then for on_down within the button's double-click time, then checks
    # whether on_up arrives within 3 seconds. If it does not, pairing mode is started: on_pair is set for
    # 3 seconds and the network is switched so that the AP is up; the previous network state is restored afterwards.
    async def _event_pairing_mode(self):
        while True:
            button = cyberos.cyberware.power_button
            network = cyberos.network

            await button.on_click.wait()
            try:
                await asyncio.wait_for_ms(button.on_down.wait(), button._double_click_ms)
            except asyncio.TimeoutError:
                # No second press in time: start over and wait for the next click.
                continue

            try:
                await asyncio.wait_for(button.on_up.wait(), 3)
            except asyncio.TimeoutError:
                # The button was not released within 3 seconds: enter pairing mode.
                print('CYBEROS > Pairing mode started')
                self._on_pair.set()

                # Remember what has to be restored once pairing mode ends.
                sta_reconnect = network.on_sta_connected.is_set()
                ap_disable = not network.on_ap_active.is_set()

                if sta_reconnect:
                    await network.disconnect()
                    network.on_ap_up.set()
                elif ap_disable:
                    network.on_ap_up.set()
                await asyncio.sleep(0)  # Wait for all the events to complete.

                # Pairing mode stays active for 3 seconds.
                await asyncio.sleep(3)
                self._on_pair.clear()
                print('CYBEROS > Pairing mode timeout')

                if sta_reconnect:
                    await network.connect(network.sta_ssid, network.sta_key)
                if ap_disable:
                    network.on_ap_down.set()

                # Do not start watching for a new click while the button is still held.
                if button.on_hold.is_set():
                    await button.on_up.wait()

    # Send pairing request.
    #
    # Once pairing mode is active and the AP is active, encodes an 'on_pairing' event carrying this cyberware's
    # private MAC and the AP channel (one byte), then sends it to the public MAC once per loop pass for as long
    # as both conditions hold.
    async def _event_pair(self):
        while True:
            await self._on_pair.wait()
            await cyberos.network.on_ap_active.wait()

            channel = cyberos.network.ap_ch.to_bytes(1, 'little')
            event = await cyberos.event.encode('on_pairing', (cyberos.cyberware.mac_private, channel))

            # Send pairing request every second.
            while self._on_pair.is_set() and cyberos.network.on_ap_active.is_set():
                print('CYBEROS > Pairing mode')
                await cyberos.espnow.asend(cyberos.cyberware.mac_public, event, sync=False)
                await cyberos.cyberware.buzzer.play(4)
                await asyncio.sleep(1)

    # Received a new pairing request.
    #
    # Requests are ignored unless pairing mode is active, and ignored when the sender already has a 'mac' entry.
    # Otherwise the sender's MAC and channel are stored under cyberos.cyberwares['subscribed'], on_paired is
    # set and cleared, and saving of the cyberwares is requested.
    async def _event_pairing(self):
        while True:
            await self._on_pairing.wait()

            # Pairing mode is active?
            if not self._on_pair.is_set():
                self._on_pairing.clear()
                continue

            sender = cyberos.event.sender
            subscribed = cyberos.cyberwares['subscribed']

            # Are we paired with the cyberware?
            if sender in subscribed and 'mac' in subscribed[sender]:
                self._on_pairing.clear()
                continue

            # NOTE(review): args[0] (MAC) and args[1] (channel byte) are indexed without validation; presumably
            # cyberos.event guarantees this shape for 'on_pairing' - confirm.
            args = cyberos.event.args
            peer = {
                'mac': args[0],
                'mac_str': ubinascii.hexlify(args[0], ':').decode().upper(),
                'ch': int.from_bytes(args[1], 'little'),
            }
            if sender in subscribed:
                # We are not paired, but subscribed to cyberware events.
                subscribed[sender].update(peer)
            else:
                peer['events'] = {}
                subscribed[sender] = peer

            print('CYBEROS > Paired with', sender)
            self._on_paired.set()
            cyberos.settings.on_save_cyberwares.set()
            self._on_pairing.clear()
            self._on_paired.clear()
            await cyberos.cyberware.buzzer.play(2)

    # Publish the 'on_pairing' event in this cyberware's own event table (keyed by the AP SSID).
    async def _push(self):
        cyberos.cyberwares[cyberos.network.ap_ssid]['events'].update(
            {
                'on_pairing': self._on_pairing,
            })