forked from AceCentre/RelayKeys
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserial_wrappers.py
85 lines (64 loc) · 2.2 KB
/
serial_wrappers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import asyncio
import time
class BLESerialWrapper(object):
    """Serial-port-like adapter on top of a BLE UART connection.

    Incoming notification payloads are accumulated in ``receive_buff`` and
    handed out through ``readline`` / ``read_all``; outgoing data is written
    to the RX characteristic.  ``write`` and ``readline`` are coroutines and
    must be awaited.

    ``ble_client`` must provide awaitable ``start_notify(uuid, callback)`` and
    ``write_gatt_char(uuid, data)`` methods (presumably a bleak
    ``BleakClient`` -- confirm against the caller).
    """

    # UUIDs of the UART service and its characteristics (these values match
    # the Nordic UART Service).  RX is written by this side, TX is the
    # characteristic this side subscribes to for notifications.
    UART_SERVICE_UUID = "6E400001-B5A3-F393-E0A9-E50E24DCCA9E"
    UART_RX_CHAR_UUID = "6E400002-B5A3-F393-E0A9-E50E24DCCA9E"
    UART_TX_CHAR_UUID = "6E400003-B5A3-F393-E0A9-E50E24DCCA9E"

    def __init__(self, ble_client):
        """Store the client and start with an empty receive buffer."""
        self.ble_client = ble_client
        self.receive_buff = b''
        # Seconds ``readline`` waits for a complete line before giving up.
        self.timeout_val = 3

    async def init_receive(self):
        """Subscribe ``receive_data`` to notifications on the TX characteristic."""
        await self.ble_client.start_notify(
            BLESerialWrapper.UART_TX_CHAR_UUID, self.receive_data)

    def __enter__(self):
        """Return the wrapper itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, a, b, c):
        """Do nothing on exit; exceptions are not suppressed."""
        pass

    def flushInput(self):
        """Discard everything received so far."""
        self.receive_buff = b''

    def flushOutput(self):
        """No-op: nothing is buffered on the output side by this wrapper."""
        pass

    async def write(self, data):
        """Send ``data`` by writing it to the RX characteristic."""
        await self.ble_client.write_gatt_char(
            BLESerialWrapper.UART_RX_CHAR_UUID, data)

    def receive_data(self, _: int, data: bytearray):
        """Notification callback: append the received payload to the buffer.

        The first argument (passed by the notification source) is ignored.
        """
        self.receive_buff += data

    def read_all(self):
        """Return all buffered bytes and leave the buffer empty."""
        ret_buffer = self.receive_buff
        self.receive_buff = b''
        return ret_buffer

    async def readline(self):
        """Return the next newline-terminated line from the receive buffer.

        The returned line includes the trailing ``\\n`` and is removed from
        the buffer.  If no complete line is available, the buffer is polled
        every 50 ms; once more than ``timeout_val`` seconds have elapsed,
        ``b''`` is returned and any partial data stays in the buffer.
        """
        # Monotonic clock: a wall-clock adjustment must not shorten or
        # stretch the timeout.
        started = time.monotonic()
        while True:
            # Search the *current* buffer on every pass instead of keeping a
            # scan index across the await: flushInput()/read_all() may shrink
            # or replace the buffer while this coroutine is sleeping, which
            # would leave a stale index pointing past the end of the data.
            newline_at = self.receive_buff.find(b'\n')
            if newline_at != -1:
                line = self.receive_buff[:newline_at + 1]
                self.receive_buff = self.receive_buff[newline_at + 1:]
                return line

            # No complete line yet: yield to the event loop so notifications
            # can be delivered, then check whether we ran out of time.
            await asyncio.sleep(0.05)
            if (time.monotonic() - started) > self.timeout_val:
                return b''
class DummySerial:
    """Stand-in for a serial port that never touches real hardware.

    Constructor arguments are accepted and ignored, writes are echoed to
    standard output, and reads return fixed canned responses.
    """

    # Canned replies handed back by the read methods.
    _CANNED_LINE = b"OK\n"
    _CANNED_DUMP = b"Dummy1\nDummy2\nDummy3\n"

    def __init__(self, devpath, baud, **kwargs):
        """Accept a device path, baud rate and extra options; use none of them."""

    def __enter__(self):
        """Support ``with DummySerial(...) as port`` by yielding the instance."""
        return self

    def __exit__(self, a, b, c):
        """Nothing to release; any exception propagates unchanged."""
        return None

    def write(self, data):
        """Print ``data`` instead of sending it anywhere."""
        rendered = "{}".format(data)
        print(rendered)

    def flushInput(self):
        """No input buffer exists, so there is nothing to flush."""

    def flushOutput(self):
        """No output buffer exists, so there is nothing to flush."""

    def readline(self):
        """Always answer with a single ``OK`` line."""
        return self._CANNED_LINE

    def read_all(self):
        """Always answer with the same three dummy lines."""
        return self._CANNED_DUMP