diff --git a/sbot/mqtt.py b/sbot/mqtt.py index 671924b..cd57781 100644 --- a/sbot/mqtt.py +++ b/sbot/mqtt.py @@ -5,6 +5,7 @@ import logging import os import time +from threading import Event from typing import Any, Callable, TypedDict from urllib.parse import urlparse @@ -214,3 +215,37 @@ def get_mqtt_variables() -> MQTTVariables: username=url_parts.username, password=url_parts.password, ) + + +class RemoteStartButton: + def __init__(self, mqtt_client: MQTTClient) -> None: + self._mqtt_client = mqtt_client + self._start_pressed = Event() + + self._mqtt_client.subscribe('start_button', self._process_start_message) + + def _process_start_message( + self, + client: mqtt.Client, + userdata: Any, + message: mqtt.MQTTMessage, + ) -> None: + try: + payload = json.loads(message.payload) + except json.JSONDecodeError: + LOGGER.warning("Failed to decode start button message.") + return + else: + if 'pressed' in payload.keys(): + if payload['pressed']: + self._start_pressed.set() + LOGGER.debug("Start button pressed.") + else: + self._start_pressed.clear() + LOGGER.debug("Start button cleared.") + + def get_start_button_pressed(self) -> bool: + """Get the start button pressed status.""" + pressed = self._start_pressed.is_set() + self._start_pressed.clear() + return pressed diff --git a/sbot/robot.py b/sbot/robot.py index 800203c..86db533 100644 --- a/sbot/robot.py +++ b/sbot/robot.py @@ -20,7 +20,7 @@ from .utils import ensure_atexit_on_term, obtain_lock, singular try: - from .mqtt import MQTT_VALID, MQTTClient, get_mqtt_variables + from .mqtt import MQTT_VALID, MQTTClient, RemoteStartButton, get_mqtt_variables except ImportError: MQTT_VALID = False @@ -43,7 +43,7 @@ class Robot: """ __slots__ = ( '_lock', '_metadata', '_power_board', '_motor_boards', '_servo_boards', - '_arduinos', '_cameras', '_mqttc', + '_arduinos', '_cameras', '_mqttc', '_start_button', ) def __init__( @@ -62,6 +62,12 @@ def __init__( logger.info(f"SourceBots API v{__version__}") + if MQTT_VALID: + # get the config from env vars + mqtt_config = get_mqtt_variables() + self._mqttc = MQTTClient.establish(**mqtt_config) + self._start_button = RemoteStartButton(self._mqttc) + if manual_boards: self._init_power_board(manual_boards.get(PowerBoard.get_board_type(), [])) self._init_aux_boards(manual_boards) @@ -117,9 +123,6 @@ def _init_camera(self) -> None: markers in its field of view. """ if MQTT_VALID: - # get the config from env vars - mqtt_config = get_mqtt_variables() - self._mqttc = MQTTClient.establish(**mqtt_config) self._cameras = MappingProxyType(_setup_cameras( game_specific.MARKER_SIZES, self._mqttc.wrapped_publish, @@ -299,14 +302,22 @@ def wait_start(self) -> None: Once the start button is pressed, the metadata will be loaded and the timeout will start if in competition mode. """ + if MQTT_VALID: + remote_start_pressed = self._start_button.get_start_button_pressed + else: + def null_button_pressed() -> bool: + return False + remote_start_pressed = null_button_pressed + # ignore previous button presses _ = self.power_board._start_button() + _ = remote_start_pressed() logger.info('Waiting for start button.') self.power_board.piezo.buzz(Note.A6, 0.1) self.power_board._run_led.flash() - while not self.power_board._start_button(): + while not self.power_board._start_button() and not remote_start_pressed(): sleep(0.1) logger.info("Start button pressed.") self.power_board._run_led.on()