diff --git a/examples/longitudinal-profiles.py b/examples/longitudinal-profiles.py
index 2e7337441d..c259074eb5 100755
--- a/examples/longitudinal-profiles.py
+++ b/examples/longitudinal-profiles.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import io
+import os
import time
import base64
import argparse
@@ -11,9 +12,15 @@
from opendbc.car.structs import CarControl
from opendbc.car.panda_runner import PandaRunner
+from opendbc.car.common.conversions import Conversions
DT = 0.01 # step time (s)
+# TODOs
+# - support lateral maneuvers
+# - setup: show countdown?
+
+
@dataclass
class Action:
accel: float # m/s^2
@@ -37,6 +44,8 @@ def get_msgs(self):
class Maneuver:
description: str
actions: list[Action]
+ repeat: int = 1
+ initial_speed: float = 0. # m/s
def get_msgs(self):
t0 = 0
@@ -47,110 +56,160 @@ def get_msgs(self):
MANEUVERS = [
Maneuver(
- "creeping: alternate between +1m/ss and -1m/ss",
- [
- Action(1, 2), Action(-1, 2),
- Action(1, 2), Action(-1, 2),
- Action(1, 2), Action(-1, 2),
- ],
+ "creep: alternate between +1m/ss and -1m/ss",
+ [
+ Action(1, 2), Action(-1, 2),
+ Action(1, 2), Action(-1, 2),
+ Action(1, 2), Action(-1, 2),
+ ],
+ initial_speed=0.,
+ ),
+ Maneuver(
+ "brake step response: -1m/ss from 20mph",
+ [Action(-1, 3),],
+ repeat=3,
+ initial_speed=20. * Conversions.MPH_TO_MS,
+ ),
+ Maneuver(
+ "brake step response: -4m/ss from 20mph",
+ [Action(-4, 3),],
+ repeat=3,
+ initial_speed=15. * Conversions.MPH_TO_MS,
+ ),
+ Maneuver(
+ "gas step response: +1m/ss from 20mph",
+ [Action(1, 3),],
+ repeat=3,
+ initial_speed=20. * Conversions.MPH_TO_MS,
+ ),
+ Maneuver(
+ "gas step response: +4m/ss from 20mph",
+ [Action(4, 3),],
+ repeat=3,
+ initial_speed=20. * Conversions.MPH_TO_MS,
),
]
-def main(args):
- with PandaRunner() as p:
- print("\n\n")
-
- logs = {}
- for i, m in enumerate(MANEUVERS):
- print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
-
- print("- setting up")
- good_cnt = 0
- for _ in range(int(30./DT)):
- cs = p.read(strict=False)
-
- cc = CarControl(
- enabled=True,
- longActive=True,
- actuators=CarControl.Actuators(accel=-1.5, longControlState=CarControl.Actuators.LongControlState.stopping),
- )
- p.write(cc)
-
- good_cnt = (good_cnt+1) if cs.vEgo < 0.1 and cs.cruiseState.enabled and not cs.cruiseState.standstill else 0
- if good_cnt > (2./DT):
- break
- time.sleep(DT)
- else:
- print("ERROR: failed to setup")
- continue
-
- print("- executing maneuver")
- logs[m.description] = defaultdict(list)
- for t, cc in m.get_msgs():
- cs = p.read()
- p.write(cc)
-
- logs[m.description]["t"].append(t)
- to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
- "carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
- for k, v in to_log.items():
- for k2, v2 in asdict(v).items():
- logs[m.description][f"{k}.{k2}"].append(v2)
-
- time.sleep(DT)
-
- # ***** write out report *****
-
+def report(args, logs, fp):
output_path = Path(__file__).resolve().parent / "longitudinal_reports"
- output_fn = args.output or output_path / f"{p.CI.CP.carFingerprint}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
+ output_fn = args.output or output_path / f"{fp}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
output_path.mkdir(exist_ok=True)
with open(output_fn, "w") as f:
f.write("
Longitudinal maneuver report
\n")
- f.write(f"{p.CI.CP.carFingerprint}
\n")
+ f.write(f"{fp}
\n")
if args.desc:
f.write(f"{args.desc}
")
- for m in MANEUVERS:
+ for description, runs in logs.items():
f.write("\n")
- f.write(f"{m.description}
\n")
-
- log = logs[m.description]
-
- plt.rcParams['font.size'] = 40
- fig = plt.figure(figsize=(30, 20))
- ax = fig.subplots(3, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 1, 1]})
-
- ax[0].grid(linewidth=4)
- ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
- ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
- ax[0].set_ylabel('Acceleration (m/s^2)')
- ax[0].set_ylim(-4.5, 4.5)
- ax[0].legend()
-
- ax[1].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
- ax[2].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
- for i in (1, 2):
- ax[i].set_yticks([0, 1], minor=False)
- ax[i].set_ylim(-1, 2)
- ax[i].legend()
-
- ax[-1].set_xlabel("Time (s)")
- fig.tight_layout()
+ f.write(f"{description}
\n")
+ for run, log in runs.items():
+ f.write(f"Run #{int(run)+1}
\n")
+ plt.rcParams['font.size'] = 40
+ fig = plt.figure(figsize=(30, 25))
+ ax = fig.subplots(4, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 3, 1, 1]})
+
+ ax[0].grid(linewidth=4)
+ ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
+ ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
+ ax[0].set_ylabel('Acceleration (m/s^2)')
+ #ax[0].set_ylim(-6.5, 6.5)
+ ax[0].legend()
+
+ ax[1].grid(linewidth=4)
+ ax[1].plot(log["t"], log["carState.vEgo"], 'g', label='vEgo', linewidth=6)
+ ax[1].set_ylabel('Velocity (m/s)')
+ ax[1].legend()
+
+ ax[2].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
+ ax[3].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
+ ax[3].plot(log["t"], log["carState.brakePressed"], label='brakePressed', linewidth=6)
+ for i in (2, 3):
+ ax[i].set_yticks([0, 1], minor=False)
+ ax[i].set_ylim(-1, 2)
+ ax[i].legend()
+
+ ax[-1].set_xlabel("Time (s)")
+ fig.tight_layout()
+
+ buffer = io.BytesIO()
+ fig.savefig(buffer, format='png')
+ buffer.seek(0)
+ f.write(f"\n")
+
+ import json
+ f.write(f"{json.dumps(logs)}
")
+ print(f"\nReport written to {output_fn}\n")
- buffer = io.BytesIO()
- fig.savefig(buffer, format='png')
- buffer.seek(0)
- f.write(f"\n")
+def main(args):
+ with PandaRunner() as p:
+ print("\n\n")
- print(f"\nReport written to {output_fn}\n")
+ logs = {}
+ for i, m in enumerate(MANEUVERS):
+ logs[m.description] = {}
+ print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
+ for run in range(m.repeat):
+ print(f"- run #{run}")
+ print("- setting up, engage cruise")
+ ready_cnt = 0
+ for _ in range(int(60./DT)):
+ cs = p.read(strict=False)
+
+ cc = CarControl(
+ enabled=True,
+ longActive=True,
+ actuators=CarControl.Actuators(
+ accel=(m.initial_speed - cs.vEgo)*0.5,
+ longControlState=CarControl.Actuators.LongControlState.stopping if m.initial_speed < 0.1 else CarControl.Actuators.LongControlState.pid,
+ ),
+ )
+ p.write(cc)
+
+ ready = cs.cruiseState.enabled and not cs.cruiseState.standstill and ((m.initial_speed - 0.3) < cs.vEgo < (m.initial_speed + 0.3))
+ ready_cnt = (ready_cnt+1) if ready else 0
+ if ready_cnt > (3./DT):
+ break
+ time.sleep(DT)
+ else:
+ print("ERROR: failed to setup")
+ continue
+
+ print("- executing maneuver")
+ logs[m.description][run] = defaultdict(list)
+ for t, cc in m.get_msgs():
+ cs = p.read()
+ p.write(cc)
+
+ logs[m.description][run]["t"].append(t)
+ to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
+ "carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
+ for k, v in to_log.items():
+ for k2, v2 in asdict(v).items():
+ logs[m.description][run][f"{k}.{k2}"].append(v2)
+
+ time.sleep(DT)
+
+ with open('/tmp/logs.json', 'w') as f:
+ import json
+ json.dump(logs, f, indent=2)
+ report(args, logs, p.CI.CP.carFingerprint)
if __name__ == "__main__":
+
parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--desc', help="Extra description to include in report.")
parser.add_argument('--output', help="Write out report to this file.", default=None)
args = parser.parse_args()
+ if "REPORT_TEST" in os.environ:
+ with open(os.environ["REPORT_TEST"]) as f:
+ import json
+ logs = json.loads(f.read().split("none'>")[1].split('')[0])
+ report(args, logs, "testing")
+ exit()
+
assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'"
- main(args)
\ No newline at end of file
+ main(args)