Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start of higher level SDK #11

Closed
wants to merge 11 commits into from
47 changes: 40 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,43 @@ As simple as:

```python
import asyncio
from frameutils import Bluetooth
from frameutils import Frame


async def main():
bluetooth = Bluetooth()
await bluetooth.connect()
# the with statement handles the connection and disconnection to Frame
async with Frame() as f:
# you can access the lower-level bluetooth connection via f.bluetooth, although you shouldn't need to do this often
print(f"Connected: {f.bluetooth.is_connected()}")

print(await bluetooth.send_lua("print('hello world')", await_print=True))
print(await bluetooth.send_lua("print(1 + 2)", await_print=True))
# let's get the current battery level
print(f"Frame battery: {await f.get_battery_level()}%")

# let's write (or overwrite) the file greeting.txt with "Hello world".
# You can provide a bytes object or convert a string with .encode()
await f.files.write_file("greeting.txt", b"Hello world")

# And now we read that file back.
# Note that we should convert the bytearray to a string via the .decode() method.
print((await f.files.read_file("greeting.txt")).decode())

# run_lua will automatically handle scripts that are too long for the MTU, so you don't need to worry about it.
# It will also automatically handle responses that are too long for the MTU automatically.
await f.run_lua("frame.display.text('Hello world', 50, 100);frame.display.show()")

# evaluate is equivalent to f.run_lua("print(\"1+2\"), await_print=True)
# It will also automatically handle responses that are too long for the MTU automatically.
print(await f.evaluate("1+2"))

# take a photo and save to disk
await f.camera.save_photo("frame-test-photo.jpg")
# or with more control
await f.camera.save_photo("frame-test-photo-2.jpg", autofocus_seconds=3, quality=f.camera.HIGH_QUALITY, autofocus_type=f.camera.AUTOFOCUS_TYPE_CENTER_WEIGHTED)
# or get the raw bytes
photo_bytes = await f.camera.take_photo(autofocus_seconds=1)

print("disconnected")

await bluetooth.disconnect()


asyncio.run(main())
Expand All @@ -33,8 +59,15 @@ asyncio.run(main())

## Tests

To run the unit tests, ensure you have an unconnected Frame device in range, and then run:
To run the unit tests, ensure you have pytest installed:

```sh
pip3 install pytest
```

With an unconnected Frame device in range, run:

```sh
python3 -m pytest tests/test_bluetooth.py
python3 -m pytest tests/test_files.py
```
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ classifiers = [
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
]
dependencies = ["bleak", "numpy", "scikit-learn"]
dependencies = ["bleak", "numpy", "scikit-learn", "exif"]

[project.scripts]
frameutils = "frameutils.cli:main"
Expand Down
4 changes: 3 additions & 1 deletion src/frameutils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
__all__ = ["bluetooth"]
__all__ = ["bluetooth", "files", "frame"]

from .bluetooth import Bluetooth
from .files import FrameFileSystem
from .frame import Frame
99 changes: 96 additions & 3 deletions src/frameutils/bluetooth.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,85 @@ def __init__(self):
self._awaiting_data_response = False
self._client = None
self._print_response = bytearray()
self._ongoing_print_response = None
self._ongoing_print_response_chunk_count = None
self._ongoing_data_response = None
self._ongoing_data_response_chunk_count = None
self._data_response = bytearray()
self._tx_characteristic = None
self._user_data_response_handler = lambda: None
self._user_disconnect_handler = lambda: None
self._user_print_response_handler = lambda: None
self._print_debugging = False

def _disconnect_handler(self, _):
self._user_disconnect_handler()
self.__init__()

async def _notification_handler(self, _, data):
if data[0] == 1:

OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
if data[0] == 10:
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
# start of long printed data
if self._ongoing_print_response is None or self._ongoing_print_response_chunk_count is None:
self._ongoing_print_response = bytearray()
self._ongoing_print_response_chunk_count = 0
if self._print_debugging:
print("starting receiving new long printed data")
self._ongoing_print_response += data[1:]
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
self._ongoing_print_response_chunk_count += 1
if self._print_debugging:
print(f"received chunk #{self._ongoing_print_response_chunk_count}: "+data[1:].decode())
elif data[0] == 11:
# end of long printed data
total_expected_chunk_count_as_string = data[1:].decode()
if len(total_expected_chunk_count_as_string) > 0:
total_expected_chunk_count = int(total_expected_chunk_count_as_string)
if self._print_debugging:
print(f"received final chunk count: {total_expected_chunk_count}")
if self._ongoing_print_response_chunk_count != total_expected_chunk_count:
raise Exception(f"chunk count mismatch in long received data (expected {total_expected_chunk_count}, got {self._ongoing_print_response_chunk_count})")
self._awaiting_print_response = False
self._print_response = self._ongoing_print_response.decode()
self._ongoing_print_response = None
self._ongoing_print_response_chunk_count = None
if self._print_debugging:
print("finished receiving long printed data: "+self._print_response)
self._user_print_response_handler(self._print_response)

elif data[0] == 1 and data[1] == 1:
# start of long raw data
if self._ongoing_data_response is None or self._ongoing_data_response_chunk_count is None:
self._ongoing_data_response = bytearray()
self._ongoing_data_response_chunk_count = 0
self._data_response = None
if self._print_debugging:
print("starting receiving new long raw data")
self._ongoing_data_response += data[2:]
self._ongoing_data_response_chunk_count += 1
if self._print_debugging:
print(f"received data chunk #{self._ongoing_data_response_chunk_count}: {len(data[2:])} bytes")
elif data[0] == 1 and data[1] == 2:
# end of long raw data
total_expected_chunk_count_as_string = data[2:].decode()
if len(total_expected_chunk_count_as_string) > 0:
total_expected_chunk_count = int(total_expected_chunk_count_as_string)
if self._print_debugging:
print(f"received final data chunk count: {total_expected_chunk_count}")
if self._ongoing_data_response_chunk_count != total_expected_chunk_count:
raise Exception(f"chunk count mismatch in long received data (expected {total_expected_chunk_count}, got {self._ongoing_data_response_chunk_count})")
self._awaiting_data_response = False
self._data_response = self._ongoing_data_response
self._ongoing_data_response = None
self._ongoing_data_response_chunk_count = None
if self._print_debugging:
if self._data_response is None:
print("finished receiving long raw data: No bytes")
else:
print(f"finished receiving long raw data: {len(self._data_response)} bytes)")
self._user_data_response_handler(self._data_response)
elif data[0] == 1:
if self._print_debugging:
print(f"received data: {len(data[1:])} bytes")
if self._awaiting_data_response:
self._awaiting_data_response = False
self._data_response = data[1:]
Expand Down Expand Up @@ -131,9 +198,15 @@ def max_data_payload(self):
return self._client.mtu_size - 4
except AttributeError:
return 0

def set_print_debugging(self, value: bool):
"""
Sets whether to print debugging information when sending data.
"""
self._print_debugging = value

async def _transmit(self, data, show_me=False):
if show_me:
if show_me or self._print_debugging:
print(data) # TODO make this print nicer

if len(data) > self._client.mtu_size - 3:
Expand All @@ -155,7 +228,7 @@ async def send_lua(self, string: str, show_me=False, await_print=False):

if await_print:
self._awaiting_print_response = True
countdown = 5000
countdown = 10000

while self._awaiting_print_response:
await asyncio.sleep(0.001)
Expand All @@ -164,6 +237,22 @@ async def send_lua(self, string: str, show_me=False, await_print=False):
countdown -= 1

return self._print_response

async def wait_for_data(self, timeout: float = 30.0) -> bytes:
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
"""
Waits until data has been received from the device, with a max timeout in seconds
"""

self._awaiting_data_response = True
countdown = timeout * 1000

while self._awaiting_data_response:
await asyncio.sleep(0.001)
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
if countdown == 0:
raise Exception("device didn't respond")
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
countdown -= 1

return bytes(self._data_response)

async def send_data(self, data: bytearray, show_me=False, await_data=False):
"""
Expand Down Expand Up @@ -196,6 +285,8 @@ async def send_reset_signal(self, show_me=False):

If `show_me=True`, the exact bytes send to the device will be printed.
"""
if not self.is_connected():
await self.connect()
await self._transmit(bytearray(b"\x04"), show_me=show_me)

async def send_break_signal(self, show_me=False):
Expand All @@ -205,4 +296,6 @@ async def send_break_signal(self, show_me=False):

If `show_me=True`, the exact bytes send to the device will be printed.
"""
if not self.is_connected():
await self.connect()
await self._transmit(bytearray(b"\x03"), show_me=show_me)
91 changes: 91 additions & 0 deletions src/frameutils/camera.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations
import asyncio
OkGoDoIt marked this conversation as resolved.
Show resolved Hide resolved
from typing import Optional, TYPE_CHECKING
from exif import Image
from datetime import datetime

if TYPE_CHECKING:
from .frame import Frame

class Camera:
"""Helpers for working with the Frame camera."""

frame: "Frame" = None

LOW_QUALITY = 10
MEDIUM_QUALITY = 25
HIGH_QUALITY = 50
FULL_QUALITY = 100

AUTOFOCUS_TYPE_SPOT = "SPOT"
AUTOFOCUS_TYPE_AVERAGE = "AVERAGE"
AUTOFOCUS_TYPE_CENTER_WEIGHTED = "CENTER_WEIGHTED"

_auto_sleep = True
_auto_process_photo = True

def __init__(self, frame: "Frame"):
self.frame = frame

@property
def auto_sleep(self) -> bool:
"""If true, the camera will automatically sleep after taking a photo."""
return self._auto_sleep

@auto_sleep.setter
def auto_sleep(self, value: bool):
self._auto_sleep = value

@property
def auto_process_photo(self) -> bool:
"""If true, the camera will automatically process the photo to correct rotation and add metadata"""
return self._auto_process_photo

@auto_process_photo.setter
def auto_process_photo(self, value: bool):
self._auto_process_photo = value

async def take_photo(self, autofocus_seconds: Optional[int] = 3, quality: int = MEDIUM_QUALITY, autofocus_type: str = AUTOFOCUS_TYPE_AVERAGE) -> bytes:
"""Take a photo with the camera.
If autofocus_seconds is provided, the camera will attempt to focus for the specified number of seconds.
Quality is LOW_QUALITY (10), MEDIUM_QUALITY (25), HIGH_QUALITY (50), or FULL_QUALITY (100).
"""
# TODO: Either camera sleep and wake don't work correctly or they're not properly documented, this doesn't work for now
#if self.auto_sleep:
# await self.frame.bluetooth.send_lua("frame.camera.wake()")
# await asyncio.sleep(0.1)

await self.frame.bluetooth.send_lua(f"cameraCaptureAndSend({quality},{autofocus_seconds or 'nil'},{autofocus_type})")
image_buffer = await self.frame.bluetooth.wait_for_data()

#if self.auto_sleep:
# await self.frame.bluetooth.send_lua("frame.camera.sleep()")

if image_buffer is None or len(image_buffer) == 0:
raise Exception("Failed to get photo")

if self.auto_process_photo:
image_buffer = self.process_photo(image_buffer, quality, autofocus_type)
return image_buffer

async def save_photo(self, filename: str, autofocus_seconds: Optional[int] = 3, quality: int = MEDIUM_QUALITY, autofocus_type: str = AUTOFOCUS_TYPE_AVERAGE):
image_buffer = await self.take_photo(autofocus_seconds, quality, autofocus_type)

with open(filename, "wb") as f:
f.write(image_buffer)

def process_photo(self, image_buffer: bytes, quality: int, autofocus_type: str) -> bytes:
"""Process a photo to correct rotation and add metadata"""
image = Image(image_buffer)
image.orientation = 8
image.make = "Brilliant Labs"
image.model = "Frame"
image.software = "Frame Python SDK"
if autofocus_type == self.AUTOFOCUS_TYPE_AVERAGE:
image.metering_mode = 1
elif autofocus_type == self.AUTOFOCUS_TYPE_CENTER_WEIGHTED:
image.metering_mode = 2
elif autofocus_type == self.AUTOFOCUS_TYPE_SPOT:
image.metering_mode = 3
image.datetime_original = datetime.now().strftime("%Y:%m:%d %H:%M:%S")
return image.get_file()
Loading