Skip to content

Commit

Permalink
add typing, better async event handling, consts for magic numbers, an…
Browse files Browse the repository at this point in the history
…d other review feedback
  • Loading branch information
OkGoDoIt committed Jul 24, 2024
1 parent cab1647 commit e5f4c65
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 119 deletions.
188 changes: 93 additions & 95 deletions src/frameutils/bluetooth.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,49 @@
import asyncio
from typing import Optional
from typing import Optional, Callable, List, Tuple, Dict, Any

from bleak import BleakClient, BleakScanner, BleakError

_FRAME_DATA_PREFIX = 1
_FRAME_LONG_TEXT_PREFIX = 10
_FRAME_LONG_TEXT_END_PREFIX = 11
_FRAME_LONG_DATA_PREFIX = 1
_FRAME_LONG_DATA_END_PREFIX = 2

class Bluetooth:
"""
Frame bluetooth class for managing a connection and transferring data to and
from the device.
"""

_SERVICE_UUID = "7a230001-5475-a6a4-654c-8431f6ad49c4"
_TX_CHARACTERISTIC_UUID = "7a230002-5475-a6a4-654c-8431f6ad49c4"
_RX_CHARACTERISTIC_UUID = "7a230003-5475-a6a4-654c-8431f6ad49c4"

_SERVICE_UUID: str = "7a230001-5475-a6a4-654c-8431f6ad49c4"
_TX_CHARACTERISTIC_UUID: str = "7a230002-5475-a6a4-654c-8431f6ad49c4"
_RX_CHARACTERISTIC_UUID: str = "7a230003-5475-a6a4-654c-8431f6ad49c4"
def __init__(self):
self._awaiting_print_response = False
self._awaiting_data_response = False
self._client = None
self._print_response = ""
self._ongoing_print_response = None
self._ongoing_print_response_chunk_count = None
self._ongoing_data_response = None
self._ongoing_data_response_chunk_count = None
self._data_response = bytearray()
self._tx_characteristic = None
self._user_data_response_handler = lambda: None
self._user_disconnect_handler = lambda: None
self._user_print_response_handler = lambda: None
self._print_debugging = False
self._default_timeout = 10.0

def _disconnect_handler(self, _):
self._client: Optional[BleakClient] = None
self._print_response: str = ""
self._ongoing_print_response: Optional[bytearray] = None
self._ongoing_print_response_chunk_count: Optional[int] = None
self._ongoing_data_response: Optional[bytearray] = None
self._ongoing_data_response_chunk_count: Optional[int] = None
self._data_response: bytearray = bytearray()
self._tx_characteristic: Optional[Any] = None
self._user_data_response_handler: Callable[[bytes], None] = lambda _: None
self._user_disconnect_handler: Callable[[], None] = lambda: None
self._user_print_response_handler: Callable[[str], None] = lambda _: None
self._print_debugging: bool = False
self._default_timeout: float = 10.0
self._data_response_event: asyncio.Event = asyncio.Event()
self._print_response_event: asyncio.Event = asyncio.Event()
self._max_receive_buffer: int = 10 * 1024 * 1024

def _disconnect_handler(self, _: Any) -> None:
self._user_disconnect_handler()
self.__init__()

async def _notification_handler(self, _, data):

if data[0] == 10:
# start of long printed data
async def _notification_handler(self, _: Any, data: bytearray) -> None:
if data[0] == _FRAME_LONG_TEXT_PREFIX:
# start of long printed data from prntLng() function
if self._ongoing_print_response is None or self._ongoing_print_response_chunk_count is None:
self._ongoing_print_response = bytearray()
self._ongoing_print_response_chunk_count = 0
Expand All @@ -48,25 +53,28 @@ async def _notification_handler(self, _, data):
self._ongoing_print_response_chunk_count += 1
if self._print_debugging:
print(f"received chunk #{self._ongoing_print_response_chunk_count}: "+data[1:].decode())
elif data[0] == 11:
# end of long printed data
total_expected_chunk_count_as_string = data[1:].decode()
if len(self._ongoing_print_response) > self._max_receive_buffer:
raise Exception(f"buffered received long printed text is more than {self._max_receive_buffer} bytes")

elif data[0] == _FRAME_LONG_TEXT_END_PREFIX:
# end of long printed data from prntLng() function
total_expected_chunk_count_as_string: str = data[1:].decode()
if len(total_expected_chunk_count_as_string) > 0:
total_expected_chunk_count = int(total_expected_chunk_count_as_string)
total_expected_chunk_count: int = int(total_expected_chunk_count_as_string)
if self._print_debugging:
print(f"received final chunk count: {total_expected_chunk_count}")
if self._ongoing_print_response_chunk_count != total_expected_chunk_count:
raise Exception(f"chunk count mismatch in long received data (expected {total_expected_chunk_count}, got {self._ongoing_print_response_chunk_count})")
self._awaiting_print_response = False
self._print_response = self._ongoing_print_response.decode()
self._print_response_event.set()
self._ongoing_print_response = None
self._ongoing_print_response_chunk_count = None
if self._print_debugging:
print("finished receiving long printed data: "+self._print_response)
self._user_print_response_handler(self._print_response)

elif data[0] == 1 and data[1] == 1:
# start of long raw data
elif data[0] == _FRAME_DATA_PREFIX and data[1] == _FRAME_LONG_DATA_PREFIX:
# start of long raw data from frame.bluetooth.send("\001"..data)
if self._ongoing_data_response is None or self._ongoing_data_response_chunk_count is None:
self._ongoing_data_response = bytearray()
self._ongoing_data_response_chunk_count = 0
Expand All @@ -77,17 +85,20 @@ async def _notification_handler(self, _, data):
self._ongoing_data_response_chunk_count += 1
if self._print_debugging:
print(f"received data chunk #{self._ongoing_data_response_chunk_count}: {len(data[2:])} bytes")
elif data[0] == 1 and data[1] == 2:
# end of long raw data
total_expected_chunk_count_as_string = data[2:].decode()
if len(self._ongoing_data_response) > self._max_receive_buffer:
raise Exception(f"buffered received long raw data is more than {self._max_receive_buffer} bytes")

elif data[0] == _FRAME_DATA_PREFIX and data[1] == _FRAME_LONG_DATA_END_PREFIX:
# end of long raw data from frame.bluetooth.send("\002"..chunkCount)
total_expected_chunk_count_as_string: str = data[2:].decode()
if len(total_expected_chunk_count_as_string) > 0:
total_expected_chunk_count = int(total_expected_chunk_count_as_string)
total_expected_chunk_count: int = int(total_expected_chunk_count_as_string)
if self._print_debugging:
print(f"received final data chunk count: {total_expected_chunk_count}")
if self._ongoing_data_response_chunk_count != total_expected_chunk_count:
raise Exception(f"chunk count mismatch in long received data (expected {total_expected_chunk_count}, got {self._ongoing_data_response_chunk_count})")
self._awaiting_data_response = False
self._data_response = self._ongoing_data_response
self._data_response_event.set()
self._ongoing_data_response = None
self._ongoing_data_response_chunk_count = None
if self._print_debugging:
Expand All @@ -96,25 +107,27 @@ async def _notification_handler(self, _, data):
else:
print(f"finished receiving long raw data: {len(self._data_response)} bytes)")
self._user_data_response_handler(self._data_response)
elif data[0] == 1:

elif data[0] == _FRAME_DATA_PREFIX:
# received single chunk raw data from frame.bluetooth.send(data)
if self._print_debugging:
print(f"received data: {len(data[1:])} bytes")
if self._awaiting_data_response:
self._awaiting_data_response = False
self._data_response = data[1:]
self._data_response = data[1:]
self._data_response_event.set()
self._user_data_response_handler(data[1:])

else:
if self._awaiting_print_response:
self._awaiting_print_response = False
self._print_response = data.decode()
# received single chunk printed text from print()
self._print_response = data.decode()
self._print_response_event.set()
self._user_print_response_handler(data.decode())

async def connect(
self,
print_response_handler=lambda _: None,
data_response_handler=lambda _: None,
disconnect_handler=lambda: None,
):
print_response_handler: Callable[[str], None] = lambda _: None,
data_response_handler: Callable[[bytes], None] = lambda _: None,
disconnect_handler: Callable[[], None] = lambda: None,
) -> None:
"""
Connects to the nearest Frame device.
Expand All @@ -129,20 +142,20 @@ async def connect(
self._user_data_response_handler = data_response_handler

# returns list of (BLEDevice, AdvertisementData)
devices = await BleakScanner.discover(3, return_adv=True)
devices: Dict[str, Tuple[Any, Any]] = await BleakScanner.discover(3, return_adv=True)

filtered_list = []
filtered_list: List[Tuple[Any, Any]] = []
for d in devices.values():
if self._SERVICE_UUID in d[1].service_uuids:
filtered_list.append(d)

# connect to closest device
filtered_list.sort(key=lambda x: x[1].rssi, reverse=True)
try:
device = filtered_list[0][0]
device: Any = filtered_list[0][0]

except IndexError:
raise Exception("no devices found")
raise Exception("No Frame devices found")

self._client = BleakClient(
device,
Expand All @@ -159,22 +172,22 @@ async def connect(
except BleakError as e:
raise Exception("Device needs to be re-paired: "+str(e))

service = self._client.services.get_service(
service: Any = self._client.services.get_service(
self._SERVICE_UUID,
)

self._tx_characteristic = service.get_characteristic(
self._TX_CHARACTERISTIC_UUID,
)

async def disconnect(self):
async def disconnect(self) -> None:
"""
Disconnects from the device.
"""
await self._client.disconnect()
self._disconnect_handler(None)

def is_connected(self):
def is_connected(self) -> bool:
"""
Returns `True` if the device is connected. `False` otherwise.
"""
Expand All @@ -183,7 +196,7 @@ def is_connected(self):
except AttributeError:
return False

def max_lua_payload(self):
def max_lua_payload(self) -> int:
"""
Returns the maximum length of a Lua string which may be transmitted.
"""
Expand All @@ -192,7 +205,7 @@ def max_lua_payload(self):
except AttributeError:
return 0

def max_data_payload(self):
def max_data_payload(self) -> int:
"""
Returns the maximum length of a raw bytearray which may be transmitted.
"""
Expand All @@ -209,21 +222,21 @@ def default_timeout(self) -> float:
return self._default_timeout

@default_timeout.setter
def default_timeout(self, value: float):
def default_timeout(self, value: float) -> None:
"""
Sets the default timeout value in seconds
"""
if value < 0:
raise ValueError("default_timeout must be a non-negative float")
self._default_timeout = value

def set_print_debugging(self, value: bool):
def set_print_debugging(self, value: bool) -> None:
"""
Sets whether to print debugging information when sending data.
"""
self._print_debugging = value

async def _transmit(self, data, show_me=False):
async def _transmit(self, data: bytearray, show_me: bool = False) -> None:
if show_me or self._print_debugging:
print(data) # TODO make this print nicer

Expand All @@ -232,7 +245,7 @@ async def _transmit(self, data, show_me=False):

await self._client.write_gatt_char(self._tx_characteristic, data)

async def send_lua(self, string: str, show_me=False, await_print=False, timeout: Optional[float] = None):
async def send_lua(self, string: str, show_me: bool = False, await_print: bool = False, timeout: Optional[float] = None) -> Optional[str]:
"""
Sends a Lua string to the device. The string length must be less than or
equal to `max_lua_payload()`.
Expand All @@ -247,44 +260,38 @@ async def send_lua(self, string: str, show_me=False, await_print=False, timeout:
if await_print:
return await self.wait_for_print(timeout)

async def wait_for_print(self, timeout: float = None) -> str:
async def wait_for_print(self, timeout: Optional[float] = None) -> str:
"""
Waits until a Lua print() occurs, with a max timeout in seconds
"""
if timeout is None:
timeout = self._default_timeout

self._awaiting_print_response = True
countdown = timeout * 1000
self._print_response_event.clear()

while self._awaiting_print_response:
await asyncio.sleep(0.001)
if countdown == 0:
self._awaiting_print_response = False
raise Exception("device didn't respond")
countdown -= 1
try:
await asyncio.wait_for(self._print_response_event.wait(), timeout)
except asyncio.TimeoutError:
raise Exception(f"Frame didn't respond with printed data (from print() or prntLng()) within {timeout} seconds")

return self._print_response

async def wait_for_data(self, timeout: float = None) -> bytes:
async def wait_for_data(self, timeout: Optional[float] = None) -> bytes:
"""
Waits until data has been received from the device, with a max timeout in seconds
"""
if timeout is None:
timeout = self._default_timeout

self._awaiting_data_response = True
countdown = timeout * 1000

while self._awaiting_data_response:
await asyncio.sleep(0.001)
if countdown == 0:
raise Exception("device didn't respond")
countdown -= 1
self._data_response_event.clear()

try:
await asyncio.wait_for(self._data_response_event.wait(), timeout)
except asyncio.TimeoutError:
raise Exception(f"Frame didn't respond with data (from frame.bluetooth.send(data)) within {timeout} seconds")
return bytes(self._data_response)

async def send_data(self, data: bytearray, show_me=False, await_data=False):
async def send_data(self, data: bytearray, show_me: bool = False, await_data: bool = False) -> Optional[bytes]:
"""
Sends raw data to the device. The payload length must be less than or
equal to `max_data_payload()`.
Expand All @@ -297,18 +304,9 @@ async def send_data(self, data: bytearray, show_me=False, await_data=False):
await self._transmit(bytearray(b"\x01") + data, show_me=show_me)

if await_data:
self._awaiting_data_response = True
countdown = 5000

while self._awaiting_data_response:
await asyncio.sleep(0.001)
if countdown == 0:
raise Exception("device didn't respond")
countdown -= 1

return self._data_response
return await self.wait_for_data()

async def send_reset_signal(self, show_me=False):
async def send_reset_signal(self, show_me: bool = False) -> None:
"""
Sends a reset signal to the device which will reset the Lua virtual
machine.
Expand All @@ -319,7 +317,7 @@ async def send_reset_signal(self, show_me=False):
await self.connect()
await self._transmit(bytearray(b"\x04"), show_me=show_me)

async def send_break_signal(self, show_me=False):
async def send_break_signal(self, show_me: bool = False) -> None:
"""
Sends a break signal to the device which will break any currently
executing Lua script.
Expand All @@ -328,4 +326,4 @@ async def send_break_signal(self, show_me=False):
"""
if not self.is_connected():
await self.connect()
await self._transmit(bytearray(b"\x03"), show_me=show_me)
await self._transmit(bytearray(b"\x03"), show_me=show_me)
1 change: 0 additions & 1 deletion src/frameutils/camera.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import asyncio
from typing import Optional, TYPE_CHECKING
from exif import Image
from datetime import datetime
Expand Down
4 changes: 2 additions & 2 deletions src/frameutils/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
class FrameFileSystem:
"""Helpers for accessing the Frame filesystem."""

frame : "Frame" = None
frame: "Frame" = None

def __init__(self, frame: "Frame"):
self.frame = frame

async def write_file(self, path: str, data: bytes, checked: bool = False):
async def write_file(self, path: str, data: bytes, checked: bool = False) -> None:
"""Write a file to the device."""

response = await self.frame.bluetooth.send_lua(
Expand Down
Loading

0 comments on commit e5f4c65

Please sign in to comment.