forked from sunnypilot/opendbc
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
simple joystick example (commaai#1200)
* setup runner * port over joystickd * fix mypy * set accel and steer
- Loading branch information
1 parent
a1b95d7
commit cbad7f0
Showing
4 changed files
with
215 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
#!/usr/bin/env python3 | ||
import time | ||
import threading | ||
import argparse | ||
import numpy as np | ||
from inputs import get_gamepad | ||
|
||
from kbhit import KBHit | ||
|
||
from opendbc.car.structs import CarControl | ||
from opendbc.car.panda_runner import PandaRunner | ||
from opendbc.car.can_definitions import CanData | ||
|
||
class Keyboard: | ||
def __init__(self): | ||
self.kb = KBHit() | ||
self.axis_increment = 0.05 # 5% of full actuation each key press | ||
self.axes_map = {'w': 'gb', 's': 'gb', | ||
'a': 'steer', 'd': 'steer'} | ||
self.axes_values = {'gb': 0., 'steer': 0.} | ||
self.axes_order = ['gb', 'steer'] | ||
self.cancel = False | ||
|
||
def update(self): | ||
key = self.kb.getch().lower() | ||
print(key) | ||
self.cancel = False | ||
if key == 'r': | ||
self.axes_values = {ax: 0. for ax in self.axes_values} | ||
elif key == 'c': | ||
self.cancel = True | ||
elif key in self.axes_map: | ||
axis = self.axes_map[key] | ||
incr = self.axis_increment if key in ['w', 'a'] else -self.axis_increment | ||
self.axes_values[axis] = float(np.clip(self.axes_values[axis] + incr, -1, 1)) | ||
else: | ||
return False | ||
return True | ||
|
||
class Joystick: | ||
def __init__(self, gamepad=False): | ||
# TODO: find a way to get this from API, perhaps "inputs" doesn't support it | ||
if gamepad: | ||
self.cancel_button = 'BTN_NORTH' # (BTN_NORTH=X, ABS_RZ=Right Trigger) | ||
accel_axis = 'ABS_Y' | ||
steer_axis = 'ABS_RX' | ||
else: | ||
self.cancel_button = 'BTN_TRIGGER' | ||
accel_axis = 'ABS_Y' | ||
steer_axis = 'ABS_RX' | ||
self.min_axis_value = {accel_axis: 0., steer_axis: 0.} | ||
self.max_axis_value = {accel_axis: 255., steer_axis: 255.} | ||
self.axes_values = {accel_axis: 0., steer_axis: 0.} | ||
self.axes_order = [accel_axis, steer_axis] | ||
self.cancel = False | ||
|
||
def update(self): | ||
joystick_event = get_gamepad()[0] | ||
event = (joystick_event.code, joystick_event.state) | ||
if event[0] == self.cancel_button: | ||
if event[1] == 1: | ||
self.cancel = True | ||
elif event[1] == 0: # state 0 is falling edge | ||
self.cancel = False | ||
elif event[0] in self.axes_values: | ||
self.max_axis_value[event[0]] = max(event[1], self.max_axis_value[event[0]]) | ||
self.min_axis_value[event[0]] = min(event[1], self.min_axis_value[event[0]]) | ||
|
||
norm = -float(np.interp(event[1], [self.min_axis_value[event[0]], self.max_axis_value[event[0]]], [-1., 1.])) | ||
self.axes_values[event[0]] = norm if abs(norm) > 0.05 else 0. # center can be noisy, deadzone of 5% | ||
else: | ||
return False | ||
return True | ||
|
||
def joystick_thread(joystick): | ||
while True: | ||
joystick.update() | ||
|
||
def main(joystick): | ||
threading.Thread(target=joystick_thread, args=(joystick,), daemon=True).start() | ||
with PandaRunner() as (p, CI): | ||
CC = CarControl(enabled=False) | ||
while True: | ||
cd = [CanData(addr, dat, bus) for addr, dat, bus in p.can_recv()] | ||
CI.update([0, cd]) | ||
|
||
CC.actuators.accel = float(4.0*np.clip(joystick.axes_values['gb'], -1, 1)) | ||
CC.actuators.steer = float(np.clip(joystick.axes_values['steer'], -1, 1)) | ||
|
||
from pprint import pprint | ||
pprint(CC) | ||
|
||
_, can_sends = CI.apply(CC) | ||
p.can_send_many(can_sends, timeout=1000) | ||
|
||
# 100Hz | ||
time.sleep(0.01) | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser(description='Test the car interface with a joystick. Uses keyboard by default.', | ||
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | ||
|
||
parser.add_argument('--mode', choices=['keyboard', 'gamepad', 'joystick'], default='keyboard') | ||
args = parser.parse_args() | ||
|
||
print() | ||
joystick: Keyboard | Joystick | ||
if args.mode == 'keyboard': | ||
print('Gas/brake control: `W` and `S` keys') | ||
print('Steering control: `A` and `D` keys') | ||
print('Buttons') | ||
print('- `R`: Resets axes') | ||
print('- `C`: Cancel cruise control') | ||
joystick = Keyboard() | ||
else: | ||
joystick = Joystick(gamepad=(args.mode == 'gamepad')) | ||
main(joystick) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
#!/usr/bin/env python3 | ||
import sys | ||
import termios | ||
import atexit | ||
from select import select | ||
|
||
STDIN_FD = sys.stdin.fileno() | ||
|
||
class KBHit: | ||
def __init__(self) -> None: | ||
self.set_kbhit_terminal() | ||
|
||
def set_kbhit_terminal(self) -> None: | ||
# Save the terminal settings | ||
self.old_term = termios.tcgetattr(STDIN_FD) | ||
self.new_term = self.old_term.copy() | ||
|
||
# New terminal setting unbuffered | ||
self.new_term[3] &= ~(termios.ICANON | termios.ECHO) | ||
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.new_term) | ||
|
||
# Support normal-terminal reset at exit | ||
atexit.register(self.set_normal_term) | ||
|
||
def set_normal_term(self) -> None: | ||
termios.tcsetattr(STDIN_FD, termios.TCSAFLUSH, self.old_term) | ||
|
||
@staticmethod | ||
def getch() -> str: | ||
return sys.stdin.read(1) | ||
|
||
@staticmethod | ||
def getarrow() -> int: | ||
c = sys.stdin.read(3)[2] | ||
vals = [65, 67, 66, 68] | ||
return vals.index(ord(c)) | ||
|
||
@staticmethod | ||
def kbhit(): | ||
''' Returns True if keyboard character was hit, False otherwise. | ||
''' | ||
return select([sys.stdin], [], [], 0)[0] != [] | ||
|
||
|
||
if __name__ == "__main__": | ||
|
||
kb = KBHit() | ||
|
||
print('Hit any key, or ESC to exit') | ||
|
||
while True: | ||
|
||
if kb.kbhit(): | ||
c = kb.getch() | ||
if c == '\x1b': # ESC | ||
break | ||
print(c) | ||
|
||
kb.set_normal_term() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
from contextlib import contextmanager | ||
|
||
from panda import Panda | ||
from opendbc.car.car_helpers import get_car | ||
from opendbc.car.can_definitions import CanData | ||
|
||
@contextmanager | ||
def PandaRunner(): | ||
p = Panda() | ||
|
||
def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]: | ||
recv = p.can_recv() | ||
while len(recv) == 0 and wait_for_one: | ||
recv = p.can_recv() | ||
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ] | ||
|
||
try: | ||
# setup + fingerprinting | ||
p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
CI = get_car(_can_recv, p.can_send_many, p.set_obd, True) | ||
print("fingerprinted", CI.CP.carName) | ||
assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported." | ||
|
||
p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
CI.init(CI.CP, _can_recv, p.can_send_many) | ||
p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam) | ||
|
||
yield p, CI | ||
finally: | ||
p.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||
|
||
|
||
if __name__ == "__main__": | ||
with PandaRunner() as (p, CI): | ||
print(p.can_recv()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters