-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* long profiles * start with creep * lil cleanup * corolla updates * cleanup * 2s * plot is a little nicer * strict mode * cleanup * unused * fix that --------- Co-authored-by: Comma Device <device@comma.ai>
- Loading branch information
1 parent
cbad7f0
commit f2fa755
Showing
4 changed files
with
203 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
.sconsign.dblite | ||
.hypothesis | ||
*.egg-info/ | ||
*.html | ||
uv.lock | ||
|
||
opendbc/can/*.so | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
#!/usr/bin/env python3 | ||
import io | ||
import time | ||
import base64 | ||
import argparse | ||
import numpy as np | ||
import matplotlib.pyplot as plt | ||
from collections import defaultdict | ||
from dataclasses import dataclass, asdict | ||
from pathlib import Path | ||
|
||
from opendbc.car.structs import CarControl | ||
from opendbc.car.panda_runner import PandaRunner | ||
|
||
DT = 0.01 # step time (s) | ||
|
||
@dataclass | ||
class Action: | ||
accel: float # m/s^2 | ||
duration: float # seconds | ||
longControlState: CarControl.Actuators.LongControlState = CarControl.Actuators.LongControlState.pid | ||
|
||
def get_msgs(self): | ||
return [ | ||
(t, CarControl( | ||
enabled=True, | ||
longActive=True, | ||
actuators=CarControl.Actuators( | ||
accel=self.accel, | ||
longControlState=self.longControlState, | ||
), | ||
)) | ||
for t in np.linspace(0, self.duration, int(self.duration/DT)) | ||
] | ||
|
||
@dataclass | ||
class Maneuver: | ||
description: str | ||
actions: list[Action] | ||
|
||
def get_msgs(self): | ||
t0 = 0 | ||
for action in self.actions: | ||
for lt, msg in action.get_msgs(): | ||
yield lt + t0, msg | ||
t0 += lt | ||
|
||
MANEUVERS = [ | ||
Maneuver( | ||
"creeping: alternate between +1m/ss and -1m/ss", | ||
[ | ||
Action(1, 2), Action(-1, 2), | ||
Action(1, 2), Action(-1, 2), | ||
Action(1, 2), Action(-1, 2), | ||
], | ||
), | ||
] | ||
|
||
def main(args): | ||
with PandaRunner() as p: | ||
print("\n\n") | ||
|
||
logs = {} | ||
for i, m in enumerate(MANEUVERS): | ||
print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'") | ||
|
||
print("- setting up") | ||
good_cnt = 0 | ||
for _ in range(int(30./DT)): | ||
cs = p.read(strict=False) | ||
|
||
cc = CarControl( | ||
enabled=True, | ||
longActive=True, | ||
actuators=CarControl.Actuators(accel=-1.5, longControlState=CarControl.Actuators.LongControlState.stopping), | ||
) | ||
p.write(cc) | ||
|
||
good_cnt = (good_cnt+1) if cs.vEgo < 0.1 and cs.cruiseState.enabled and not cs.cruiseState.standstill else 0 | ||
if good_cnt > (2./DT): | ||
break | ||
time.sleep(DT) | ||
else: | ||
print("ERROR: failed to setup") | ||
continue | ||
|
||
print("- executing maneuver") | ||
logs[m.description] = defaultdict(list) | ||
for t, cc in m.get_msgs(): | ||
cs = p.read() | ||
p.write(cc) | ||
|
||
logs[m.description]["t"].append(t) | ||
to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators, | ||
"carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState} | ||
for k, v in to_log.items(): | ||
for k2, v2 in asdict(v).items(): | ||
logs[m.description][f"{k}.{k2}"].append(v2) | ||
|
||
time.sleep(DT) | ||
|
||
# ***** write out report ***** | ||
|
||
output_path = Path(__file__).resolve().parent / "longitudinal_reports" | ||
output_fn = args.output or output_path / f"{p.CI.CP.carFingerprint}_{time.strftime('%Y%m%d-%H_%M_%S')}.html" | ||
output_path.mkdir(exist_ok=True) | ||
with open(output_fn, "w") as f: | ||
f.write("<h1>Longitudinal maneuver report</h1>\n") | ||
f.write(f"<h3>{p.CI.CP.carFingerprint}</h3>\n") | ||
if args.desc: | ||
f.write(f"<h3>{args.desc}</h3>") | ||
for m in MANEUVERS: | ||
f.write("<div style='border-top: 1px solid #000; margin: 20px 0;'></div>\n") | ||
f.write(f"<h2>{m.description}</h2>\n") | ||
|
||
log = logs[m.description] | ||
|
||
plt.rcParams['font.size'] = 40 | ||
fig = plt.figure(figsize=(30, 20)) | ||
ax = fig.subplots(3, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 1, 1]}) | ||
|
||
ax[0].grid(linewidth=4) | ||
ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6) | ||
ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6) | ||
ax[0].set_ylabel('Acceleration (m/s^2)') | ||
ax[0].set_ylim(-4.5, 4.5) | ||
ax[0].legend() | ||
|
||
ax[1].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6) | ||
ax[2].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6) | ||
for i in (1, 2): | ||
ax[i].set_yticks([0, 1], minor=False) | ||
ax[i].set_ylim(-1, 2) | ||
ax[i].legend() | ||
|
||
ax[-1].set_xlabel("Time (s)") | ||
fig.tight_layout() | ||
|
||
buffer = io.BytesIO() | ||
fig.savefig(buffer, format='png') | ||
buffer.seek(0) | ||
f.write(f"<img src='data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n") | ||
|
||
print(f"\nReport written to {output_fn}\n") | ||
|
||
|
||
if __name__ == "__main__": | ||
parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.", | ||
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | ||
parser.add_argument('--desc', help="Extra description to include in report.") | ||
parser.add_argument('--output', help="Write out report to this file.", default=None) | ||
args = parser.parse_args() | ||
|
||
assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'" | ||
|
||
main(args) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,57 @@ | ||
from contextlib import contextmanager | ||
import time | ||
from contextlib import AbstractContextManager | ||
|
||
from panda import Panda | ||
from opendbc.car.car_helpers import get_car | ||
from opendbc.car.can_definitions import CanData | ||
from opendbc.car.structs import CarControl | ||
|
||
@contextmanager | ||
def PandaRunner(): | ||
p = Panda() | ||
class PandaRunner(AbstractContextManager): | ||
def __enter__(self): | ||
self.p = Panda() | ||
self.p.reset() | ||
|
||
def _can_recv(wait_for_one: bool = False) -> list[list[CanData]]: | ||
recv = p.can_recv() | ||
while len(recv) == 0 and wait_for_one: | ||
recv = p.can_recv() | ||
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ] | ||
|
||
try: | ||
# setup + fingerprinting | ||
p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
CI = get_car(_can_recv, p.can_send_many, p.set_obd, True) | ||
print("fingerprinted", CI.CP.carName) | ||
assert CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported." | ||
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
self.CI = get_car(self._can_recv, self.p.can_send_many, self.p.set_obd, True) | ||
print("fingerprinted", self.CI.CP.carName) | ||
assert self.CI.CP.carFingerprint != "mock", "Unable to identify car. Check connections and ensure car is supported." | ||
|
||
self.p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
self.CI.init(self.CI.CP, self._can_recv, self.p.can_send_many) | ||
self.p.set_safety_mode(Panda.SAFETY_TOYOTA, self.CI.CP.safetyConfigs[0].safetyParam) | ||
|
||
return self | ||
|
||
p.set_safety_mode(Panda.SAFETY_ELM327, 1) | ||
CI.init(CI.CP, _can_recv, p.can_send_many) | ||
p.set_safety_mode(Panda.SAFETY_TOYOTA, CI.CP.safetyConfigs[0].safetyParam) | ||
def __exit__(self, exc_type, exc_value, traceback): | ||
self.p.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||
self.p.reset() # avoid siren | ||
return super().__exit__(exc_type, exc_value, traceback) | ||
|
||
@property | ||
def panda(self) -> Panda: | ||
return self.p | ||
|
||
def _can_recv(self, wait_for_one: bool = False) -> list[list[CanData]]: | ||
recv = self.p.can_recv() | ||
while len(recv) == 0 and wait_for_one: | ||
recv = self.p.can_recv() | ||
return [[CanData(addr, dat, bus) for addr, dat, bus in recv], ] | ||
|
||
yield p, CI | ||
finally: | ||
p.set_safety_mode(Panda.SAFETY_NOOUTPUT) | ||
def read(self, strict: bool = True): | ||
cs = self.CI.update([int(time.monotonic()*1e9), self._can_recv()[0]]) | ||
if strict: | ||
assert cs.canValid, "CAN went invalid, check connections" | ||
return cs | ||
|
||
def write(self, cc: CarControl) -> None: | ||
if cc.enabled and not self.p.health()['controls_allowed']: | ||
# prevent the car from faulting. print a warning? | ||
cc = CarControl(enabled=False) | ||
_, can_sends = self.CI.apply(cc) | ||
self.p.can_send_many(can_sends, timeout=25) | ||
self.p.send_heartbeat() | ||
|
||
if __name__ == "__main__": | ||
with PandaRunner() as (p, CI): | ||
print(p.can_recv()) | ||
with PandaRunner() as p: | ||
print(p.read()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ docs = [ | |
] | ||
examples = [ | ||
"inputs", | ||
"matplotlib", | ||
] | ||
|
||
[tool.pytest.ini_options] | ||
|