diff --git a/examples/joystick.py b/examples/joystick.py new file mode 100755 index 0000000000..9a2e1c08be --- /dev/null +++ b/examples/joystick.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +import time +import threading +import argparse +import numpy as np +from inputs import get_gamepad + +from kbhit import KBHit + +from opendbc.car.structs import CarControl +from opendbc.car.panda_runner import PandaRunner +from opendbc.car.can_definitions import CanData + +class Keyboard: + def __init__(self): + self.kb = KBHit() + self.axis_increment = 0.05 # 5% of full actuation each key press + self.axes_map = {'w': 'gb', 's': 'gb', + 'a': 'steer', 'd': 'steer'} + self.axes_values = {'gb': 0., 'steer': 0.} + self.axes_order = ['gb', 'steer'] + self.cancel = False + + def update(self): + key = self.kb.getch().lower() + print(key) + self.cancel = False + if key == 'r': + self.axes_values = {ax: 0. for ax in self.axes_values} + elif key == 'c': + self.cancel = True + elif key in self.axes_map: + axis = self.axes_map[key] + incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment + self.axes_values[axis] = float(np.clip(self.axes_values[axis] + incr, -1, 1)) + else: + return False + return True + +class Joystick: + def __init__(self, gamepad=False): + # TODO: find a way to get this from API, perhaps "inputs" doesn't support it + if gamepad: + self.cancel_button = 'BTN_NORTH' # (BTN_NORTH=X, ABS_RZ=Right Trigger) + accel_axis = 'ABS_Y' + steer_axis = 'ABS_RX' + else: + self.cancel_button = 'BTN_TRIGGER' + accel_axis = 'ABS_Y' + steer_axis = 'ABS_RX' + self.min_axis_value = {accel_axis: 0., steer_axis: 0.} + self.max_axis_value = {accel_axis: 255., steer_axis: 255.} + self.axes_values = {accel_axis: 0., steer_axis: 0.} + self.axes_order = [accel_axis, steer_axis] + self.cancel = False + + def update(self): + joystick_event = get_gamepad()[0] + event = (joystick_event.code, joystick_event.state) + if event[0] == self.cancel_button: + if event[1] == 1: + self.cancel = True + elif event[1] == 0: # state 0 is falling edge + self.cancel = False + elif event[0] in self.axes_values: + self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]]) + self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]]) + + norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.])) + self.axes_values[event[0]] = norm if abs(norm) > 0.05 else 0. # center can be noisy, deadzone of 5% + else: + return False + return True + +def joystick_thread(joystick): + while True: + joystick.update() + +def main(joystick): + threading.Thread(target=joystick_thread, args=(joystick,), daemon=True).start() + with PandaRunner() as (p, CI): + CC = CarControl(enabled=False) + while True: + cd = [CanData(addr, dat, bus) for addr, dat, bus in p.can_recv()] + CI.update([0, cd]) + + CC.actuators.accel = float(4.0*np.clip(joystick.axes_values['gb'], -1, 1)) + CC.actuators.steer = float(np.clip(joystick.axes_values['steer'], -1, 1)) + + from pprint import pprint + pprint(CC) + + _, can_sends = CI.apply(CC) + p.can_send_many(can_sends, timeout=1000) + + # 100Hz + time.sleep(0.01) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Test the car interface with a joystick. Uses keyboard by default.', + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + + parser.add_argument('--mode', choices=['keyboard', 'gamepad', 'joystick'], default='keyboard') + args = parser.parse_args() + + print() + joystick: Keyboard | Joystick + if args.mode == 'keyboard': + print('Gas/brake control: `W` and `S` keys') + print('Steering control: `A` and `D` keys') + print('Buttons') + print('- `R`: Resets axes') + print('- `C`: Cancel cruise control') + joystick = Keyboard() + else: + joystick = Joystick(gamepad=(args.mode == 'gamepad')) + main(joystick) \ No newline at end of file diff --git a/examples/kbhit.py b/examples/kbhit.py new file mode 100755 index 0000000000..35f67b4771 --- /dev/null +++ b/examples/kbhit.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +import sys +import termios +import atexit +from select import select + +STDIN_FD = sys.stdin.fileno() + +class KBHit: + def __init__(self) -> None: + self.set_kbhit_terminal() + + def set_kbhit_terminal(self) -> None: + # Save the terminal settings + self.old_term = termios.tcgetattr(STDIN_FD) + self.new_term = self.old_term.copy() + + # New terminal setting unbuffered + self.new_term[3] &= ~(termios.ICANON | termios.ECHO) + termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.new_term) + + # Support normal-terminal reset at exit + atexit.register(self.set_normal_term) + + def set_normal_term(self) -> None: + termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.old_term) + + @staticmethod + def getch() -> str: + return sys.stdin.read(1) + + @staticmethod + def getarrow() -> int: + c = sys.stdin.read(3)[2] + vals = [65, 67, 66, 68] + return vals.index(ord(c)) + + @staticmethod + def kbhit(): + ''' Returns True if keyboard character was hit, False otherwise. + ''' + return select([sys.stdin], [], [], 0)[0] != [] + + +if __name__ == "__main__": + + kb = KBHit() + + print('Hit any key, or ESC to exit') + + while True: + + if kb.kbhit(): + c = kb.getch() + if c == '\x1b': # ESC + break + print(c) + + kb.set_normal_term() diff --git a/opendbc/car/panda_runner.py b/opendbc/car/panda_runner.py new file mode 100644 index 0000000000..02fd3ff224 --- /dev/null +++ b/opendbc/car/panda_runner.py @@ -0,0 +1,35 @@ +from contextlib import contextmanager + +from panda import Panda +from opendbc.car.car_helpers import get_car +from opendbc.car.can_definitions import CanData + +@contextmanager +def PandaRunner(): + p = Panda() + + def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]: + recv = p.can_recv() + while len(recv) == 0 and wait_for_one: + recv = p.can_recv() + return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ] + + try: + # setup + fingerprinting + p.set_safety_mode(Panda.SAFETY_ELM327, 1) + CI = get_car(_can_recv, p.can_send_many, p.set_obd, True) + print("fingerprinted", CI.CP.carName) + assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported." + + p.set_safety_mode(Panda.SAFETY_ELM327, 1) + CI.init(CI.CP, _can_recv, p.can_send_many) + p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam) + + yield p, CI + finally: + p.set_safety_mode(Panda.SAFETY_NOOUTPUT) + + +if __name__ == "__main__": + with PandaRunner() as (p, CI): + print(p.can_recv()) \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 32cceb4826..494635e729 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,9 @@ docs = [ "Jinja2", "natsort", ] +examples = [ + "inputs", +] [tool.pytest.ini_options] addopts = "--ignore=panda/ -Werror --strict-config --strict-markers --durations=10 -n auto"