From bba7cd4751c4d9737d540ece85184682f747e004 Mon Sep 17 00:00:00 2001 From: WillB97 Date: Wed, 22 May 2024 19:36:38 +0100 Subject: [PATCH] Move simulator detection into utils, skip delay on connect in simulator --- sbot/arduino.py | 8 +++++--- sbot/camera.py | 7 ++++--- sbot/motor_board.py | 7 +++---- sbot/power_board.py | 5 ++--- sbot/robot.py | 4 +--- sbot/serial_wrapper.py | 12 ++++++------ sbot/servo_board.py | 7 +++---- sbot/utils.py | 4 +++- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/sbot/arduino.py b/sbot/arduino.py index e207a4f..47ff7c5 100644 --- a/sbot/arduino.py +++ b/sbot/arduino.py @@ -1,7 +1,6 @@ """The Arduino module provides an interface to the Arduino firmware.""" from __future__ import annotations -import os import logging from enum import Enum, IntEnum from types import MappingProxyType @@ -11,7 +10,10 @@ from .exceptions import BoardDisconnectionError, IncorrectBoardError from .logging import log_to_debug from .serial_wrapper import SerialWrapper -from .utils import Board, BoardIdentity, get_simulator_boards, get_USB_identity, map_to_float +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, + get_simulator_boards, get_USB_identity, map_to_float, +) logger = logging.getLogger(__name__) BAUDRATE = 115200 @@ -141,7 +143,7 @@ def _get_supported_boards( defaults to None :return: A mapping of board serial numbers to Arduinos """ - if os.environ.get('WEBOTS_SIMULATOR') == '1': + if IN_SIMULATOR: return cls._get_simulator_boards() boards = {} diff --git a/sbot/camera.py b/sbot/camera.py index 378afff..5ca3bad 100644 --- a/sbot/camera.py +++ b/sbot/camera.py @@ -1,7 +1,6 @@ """An implementation of a camera board using the april_vision library.""" from __future__ import annotations -import os import logging from pathlib import Path from typing import Callable, Dict, Iterable, List, Optional, Union @@ -16,7 +15,9 @@ from numpy.typing import NDArray from .marker import Marker -from .utils import Board, BoardIdentity, BoardInfo, get_simulator_boards +from .utils import ( + IN_SIMULATOR, Board, BoardIdentity, BoardInfo, get_simulator_boards, +) LOGGER = logging.getLogger(__name__) @@ -57,7 +58,7 @@ def _discover(cls) -> Dict[str, 'AprilCamera']: :return: A dict of cameras, keyed by their name and index. """ - if os.environ.get('WEBOTS_SIMULATOR') == '1': + if IN_SIMULATOR: return { camera_info.serial_number: cls.from_webots_camera(camera_info) for camera_info in get_simulator_boards('CameraBoard') diff --git a/sbot/motor_board.py b/sbot/motor_board.py index 47de897..ca3f946 100644 --- a/sbot/motor_board.py +++ b/sbot/motor_board.py @@ -1,7 +1,6 @@ """The motor board module provides an interface to the motor board firmware over serial.""" from __future__ import annotations -import os import atexit import logging from enum import IntEnum @@ -14,8 +13,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, get_simulator_boards, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) logger = logging.getLogger(__name__) @@ -138,7 +137,7 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to motor boards. """ - if os.environ.get('WEBOTS_SIMULATOR') == '1': + if IN_SIMULATOR: return cls._get_simulator_boards() boards = {} diff --git a/sbot/power_board.py b/sbot/power_board.py index 650968e..a8a020a 100644 --- a/sbot/power_board.py +++ b/sbot/power_board.py @@ -1,7 +1,6 @@ """The power board module provides an interface to the power board firmware over serial.""" from __future__ import annotations -import os import atexit import logging from enum import IntEnum @@ -14,7 +13,7 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, get_simulator_boards, get_USB_identity, ) @@ -157,7 +156,7 @@ def _get_supported_boards( :param manual_boards: A list of manually specified serial ports to also attempt to connect to, defaults to None """ - if os.environ.get('WEBOTS_SIMULATOR') == '1': + if IN_SIMULATOR: return cls._get_simulator_boards() boards = {} diff --git a/sbot/robot.py b/sbot/robot.py index 8eb9b8c..f002ede 100644 --- a/sbot/robot.py +++ b/sbot/robot.py @@ -1,7 +1,6 @@ """The main entry point for the sbot package.""" from __future__ import annotations -import os import itertools import logging from time import sleep @@ -19,7 +18,7 @@ from .power_board import Note, PowerBoard from .servo_board import ServoBoard from .simulator.time_server import TimeServer -from .utils import ensure_atexit_on_term, obtain_lock, singular +from .utils import IN_SIMULATOR, ensure_atexit_on_term, obtain_lock, singular try: from .mqtt import MQTT_VALID, MQTTClient, get_mqtt_variables @@ -27,7 +26,6 @@ MQTT_VALID = False logger = logging.getLogger(__name__) -IN_SIMULATOR = os.environ.get('WEBOTS_SIMULATOR', '') == '1' class Robot: diff --git a/sbot/serial_wrapper.py b/sbot/serial_wrapper.py index a5a8a75..8fc275c 100644 --- a/sbot/serial_wrapper.py +++ b/sbot/serial_wrapper.py @@ -6,7 +6,6 @@ """ from __future__ import annotations -import os import logging import sys import threading @@ -18,7 +17,7 @@ from .exceptions import BoardDisconnectionError from .logging import TRACE -from .utils import BoardIdentity +from .utils import IN_SIMULATOR, BoardIdentity logger = logging.getLogger(__name__) @@ -31,7 +30,7 @@ E = TypeVar("E", bound=BaseException) -if os.environ.get('WEBOTS_SIMULATOR', '') == '1': +if IN_SIMULATOR: BASE_TIMEOUT = 5 else: BASE_TIMEOUT = 0.5 @@ -207,9 +206,10 @@ def _connect(self) -> bool: """ try: self.serial.open() - # Wait for the board to be ready to receive data - # Certain boards will reset when the serial port is opened - time.sleep(self.delay_after_connect) + if not IN_SIMULATOR: + # Wait for the board to be ready to receive data + # Certain boards will reset when the serial port is opened + time.sleep(self.delay_after_connect) except serial.SerialException: logger.error(( 'Failed to connect to board ' diff --git a/sbot/servo_board.py b/sbot/servo_board.py index d92c50a..dba483e 100644 --- a/sbot/servo_board.py +++ b/sbot/servo_board.py @@ -1,7 +1,6 @@ """The servo board module provides an interface to the servo board firmware over serial.""" from __future__ import annotations -import os import atexit import logging from types import MappingProxyType @@ -13,8 +12,8 @@ from .logging import log_to_debug from .serial_wrapper import SerialWrapper from .utils import ( - Board, BoardIdentity, float_bounds_check, get_simulator_boards, - get_USB_identity, map_to_float, map_to_int, + IN_SIMULATOR, Board, BoardIdentity, float_bounds_check, + get_simulator_boards, get_USB_identity, map_to_float, map_to_int, ) DUTY_MIN = 300 @@ -135,7 +134,7 @@ def _get_supported_boards( to connect to, defaults to None :return: A mapping of serial numbers to servo boards. """ - if os.environ.get('WEBOTS_SIMULATOR') == '1': + if IN_SIMULATOR: return cls._get_simulator_boards() boards = {} diff --git a/sbot/utils.py b/sbot/utils.py index a85c668..fb47c3a 100644 --- a/sbot/utils.py +++ b/sbot/utils.py @@ -1,8 +1,8 @@ """General utility functions and classes for the sbot package.""" from __future__ import annotations -import os import logging +import os import signal import socket from abc import ABC, abstractmethod @@ -14,6 +14,8 @@ T = TypeVar('T') logger = logging.getLogger(__name__) +IN_SIMULATOR = os.environ.get('WEBOTS_SIMULATOR', '') == '1' + class BoardIdentity(NamedTuple): """