Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools/upload_file.py: turn into a class #270

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 86 additions & 88 deletions tools/upload_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,108 +13,106 @@
from bleak.uuids import register_uuids


UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d"
DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d"
DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d"

register_uuids({
DATA_SERVICE_UUID: "Monocle Raw Serivce",
DATA_TX_CHAR_UUID: "Monocle Raw RX",
DATA_RX_CHAR_UUID: "Monocle Raw TX",
})

repl_rx_event = asyncio.Event()
repl_rx_last = None
data_rx_event = asyncio.Event()
repl_rx_last = None

async def upload_file(file):

def match_repl_uuid(device: BLEDevice, adv: AdvertisementData):
class MonocleScript:
UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

DATA_SERVICE_UUID = "e5700001-7bac-429a-b4ce-57ff900f479d"
DATA_RX_CHAR_UUID = "e5700002-7bac-429a-b4ce-57ff900f479d"
DATA_TX_CHAR_UUID = "e5700003-7bac-429a-b4ce-57ff900f479d"

def __init__(self):
register_uuids({
self.DATA_SERVICE_UUID: "Monocle Data Serivce",
self.DATA_TX_CHAR_UUID: "Monocle Data RX",
self.DATA_RX_CHAR_UUID: "Monocle Data TX",
})

@classmethod
async def run(cls, *args):
self = cls()

device = await BleakScanner.find_device_by_filter(self.match_uart_uuid)
if device is None:
print("no matching device found\n")
exit(1)
print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=self.handle_disconnect) as c:
self.client = c
await self.init_uart_service()
await self.init_data_service()
await self.set_monocle_raw_mode()
await self.script(*args)
await self.client.write_gatt_char(self.uart_rx_char, b"\x02")

def match_uart_uuid(self, device:BLEDevice, adv:AdvertisementData):
print(f"uuids={adv.service_uuids}")
return UART_SERVICE_UUID.lower() in adv.service_uuids
return self.UART_SERVICE_UUID.lower() in adv.service_uuids

def handle_disconnect(_: BleakClient):
def handle_disconnect(self, _:BleakClient):
print("\nDevice was disconnected.")
for task in asyncio.all_tasks():
task.cancel()
exit(1)

def handle_repl_tx(_: BleakGATTCharacteristic, data: bytearray):
def handle_uart_tx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `print()`
global repl_rx_last
repl_rx_last = data
repl_rx_event.set()
self.uart_rx_last = data
self.uart_rx_event.set()

def handle_data_tx(_: BleakGATTCharacteristic, data: bytearray):
def handle_data_tx(self, _:BleakGATTCharacteristic, data:bytearray):
# Here, handle data sent by the Monocle with `bluetooth.send()`
global data_rx_last
data_rx_last = data
data_rx_event.set()

device = await BleakScanner.find_device_by_filter(match_repl_uuid)
if device is None:
print("no matching device found\n")
exit(1)

async def send_command(client, cmd):
global repl_rx_event
global repl_rx_last

await client.write_gatt_char(repl_rx_char, cmd)
await repl_rx_event.wait()
repl_rx_event.clear()
print(repl_rx_last)
assert repl_rx_last[0:2] == b'OK'

print("Connected to the Monocle")

async with BleakClient(device, disconnected_callback=handle_disconnect) as client:
await client.start_notify(UART_TX_CHAR_UUID, handle_repl_tx)
await client.start_notify(DATA_TX_CHAR_UUID, handle_data_tx)

repl_service = client.services.get_service(UART_SERVICE_UUID)
data_service = client.services.get_service(DATA_SERVICE_UUID)
repl_rx_char = repl_service.get_characteristic(UART_RX_CHAR_UUID)
data_rx_char = data_service.get_characteristic(DATA_RX_CHAR_UUID)

# Example application: upload a file to the Monocle
loop = asyncio.get_running_loop()

global repl_rx_event
global repl_rx_last

print(">>> switching the terminal to raw mode")
await client.write_gatt_char(repl_rx_char, b'\x01')
await repl_rx_event.wait()
await asyncio.sleep(1)
repl_rx_event.clear()
print(repl_rx_last)
self.data_rx_last = data
self.data_rx_event.set()

async def send_command(self, cmd):
await self.client.write_gatt_char(self.uart_rx_char, cmd.encode('ascii') + b'\x04')
await self.uart_rx_event.wait()
self.uart_rx_event.clear()
print(self.uart_rx_last)
assert self.uart_rx_last[0:2] == b'OK'

async def init_uart_service(self):
await self.client.start_notify(self.UART_TX_CHAR_UUID, self.handle_uart_tx)
uart_service = self.client.services.get_service(self.UART_SERVICE_UUID)
self.uart_rx_char = uart_service.get_characteristic(self.UART_RX_CHAR_UUID)
self.uart_rx_event = asyncio.Event()
self.uart_rx_last = None

async def init_data_service(self):
await self.client.start_notify(self.DATA_TX_CHAR_UUID, self.handle_data_tx)
data_service = self.client.services.get_service(self.DATA_SERVICE_UUID)
self.data_rx_char = data_service.get_characteristic(self.DATA_RX_CHAR_UUID)
self.data_rx_event = asyncio.Event()
self.data_rx_last = None

async def set_monocle_raw_mode(self):
await self.client.write_gatt_char(self.uart_rx_char, b'\x01')
await self.uart_rx_event.wait()
await asyncio.sleep(0.5)
self.uart_rx_event.clear()


class UploadFileScript(MonocleScript):
"""
Example application: upload a file to the Monocle
"""
async def script(self, file):

print(">>> opening a file to write to")
await send_command(client, ''f"f = open('{file}', 'wb')\x04".encode("ascii"))
await self.send_command(f"f = open('{file}', 'wb')")

print(">>> writing the data to the file")
with open(file, 'rb') as f:
size = repl_rx_char.max_write_without_response_size - len("f.write(r'''''')\x04")
while data := f.read(size):
sep = b"'''" if b'"""' in data else b'"""'
await send_command(client, b"f.write(r" + sep + data + sep + b")\x04")
while data := f.read(100):
print(data)
await self.send_command(f"f.write({bytes(data).__repr__()})")

print(">>> closing the file")
await send_command(client, b"f.close()\x04")

print(">>> switch back to friendly mode")
await client.write_gatt_char(repl_rx_char, b"\x02")
await self.send_command("f.close()")

print(">>> script done")

print(">>> upload done!")

if __name__ == "__main__":
file = sys.argv[1]
try:
asyncio.run(upload_file(file))
except asyncio.CancelledError:
pass
asyncio.run(UploadFileScript.run(sys.argv[1]))