Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model replay: more consistent replay #23237

Merged
merged 4 commits into from
Dec 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 72 additions & 82 deletions selfdrive/test/process_replay/model_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
import os
import sys
import time
from typing import Any

from collections import defaultdict
from tqdm import tqdm
from typing import Any

import cereal.messaging as messaging
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
from common.spinner import Spinner
from common.timeout import Timeout
from common.transformations.camera import get_view_frame_from_road_frame, eon_f_frame_size, tici_f_frame_size, \
eon_d_frame_size, tici_d_frame_size
eon_d_frame_size, tici_d_frame_size
from selfdrive.hardware import PC, TICI
from selfdrive.manager.process_config import managed_processes
from selfdrive.test.openpilotci import BASE_URL, get_url
Expand All @@ -25,50 +25,23 @@
TEST_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"
else:
TEST_ROUTE = "303055c0002aefd1|2021-11-22--18-36-32"
SEGMENT = 0

CACHE_DIR = os.getenv("CACHE_DIR", None)
SEND_EXTRA_INPUTS = bool(os.getenv("SEND_EXTRA_INPUTS", "0"))

packet_from_camera = {"roadCameraState":"modelV2", "driverCameraState":"driverState"}

def get_log_fn(ref_commit):
return "%s_%s_%s.bz2" % (TEST_ROUTE, "model_tici" if TICI else "model", ref_commit)


def replace_calib(msg, calib):
msg = msg.as_builder()
if calib is not None:
msg.liveCalibration.extrinsicMatrix = get_view_frame_from_road_frame(*calib, 1.22).flatten().tolist()
return msg

def process_frame(msg, pm, sm, log_msgs, vipc_server, spinner, frs, frame_idxs, last_desire):
if msg.which() == "roadCameraState" and last_desire is not None:
dat = messaging.new_message('lateralPlan')
dat.lateralPlan.desire = last_desire
pm.send('lateralPlan', dat)

f = msg.as_builder()
pm.send(msg.which(), f)

img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="yuv420p")[0]
if msg.which == "roadCameraState":
vipc_server.send(VisionStreamType.VISION_STREAM_ROAD, img.flatten().tobytes(), f.roadCameraState.frameId,
f.roadCameraState.timestampSof, f.roadCameraState.timestampEof)
else:
vipc_server.send(VisionStreamType.VISION_STREAM_DRIVER, img.flatten().tobytes(), f.driverCameraState.frameId,
f.driverCameraState.timestampSof, f.driverCameraState.timestampEof)
with Timeout(seconds=15):
log_msgs.append(messaging.recv_one(sm.sock[packet_from_camera[msg.which()]]))

frame_idxs[msg.which()] += 1
if frame_idxs[msg.which()] >= frs[msg.which()].frame_count:
return None
update_spinner(spinner, frame_idxs['roadCameraState'], frs['roadCameraState'].frame_count,
frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count)
return 0

def update_spinner(s, fidx, fcnt, didx, dcnt):
s.update("replaying models: road %d/%d, driver %d/%d" % (fidx, fcnt, didx, dcnt))

def model_replay(lr_list, frs):

def model_replay(lr, frs):
spinner = Spinner()
spinner.update("starting model replay")

Expand All @@ -77,93 +50,110 @@ def model_replay(lr_list, frs):
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *(tici_d_frame_size if TICI else eon_d_frame_size))
vipc_server.start_listener()

pm = messaging.PubMaster(['roadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])
sm = messaging.SubMaster(['modelV2', 'driverState'])
pm = messaging.PubMaster(['roadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan'])

try:
managed_processes['modeld'].start()
managed_processes['dmonitoringmodeld'].start()
time.sleep(5)
time.sleep(2)
sm.update(1000)

last_desire = None
log_msgs = []
frame_idxs = dict.fromkeys(['roadCameraState','driverCameraState'], 0)

cal = [msg for msg in lr if msg.which() == "liveCalibration"]
for msg in cal[:5]:
pm.send(msg.which(), replace_calib(msg, None))

for msg in tqdm(lr_list):
if msg.which() == "liveCalibration":
last_calib = list(msg.liveCalibration.rpyCalib)
pm.send(msg.which(), replace_calib(msg, last_calib))
elif msg.which() == "lateralPlan":
last_desire = msg.lateralPlan.desire
elif msg.which() in ["roadCameraState", "driverCameraState"]:
ret = process_frame(msg, pm, sm, log_msgs, vipc_server, spinner, frs, frame_idxs, last_desire)
if ret is None:
last_desire = None
frame_idxs = defaultdict(lambda: 0)

# init modeld with valid calibration
cal_msgs = [msg for msg in lr if msg.which() == "liveCalibration"]
for _ in range(5):
pm.send(cal_msgs[0].which(), cal_msgs[0].as_builder())
time.sleep(0.1)

for msg in tqdm(lr):
if SEND_EXTRA_INPUTS:
if msg.which() == "liveCalibration":
last_calib = list(msg.liveCalibration.rpyCalib)
pm.send(msg.which(), replace_calib(msg, last_calib))
elif msg.which() == "lateralPlan":
last_desire = msg.lateralPlan.desire
dat = messaging.new_message('lateralPlan')
dat.lateralPlan.desire = last_desire
pm.send('lateralPlan', dat)

if msg.which() in ["roadCameraState", "driverCameraState"]:
camera_state = getattr(msg, msg.which())
stream = VisionStreamType.VISION_STREAM_ROAD if msg.which() == "roadCameraState" else VisionStreamType.VISION_STREAM_DRIVER
img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="yuv420p")[0]

# send camera state and frame
pm.send(msg.which(), msg.as_builder())
vipc_server.send(stream, img.flatten().tobytes(), camera_state.frameId,
camera_state.timestampSof, camera_state.timestampEof)

# wait for a response
with Timeout(seconds=15):
packet_from_camera = {"roadCameraState": "modelV2", "driverCameraState": "driverState"}
log_msgs.append(messaging.recv_one(sm.sock[packet_from_camera[msg.which()]]))

frame_idxs[msg.which()] += 1
if frame_idxs[msg.which()] >= frs[msg.which()].frame_count:
break

except KeyboardInterrupt:
pass
spinner.update("replaying models: road %d/%d, driver %d/%d" % (frame_idxs['roadCameraState'],
frs['roadCameraState'].frame_count, frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count))

finally:
spinner.close()
managed_processes['modeld'].stop()
managed_processes['dmonitoringmodeld'].stop()


return log_msgs


if __name__ == "__main__":

update = "--update" in sys.argv

if TICI:
os.system('sudo mount -o remount,size=200M /tmp') # c3 hevcs are 75M each

replay_dir = os.path.dirname(os.path.abspath(__file__))
ref_commit_fn = os.path.join(replay_dir, "model_replay_ref_commit")

segnum = 0
frs = {}
if CACHE_DIR:
lr = LogReader(os.path.join(CACHE_DIR, '%s--%d--rlog.bz2' % (TEST_ROUTE.replace('|', '_'), segnum)))
frs['roadCameraState'] = FrameReader(os.path.join(CACHE_DIR, '%s--%d--fcamera.hevc' % (TEST_ROUTE.replace('|', '_'), segnum)))
frs['driverCameraState'] = FrameReader(os.path.join(CACHE_DIR, '%s--%d--dcamera.hevc' % (TEST_ROUTE.replace('|', '_'), segnum)))
else:
lr = LogReader(get_url(TEST_ROUTE, segnum))
frs['roadCameraState'] = FrameReader(get_url(TEST_ROUTE, segnum, log_type="fcamera"))
frs['driverCameraState'] = FrameReader(get_url(TEST_ROUTE, segnum, log_type="dcamera"))
# load logs
lr = list(LogReader(get_url(TEST_ROUTE, SEGMENT)))
frs = {
'roadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, log_type="fcamera")),
'driverCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, log_type="dcamera")),
}

lr_list = list(lr)
log_msgs = model_replay(lr_list, frs)
# run replay
log_msgs = model_replay(lr, frs)

# get diff
failed = False
if not update:
ref_commit = open(ref_commit_fn).read().strip()
with open(ref_commit_fn) as f:
ref_commit = f.read().strip()
log_fn = get_log_fn(ref_commit)
cmp_log = LogReader(BASE_URL + log_fn)

ignore = ['logMonoTime', 'valid',
'modelV2.frameDropPerc',
'modelV2.modelExecutionTime',
'driverState.modelExecutionTime',
'driverState.dspExecutionTime']
ignore = [
'logMonoTime',
'modelV2.frameDropPerc',
'modelV2.modelExecutionTime',
'driverState.modelExecutionTime',
'driverState.dspExecutionTime'
]
tolerance = None if not PC else 1e-3
results: Any = {TEST_ROUTE: {}}
results[TEST_ROUTE]["models"] = compare_logs(cmp_log, log_msgs, tolerance=tolerance, ignore_fields=ignore)
diff1, diff2, failed = format_diff(results, ref_commit)

print(diff2)
print('-------------')
print('-------------')
print('-------------')
print('-------------')
print('-------------')
print('-------------\n'*5)
print(diff1)
with open("model_diff.txt", "w") as f:
f.write(diff2)

# upload new refs
if update or failed:
from selfdrive.test.openpilotci import upload_file

Expand Down