Skip to content

Commit

Permalink
longitudinal maneuvers: add gas/brake step responses (commaai#1207)
Browse files Browse the repository at this point in the history
* longitudinal maneuvers: add gas/brake step responses

* corolla updates

* initial speed

* try beep

* beep

* corolla updates

* add vego

* enable all

---------

Co-authored-by: Comma Device <device@comma.ai>
  • Loading branch information
adeebshihadeh and Comma Device authored Aug 30, 2024
1 parent f2fa755 commit c928f6b
Showing 1 changed file with 143 additions and 84 deletions.
227 changes: 143 additions & 84 deletions examples/longitudinal-profiles.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import io
import os
import time
import base64
import argparse
Expand All @@ -11,9 +12,15 @@

from opendbc.car.structs import CarControl
from opendbc.car.panda_runner import PandaRunner
from opendbc.car.common.conversions import Conversions

DT = 0.01 # step time (s)

# TODOs
# - support lateral maneuvers
# - setup: show countdown?


@dataclass
class Action:
accel: float # m/s^2
Expand All @@ -37,6 +44,8 @@ def get_msgs(self):
class Maneuver:
description: str
actions: list[Action]
repeat: int = 1
initial_speed: float = 0. # m/s

def get_msgs(self):
t0 = 0
Expand All @@ -47,110 +56,160 @@ def get_msgs(self):

MANEUVERS = [
Maneuver(
"creeping: alternate between +1m/ss and -1m/ss",
[
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
],
"creep: alternate between +1m/ss and -1m/ss",
[
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
Action(1, 2), Action(-1, 2),
],
initial_speed=0.,
),
Maneuver(
"brake step response: -1m/ss from 20mph",
[Action(-1, 3),],
repeat=3,
initial_speed=20. * Conversions.MPH_TO_MS,
),
Maneuver(
"brake step response: -4m/ss from 20mph",
[Action(-4, 3),],
repeat=3,
initial_speed=15. * Conversions.MPH_TO_MS,
),
Maneuver(
"gas step response: +1m/ss from 20mph",
[Action(1, 3),],
repeat=3,
initial_speed=20. * Conversions.MPH_TO_MS,
),
Maneuver(
"gas step response: +4m/ss from 20mph",
[Action(4, 3),],
repeat=3,
initial_speed=20. * Conversions.MPH_TO_MS,
),
]

def main(args):
with PandaRunner() as p:
print("\n\n")

logs = {}
for i, m in enumerate(MANEUVERS):
print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")

print("- setting up")
good_cnt = 0
for _ in range(int(30./DT)):
cs = p.read(strict=False)

cc = CarControl(
enabled=True,
longActive=True,
actuators=CarControl.Actuators(accel=-1.5, longControlState=CarControl.Actuators.LongControlState.stopping),
)
p.write(cc)

good_cnt = (good_cnt+1) if cs.vEgo < 0.1 and cs.cruiseState.enabled and not cs.cruiseState.standstill else 0
if good_cnt > (2./DT):
break
time.sleep(DT)
else:
print("ERROR: failed to setup")
continue

print("- executing maneuver")
logs[m.description] = defaultdict(list)
for t, cc in m.get_msgs():
cs = p.read()
p.write(cc)

logs[m.description]["t"].append(t)
to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
"carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
for k, v in to_log.items():
for k2, v2 in asdict(v).items():
logs[m.description][f"{k}.{k2}"].append(v2)

time.sleep(DT)

# ***** write out report *****

def report(args, logs, fp):
output_path = Path(__file__).resolve().parent / "longitudinal_reports"
output_fn = args.output or output_path / f"{p.CI.CP.carFingerprint}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
output_fn = args.output or output_path / f"{fp}_{time.strftime('%Y%m%d-%H_%M_%S')}.html"
output_path.mkdir(exist_ok=True)
with open(output_fn, "w") as f:
f.write("<h1>Longitudinal maneuver report</h1>\n")
f.write(f"<h3>{p.CI.CP.carFingerprint}</h3>\n")
f.write(f"<h3>{fp}</h3>\n")
if args.desc:
f.write(f"<h3>{args.desc}</h3>")
for m in MANEUVERS:
for description, runs in logs.items():
f.write("<div style='border-top: 1px solid #000; margin: 20px 0;'></div>\n")
f.write(f"<h2>{m.description}</h2>\n")

log = logs[m.description]

plt.rcParams['font.size'] = 40
fig = plt.figure(figsize=(30, 20))
ax = fig.subplots(3, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 1, 1]})

ax[0].grid(linewidth=4)
ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
ax[0].set_ylabel('Acceleration (m/s^2)')
ax[0].set_ylim(-4.5, 4.5)
ax[0].legend()

ax[1].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
ax[2].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
for i in (1, 2):
ax[i].set_yticks([0, 1], minor=False)
ax[i].set_ylim(-1, 2)
ax[i].legend()

ax[-1].set_xlabel("Time (s)")
fig.tight_layout()
f.write(f"<h2>{description}</h2>\n")
for run, log in runs.items():
f.write(f"<h3>Run #{int(run)+1}</h3>\n")
plt.rcParams['font.size'] = 40
fig = plt.figure(figsize=(30, 25))
ax = fig.subplots(4, 1, sharex=True, gridspec_kw={'hspace': 0, 'height_ratios': [5, 3, 1, 1]})

ax[0].grid(linewidth=4)
ax[0].plot(log["t"], log["carControl.actuators.accel"], label='accel command', linewidth=6)
ax[0].plot(log["t"], log["carState.aEgo"], label='aEgo', linewidth=6)
ax[0].set_ylabel('Acceleration (m/s^2)')
#ax[0].set_ylim(-6.5, 6.5)
ax[0].legend()

ax[1].grid(linewidth=4)
ax[1].plot(log["t"], log["carState.vEgo"], 'g', label='vEgo', linewidth=6)
ax[1].set_ylabel('Velocity (m/s)')
ax[1].legend()

ax[2].plot(log["t"], log["carControl.enabled"], label='enabled', linewidth=6)
ax[3].plot(log["t"], log["carState.gasPressed"], label='gasPressed', linewidth=6)
ax[3].plot(log["t"], log["carState.brakePressed"], label='brakePressed', linewidth=6)
for i in (2, 3):
ax[i].set_yticks([0, 1], minor=False)
ax[i].set_ylim(-1, 2)
ax[i].legend()

ax[-1].set_xlabel("Time (s)")
fig.tight_layout()

buffer = io.BytesIO()
fig.savefig(buffer, format='png')
buffer.seek(0)
f.write(f"<img src='data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n")

import json
f.write(f"<p style='display: none'>{json.dumps(logs)}</p>")
print(f"\nReport written to {output_fn}\n")

buffer = io.BytesIO()
fig.savefig(buffer, format='png')
buffer.seek(0)
f.write(f"<img src='data:image/png;base64,{base64.b64encode(buffer.getvalue()).decode()}' style='width:100%; max-width:800px;'>\n")
def main(args):
with PandaRunner() as p:
print("\n\n")

print(f"\nReport written to {output_fn}\n")
logs = {}
for i, m in enumerate(MANEUVERS):
logs[m.description] = {}
print(f"Running {i+1}/{len(MANEUVERS)} '{m.description}'")
for run in range(m.repeat):
print(f"- run #{run}")
print("- setting up, engage cruise")
ready_cnt = 0
for _ in range(int(60./DT)):
cs = p.read(strict=False)

cc = CarControl(
enabled=True,
longActive=True,
actuators=CarControl.Actuators(
accel=(m.initial_speed - cs.vEgo)*0.5,
longControlState=CarControl.Actuators.LongControlState.stopping if m.initial_speed < 0.1 else CarControl.Actuators.LongControlState.pid,
),
)
p.write(cc)

ready = cs.cruiseState.enabled and not cs.cruiseState.standstill and ((m.initial_speed - 0.3) < cs.vEgo < (m.initial_speed + 0.3))
ready_cnt = (ready_cnt+1) if ready else 0
if ready_cnt > (3./DT):
break
time.sleep(DT)
else:
print("ERROR: failed to setup")
continue

print("- executing maneuver")
logs[m.description][run] = defaultdict(list)
for t, cc in m.get_msgs():
cs = p.read()
p.write(cc)

logs[m.description][run]["t"].append(t)
to_log = {"carControl": cc, "carState": cs, "carControl.actuators": cc.actuators,
"carControl.cruiseControl": cc.cruiseControl, "carState.cruiseState": cs.cruiseState}
for k, v in to_log.items():
for k2, v2 in asdict(v).items():
logs[m.description][run][f"{k}.{k2}"].append(v2)

time.sleep(DT)

with open('/tmp/logs.json', 'w') as f:
import json
json.dump(logs, f, indent=2)
report(args, logs, p.CI.CP.carFingerprint)


if __name__ == "__main__":

parser = argparse.ArgumentParser(description="A tool for longitudinal control testing.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--desc', help="Extra description to include in report.")
parser.add_argument('--output', help="Write out report to this file.", default=None)
args = parser.parse_args()

if "REPORT_TEST" in os.environ:
with open(os.environ["REPORT_TEST"]) as f:
import json
logs = json.loads(f.read().split("none'>")[1].split('</p>')[0])
report(args, logs, "testing")
exit()

assert args.output is None or args.output.endswith(".html"), "Output filename must end with '.html'"

main(args)
main(args)

0 comments on commit c928f6b

Please sign in to comment.