Skip to content

Commit

Permalink
Add support for remote start button
Browse files Browse the repository at this point in the history
  • Loading branch information
WillB97 committed Jul 11, 2024
1 parent 12ce826 commit 3b48527
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
35 changes: 35 additions & 0 deletions sbot/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import time
from threading import Event
from typing import Any, Callable, TypedDict
from urllib.parse import urlparse

Expand Down Expand Up @@ -214,3 +215,37 @@ def get_mqtt_variables() -> MQTTVariables:
username=url_parts.username,
password=url_parts.password,
)


class RemoteStartButton:
def __init__(self, mqtt_client: MQTTClient) -> None:
self._mqtt_client = mqtt_client
self._start_pressed = Event()

self._mqtt_client.subscribe('start_button', self._process_start_message)

def _process_start_message(
self,
client: mqtt.Client,
userdata: Any,
message: mqtt.MQTTMessage,
) -> None:
try:
payload = json.loads(message.payload)
except json.JSONDecodeError:
LOGGER.warning("Failed to decode start button message.")
return
else:
if 'pressed' in payload.keys():
if payload['pressed']:
self._start_pressed.set()
LOGGER.debug("Start button pressed.")
else:
self._start_pressed.clear()
LOGGER.debug("Start button cleared.")

def get_start_button_pressed(self) -> bool:
"""Get the start button pressed status."""
pressed = self._start_pressed.is_set()
self._start_pressed.clear()
return pressed
23 changes: 17 additions & 6 deletions sbot/robot.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .utils import ensure_atexit_on_term, obtain_lock, singular

try:
from .mqtt import MQTT_VALID, MQTTClient, get_mqtt_variables
from .mqtt import MQTT_VALID, MQTTClient, RemoteStartButton, get_mqtt_variables
except ImportError:
MQTT_VALID = False

Expand All @@ -43,7 +43,7 @@ class Robot:
"""
__slots__ = (
'_lock', '_metadata', '_power_board', '_motor_boards', '_servo_boards',
'_arduinos', '_cameras', '_mqttc',
'_arduinos', '_cameras', '_mqttc', '_start_button',
)

def __init__(
Expand All @@ -62,6 +62,12 @@ def __init__(

logger.info(f"SourceBots API v{__version__}")

if MQTT_VALID:
# get the config from env vars
mqtt_config = get_mqtt_variables()
self._mqttc = MQTTClient.establish(**mqtt_config)
self._start_button = RemoteStartButton(self._mqttc)

if manual_boards:
self._init_power_board(manual_boards.get(PowerBoard.get_board_type(), []))
self._init_aux_boards(manual_boards)
Expand Down Expand Up @@ -117,9 +123,6 @@ def _init_camera(self) -> None:
markers in its field of view.
"""
if MQTT_VALID:
# get the config from env vars
mqtt_config = get_mqtt_variables()
self._mqttc = MQTTClient.establish(**mqtt_config)
self._cameras = MappingProxyType(_setup_cameras(
game_specific.MARKER_SIZES,
self._mqttc.wrapped_publish,
Expand Down Expand Up @@ -299,14 +302,22 @@ def wait_start(self) -> None:
Once the start button is pressed, the metadata will be loaded and the timeout
will start if in competition mode.
"""
if MQTT_VALID:
remote_start_pressed = self._start_button.get_start_button_pressed
else:
def null_button_pressed() -> bool:
return False
remote_start_pressed = null_button_pressed

# ignore previous button presses
_ = self.power_board._start_button()
_ = remote_start_pressed()
logger.info('Waiting for start button.')

self.power_board.piezo.buzz(Note.A6, 0.1)
self.power_board._run_led.flash()

while not self.power_board._start_button():
while not self.power_board._start_button() and not remote_start_pressed():
sleep(0.1)
logger.info("Start button pressed.")
self.power_board._run_led.on()
Expand Down

0 comments on commit 3b48527

Please sign in to comment.