Skip to content

Commit

Permalink
Updates for linting and typing
Browse files Browse the repository at this point in the history
  • Loading branch information
WillB97 committed Jun 29, 2024
1 parent 9e9e226 commit d8ddbef
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 30 deletions.
10 changes: 5 additions & 5 deletions sbot/arduino.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,20 +108,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, Arduino]:
"""
boards = {}
# The filter here is the name of the emulated board in the simulator
for board in get_simulator_boards('Arduino'):
for board_info in get_simulator_boards('Arduino'):

# Create board identity from the info given
initial_identity = BoardIdentity(
manufacturer='sbot_simulator',
board_type=board.type_str,
asset_tag=board.serial_number,
board_type=board_info.type_str,
asset_tag=board_info.serial_number,
)

try:
board = cls(board.url, initial_identity)
board = cls(board_info.url, initial_identity)
except BoardDisconnectionError:
logger.warning(
f"Simulator specified arduino at port {board.url!r}, "
f"Simulator specified arduino at port {board_info.url!r}, "
"could not be identified. Ignoring this device")
continue
except IncorrectBoardError as err:
Expand Down
10 changes: 8 additions & 2 deletions sbot/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera':
"""
Create a camera from a webots camera.
:param camera_info: The information about the virtual camera, including the url to connect to.
:param camera_info: The information about the virtual camera,
including the url to connect to.
:return: The camera object.
"""
from .simulator.camera import WebotsRemoteCameraSource
Expand All @@ -106,7 +107,12 @@ def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera':
)

@classmethod
def from_id(cls, camera_id: int, camera_data: CalibratedCamera, serial_num: str) -> 'AprilCamera':
def from_id(
cls,
camera_id: int,
camera_data: CalibratedCamera,
serial_num: str,
) -> 'AprilCamera':
"""
Create a camera from an ID.
Expand Down
10 changes: 5 additions & 5 deletions sbot/motor_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, MotorBoard]:
"""
boards = {}
# The filter here is the name of the emulated board in the simulator
for board in get_simulator_boards('MotorBoard'):
for board_info in get_simulator_boards('MotorBoard'):

# Create board identity from the info given
initial_identity = BoardIdentity(
manufacturer='sbot_simulator',
board_type=board.type_str,
asset_tag=board.serial_number,
board_type=board_info.type_str,
asset_tag=board_info.serial_number,
)

try:
board = cls(board.url, initial_identity)
board = cls(board_info.url, initial_identity)
except BoardDisconnectionError:
logger.warning(
f"Simulator specified motor board at port {board.url!r}, "
f"Simulator specified motor board at port {board_info.url!r}, "
"could not be identified. Ignoring this device")
continue
except IncorrectBoardError as err:
Expand Down
10 changes: 5 additions & 5 deletions sbot/power_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, PowerBoard]:
"""
boards = {}
# The filter here is the name of the emulated board in the simulator
for board in get_simulator_boards('PowerBoard'):
for board_info in get_simulator_boards('PowerBoard'):

# Create board identity from the info given
initial_identity = BoardIdentity(
manufacturer='sbot_simulator',
board_type=board.type_str,
asset_tag=board.serial_number,
board_type=board_info.type_str,
asset_tag=board_info.serial_number,
)

try:
board = cls(board.url, initial_identity)
board = cls(board_info.url, initial_identity)
except BoardDisconnectionError:
logger.warning(
f"Simulator specified power board at port {board.url!r}, "
f"Simulator specified power board at port {board_info.url!r}, "
"could not be identified. Ignoring this device")
continue
except IncorrectBoardError as err:
Expand Down
6 changes: 5 additions & 1 deletion sbot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import itertools
import logging
from socket import socket
from time import sleep
from types import MappingProxyType
from typing import Mapping
Expand Down Expand Up @@ -55,10 +56,13 @@ def __init__(
trace_logging: bool = False,
manual_boards: dict[str, list[str]] | None = None,
) -> None:
self._lock: TimeServer | socket | None
if IN_SIMULATOR:
self._lock = TimeServer.initialise()
if self._lock is None:
raise OSError('Unable to obtain lock, Is another robot instance already running?')
raise OSError(
'Unable to obtain lock, Is another robot instance already running?'
)
else:
self._lock = obtain_lock()
self._metadata: Metadata | None = None
Expand Down
2 changes: 1 addition & 1 deletion sbot/serial_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
E = TypeVar("E", bound=BaseException)

if IN_SIMULATOR:
BASE_TIMEOUT = 5
BASE_TIMEOUT = 5.0
else:
BASE_TIMEOUT = 0.5

Expand Down
10 changes: 5 additions & 5 deletions sbot/servo_board.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,20 +97,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, ServoBoard]:
"""
boards = {}
# The filter here is the name of the emulated board in the simulator
for board in get_simulator_boards('ServoBoard'):
for board_info in get_simulator_boards('ServoBoard'):

# Create board identity from the info given
initial_identity = BoardIdentity(
manufacturer='sbot_simulator',
board_type=board.type_str,
asset_tag=board.serial_number,
board_type=board_info.type_str,
asset_tag=board_info.serial_number,
)

try:
board = cls(board.url, initial_identity)
board = cls(board_info.url, initial_identity)
except BoardDisconnectionError:
logger.warning(
f"Simulator specified servo board at port {board.url!r}, "
f"Simulator specified servo board at port {board_info.url!r}, "
"could not be identified. Ignoring this device")
continue
except IncorrectBoardError as err:
Expand Down
11 changes: 7 additions & 4 deletions sbot/simulator/camera.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import struct
from typing import cast

import cv2
import numpy as np
Expand All @@ -14,20 +15,22 @@ class WebotsRemoteCameraSource(FrameSource):
COLOURSPACE = cv2.COLOR_BGRA2GRAY

def __init__(self, camera_info: BoardInfo) -> None:
self.calibration = (0, 0, 0, 0)
self.calibration = (0.0, 0.0, 0.0, 0.0)
# Use pyserial to give a nicer interface for connecting to the camera socket
self._serial = serial_for_url(camera_info.url, baudrate=115200, timeout=1)

# Check the camera is connected
response = self._make_request("*IDN?")
if not response.split(b":")[1].lower().startswith(b"cam"):
raise RuntimeError(f"Camera not connected to a camera, returned: {response}")
raise RuntimeError(f"Camera not connected to a camera, returned: {response!r}")

# Get the calibration data for this camera
response = self._make_request("CAM:CALIBRATION?")

# The calibration data is returned as a string of floats separated by colons
self.calibration = tuple(map(float, response.split(b":")))
new_calibration = tuple(map(float, response.split(b":")))
assert len(new_calibration) == 4, f"Invalid calibration data: {new_calibration}"
self.calibration = cast(tuple[float, float, float, float], new_calibration)
assert len(self.calibration) == 4, f"Invalid calibration data: {self.calibration}"

# Get the image size for this camera
Expand Down Expand Up @@ -67,5 +70,5 @@ def _make_request(self, command: str) -> bytes:
self._serial.write(command.encode() + b"\n")
response = self._serial.readline()
if not response.endswith(b"\n") or response.startswith(b"NACK:"):
raise RuntimeError(f"Failed to communicate with camera, returned: {response}")
raise RuntimeError(f"Failed to communicate with camera, returned: {response!r}")
return response
4 changes: 2 additions & 2 deletions sbot/simulator/time_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def identify(self) -> BoardIdentity:
response = self._serial.query('*IDN?')
return BoardIdentity(*response.split(':'))

def get_time(self):
def get_time(self) -> float:
time_str = self._serial.query('TIME?')
return datetime.fromisoformat(time_str).timestamp()

def sleep(self, duration: int) -> None:
def sleep(self, duration: float) -> None:
if duration < 0:
raise ValueError("sleep length must be non-negative")

Expand Down
2 changes: 2 additions & 0 deletions sbot/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BoardIdentity(NamedTuple):
asset_tag: str = ""
sw_version: str = ""


class BoardInfo(NamedTuple):
"""
A container for the information about a board connection.
Expand All @@ -45,6 +46,7 @@ class BoardInfo(NamedTuple):
serial_number: str
type_str: str


class Board(ABC):
"""
This is the base class for all boards.
Expand Down

0 comments on commit d8ddbef

Please sign in to comment.