diff --git a/sbot/arduino.py b/sbot/arduino.py index 47ff7c5..faac772 100644 --- a/sbot/arduino.py +++ b/sbot/arduino.py @@ -108,20 +108,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, Arduino]: """ boards = {} # The filter here is the name of the emulated board in the simulator - for board in get_simulator_boards('Arduino'): + for board_info in get_simulator_boards('Arduino'): # Create board identity from the info given initial_identity = BoardIdentity( manufacturer='sbot_simulator', - board_type=board.type_str, - asset_tag=board.serial_number, + board_type=board_info.type_str, + asset_tag=board_info.serial_number, ) try: - board = cls(board.url, initial_identity) + board = cls(board_info.url, initial_identity) except BoardDisconnectionError: logger.warning( - f"Simulator specified arduino at port {board.url!r}, " + f"Simulator specified arduino at port {board_info.url!r}, " "could not be identified. Ignoring this device") continue except IncorrectBoardError as err: diff --git a/sbot/camera.py b/sbot/camera.py index 5ca3bad..52e3387 100644 --- a/sbot/camera.py +++ b/sbot/camera.py @@ -92,7 +92,8 @@ def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera': """ Create a camera from a webots camera. - :param camera_info: The information about the virtual camera, including the url to connect to. + :param camera_info: The information about the virtual camera, + including the url to connect to. :return: The camera object. """ from .simulator.camera import WebotsRemoteCameraSource @@ -106,7 +107,12 @@ def from_webots_camera(cls, camera_info: BoardInfo) -> 'AprilCamera': ) @classmethod - def from_id(cls, camera_id: int, camera_data: CalibratedCamera, serial_num: str) -> 'AprilCamera': + def from_id( + cls, + camera_id: int, + camera_data: CalibratedCamera, + serial_num: str, + ) -> 'AprilCamera': """ Create a camera from an ID. diff --git a/sbot/motor_board.py b/sbot/motor_board.py index ca3f946..991c6b9 100644 --- a/sbot/motor_board.py +++ b/sbot/motor_board.py @@ -100,20 +100,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, MotorBoard]: """ boards = {} # The filter here is the name of the emulated board in the simulator - for board in get_simulator_boards('MotorBoard'): + for board_info in get_simulator_boards('MotorBoard'): # Create board identity from the info given initial_identity = BoardIdentity( manufacturer='sbot_simulator', - board_type=board.type_str, - asset_tag=board.serial_number, + board_type=board_info.type_str, + asset_tag=board_info.serial_number, ) try: - board = cls(board.url, initial_identity) + board = cls(board_info.url, initial_identity) except BoardDisconnectionError: logger.warning( - f"Simulator specified motor board at port {board.url!r}, " + f"Simulator specified motor board at port {board_info.url!r}, " "could not be identified. Ignoring this device") continue except IncorrectBoardError as err: diff --git a/sbot/power_board.py b/sbot/power_board.py index a8a020a..733212b 100644 --- a/sbot/power_board.py +++ b/sbot/power_board.py @@ -120,20 +120,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, PowerBoard]: """ boards = {} # The filter here is the name of the emulated board in the simulator - for board in get_simulator_boards('PowerBoard'): + for board_info in get_simulator_boards('PowerBoard'): # Create board identity from the info given initial_identity = BoardIdentity( manufacturer='sbot_simulator', - board_type=board.type_str, - asset_tag=board.serial_number, + board_type=board_info.type_str, + asset_tag=board_info.serial_number, ) try: - board = cls(board.url, initial_identity) + board = cls(board_info.url, initial_identity) except BoardDisconnectionError: logger.warning( - f"Simulator specified power board at port {board.url!r}, " + f"Simulator specified power board at port {board_info.url!r}, " "could not be identified. Ignoring this device") continue except IncorrectBoardError as err: diff --git a/sbot/robot.py b/sbot/robot.py index 22e8ec6..56949a2 100644 --- a/sbot/robot.py +++ b/sbot/robot.py @@ -3,6 +3,7 @@ import itertools import logging +from socket import socket from time import sleep from types import MappingProxyType from typing import Mapping @@ -55,10 +56,13 @@ def __init__( trace_logging: bool = False, manual_boards: dict[str, list[str]] | None = None, ) -> None: + self._lock: TimeServer | socket | None if IN_SIMULATOR: self._lock = TimeServer.initialise() if self._lock is None: - raise OSError('Unable to obtain lock, Is another robot instance already running?') + raise OSError( + 'Unable to obtain lock, Is another robot instance already running?' + ) else: self._lock = obtain_lock() self._metadata: Metadata | None = None diff --git a/sbot/serial_wrapper.py b/sbot/serial_wrapper.py index 8fc275c..b6ae3fe 100644 --- a/sbot/serial_wrapper.py +++ b/sbot/serial_wrapper.py @@ -31,7 +31,7 @@ E = TypeVar("E", bound=BaseException) if IN_SIMULATOR: - BASE_TIMEOUT = 5 + BASE_TIMEOUT = 5.0 else: BASE_TIMEOUT = 0.5 diff --git a/sbot/servo_board.py b/sbot/servo_board.py index dba483e..0b30029 100644 --- a/sbot/servo_board.py +++ b/sbot/servo_board.py @@ -97,20 +97,20 @@ def _get_simulator_boards(cls) -> MappingProxyType[str, ServoBoard]: """ boards = {} # The filter here is the name of the emulated board in the simulator - for board in get_simulator_boards('ServoBoard'): + for board_info in get_simulator_boards('ServoBoard'): # Create board identity from the info given initial_identity = BoardIdentity( manufacturer='sbot_simulator', - board_type=board.type_str, - asset_tag=board.serial_number, + board_type=board_info.type_str, + asset_tag=board_info.serial_number, ) try: - board = cls(board.url, initial_identity) + board = cls(board_info.url, initial_identity) except BoardDisconnectionError: logger.warning( - f"Simulator specified servo board at port {board.url!r}, " + f"Simulator specified servo board at port {board_info.url!r}, " "could not be identified. Ignoring this device") continue except IncorrectBoardError as err: diff --git a/sbot/simulator/camera.py b/sbot/simulator/camera.py index c361ef8..f9d2195 100644 --- a/sbot/simulator/camera.py +++ b/sbot/simulator/camera.py @@ -1,4 +1,5 @@ import struct +from typing import cast import cv2 import numpy as np @@ -14,20 +15,22 @@ class WebotsRemoteCameraSource(FrameSource): COLOURSPACE = cv2.COLOR_BGRA2GRAY def __init__(self, camera_info: BoardInfo) -> None: - self.calibration = (0, 0, 0, 0) + self.calibration = (0.0, 0.0, 0.0, 0.0) # Use pyserial to give a nicer interface for connecting to the camera socket self._serial = serial_for_url(camera_info.url, baudrate=115200, timeout=1) # Check the camera is connected response = self._make_request("*IDN?") if not response.split(b":")[1].lower().startswith(b"cam"): - raise RuntimeError(f"Camera not connected to a camera, returned: {response}") + raise RuntimeError(f"Camera not connected to a camera, returned: {response!r}") # Get the calibration data for this camera response = self._make_request("CAM:CALIBRATION?") # The calibration data is returned as a string of floats separated by colons - self.calibration = tuple(map(float, response.split(b":"))) + new_calibration = tuple(map(float, response.split(b":"))) + assert len(new_calibration) == 4, f"Invalid calibration data: {new_calibration}" + self.calibration = cast(tuple[float, float, float, float], new_calibration) assert len(self.calibration) == 4, f"Invalid calibration data: {self.calibration}" # Get the image size for this camera @@ -67,5 +70,5 @@ def _make_request(self, command: str) -> bytes: self._serial.write(command.encode() + b"\n") response = self._serial.readline() if not response.endswith(b"\n") or response.startswith(b"NACK:"): - raise RuntimeError(f"Failed to communicate with camera, returned: {response}") + raise RuntimeError(f"Failed to communicate with camera, returned: {response!r}") return response diff --git a/sbot/simulator/time_server.py b/sbot/simulator/time_server.py index 6e3bcd8..410dcd7 100644 --- a/sbot/simulator/time_server.py +++ b/sbot/simulator/time_server.py @@ -83,11 +83,11 @@ def identify(self) -> BoardIdentity: response = self._serial.query('*IDN?') return BoardIdentity(*response.split(':')) - def get_time(self): + def get_time(self) -> float: time_str = self._serial.query('TIME?') return datetime.fromisoformat(time_str).timestamp() - def sleep(self, duration: int) -> None: + def sleep(self, duration: float) -> None: if duration < 0: raise ValueError("sleep length must be non-negative") diff --git a/sbot/utils.py b/sbot/utils.py index fb47c3a..05264a5 100644 --- a/sbot/utils.py +++ b/sbot/utils.py @@ -35,6 +35,7 @@ class BoardIdentity(NamedTuple): asset_tag: str = "" sw_version: str = "" + class BoardInfo(NamedTuple): """ A container for the information about a board connection. @@ -45,6 +46,7 @@ class BoardInfo(NamedTuple): serial_number: str type_str: str + class Board(ABC): """ This is the base class for all boards.