From f37150ce1701dcf942bb3439557da4eb0e028b84 Mon Sep 17 00:00:00 2001
From: "temkei.kem" <1041084556@qq.com>
Date: Fri, 27 Sep 2024 16:06:35 +0900
Subject: [PATCH 1/3] support new perception_reproducer
---
perception/multi_object_tracker/package.xml | 2 +-
.../perception_replayer.py | 0
.../perception_replayer_common.py | 185 +++++++++++++
.../old_reproducer/utils.py | 152 ++++++++++
.../perception_replayer_common.py | 47 ++--
.../perception_reproducer.py | 259 ++++++++++++++----
.../scripts/perception_replayer/utils.py | 9 +-
7 files changed, 582 insertions(+), 72 deletions(-)
rename planning/planning_debug_tools/scripts/perception_replayer/{ => old_reproducer}/perception_replayer.py (100%)
create mode 100644 planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
create mode 100644 planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
diff --git a/perception/multi_object_tracker/package.xml b/perception/multi_object_tracker/package.xml
index e3172dfd22349..f343492b2c2e3 100644
--- a/perception/multi_object_tracker/package.xml
+++ b/perception/multi_object_tracker/package.xml
@@ -24,7 +24,7 @@
tier4_autoware_utils
tier4_perception_msgs
unique_identifier_msgs
-
+ diagnostic_updater
ament_lint_auto
autoware_lint_common
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer.py b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer.py
similarity index 100%
rename from planning/planning_debug_tools/scripts/perception_replayer/perception_replayer.py
rename to planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer.py
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
new file mode 100644
index 0000000000000..22a73fab5199a
--- /dev/null
+++ b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
@@ -0,0 +1,185 @@
+#!/usr/bin/env python3
+
+# Copyright 2023 TIER IV, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+from subprocess import CalledProcessError
+from subprocess import check_output
+import time
+
+from autoware_auto_perception_msgs.msg import DetectedObjects
+from autoware_auto_perception_msgs.msg import PredictedObjects
+from autoware_auto_perception_msgs.msg import TrackedObjects
+from autoware_auto_perception_msgs.msg import TrafficSignalArray as AutoTrafficSignalArray
+from autoware_perception_msgs.msg import TrafficSignalArray
+from geometry_msgs.msg import PoseWithCovarianceStamped
+from nav_msgs.msg import Odometry
+import psutil
+from rclpy.node import Node
+from rclpy.serialization import deserialize_message
+from rosbag2_py import StorageFilter
+from rosidl_runtime_py.utilities import get_message
+from sensor_msgs.msg import PointCloud2
+from utils import open_reader
+
+
+class PerceptionReplayerCommon(Node):
+ def __init__(self, args, name):
+ super().__init__(name)
+ self.args = args
+
+ self.ego_pose = None
+ self.rosbag_objects_data = []
+ self.rosbag_ego_odom_data = []
+ self.rosbag_traffic_signals_data = []
+ self.is_auto_traffic_signals = False
+
+ # subscriber
+ self.sub_odom = self.create_subscription(
+ Odometry, "/localization/kinematic_state", self.on_odom, 1
+ )
+
+ # publisher
+ if self.args.detected_object:
+ self.objects_pub = self.create_publisher(
+ DetectedObjects, "/perception/object_recognition/detection/objects", 1
+ )
+ elif self.args.tracked_object:
+ self.objects_pub = self.create_publisher(
+ TrackedObjects, "/perception/object_recognition/tracking/objects", 1
+ )
+ else:
+ self.objects_pub = self.create_publisher(
+ PredictedObjects, "/perception/object_recognition/objects", 1
+ )
+
+ self.pointcloud_pub = self.create_publisher(
+ PointCloud2, "/perception/obstacle_segmentation/pointcloud", 1
+ )
+ self.recorded_ego_pub_as_initialpose = self.create_publisher(
+ PoseWithCovarianceStamped, "/initialpose", 1
+ )
+
+ self.recorded_ego_pub = self.create_publisher(
+ Odometry, "/perception_reproducer/rosbag_ego_odom", 1
+ )
+
+ # load rosbag
+ print("Stared loading rosbag")
+ if os.path.isdir(args.bag):
+ for bag_file in sorted(os.listdir(args.bag)):
+ if bag_file.endswith(self.args.rosbag_format):
+ self.load_rosbag(args.bag + "/" + bag_file)
+ else:
+ self.load_rosbag(args.bag)
+ print("Ended loading rosbag")
+
+ # temporary support old auto msgs
+ if self.is_auto_traffic_signals:
+ self.auto_traffic_signals_pub = self.create_publisher(
+ AutoTrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
+ )
+ else:
+ self.traffic_signals_pub = self.create_publisher(
+ TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
+ )
+
+ # wait for ready to publish/subscribe
+ time.sleep(1.0)
+
+ def on_odom(self, odom):
+ self.ego_pose = odom.pose.pose
+
+ def load_rosbag(self, rosbag2_path: str):
+ reader = open_reader(str(rosbag2_path))
+
+ topic_types = reader.get_all_topics_and_types()
+ # Create a map for quicker lookup
+ type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))}
+
+ objects_topic = (
+ "/perception/object_recognition/detection/objects"
+ if self.args.detected_object
+ else "/perception/object_recognition/tracking/objects"
+ if self.args.tracked_object
+ else "/perception/object_recognition/objects"
+ )
+ ego_odom_topic = "/localization/kinematic_state"
+ traffic_signals_topic = "/perception/traffic_light_recognition/traffic_signals"
+ topic_filter = StorageFilter(topics=[objects_topic, ego_odom_topic, traffic_signals_topic])
+ reader.set_filter(topic_filter)
+
+ while reader.has_next():
+ (topic, data, stamp) = reader.read_next()
+ msg_type = get_message(type_map[topic])
+ msg = deserialize_message(data, msg_type)
+ # import pdb; pdb.set_trace()
+ print(type(msg))
+ if topic == objects_topic:
+ self.rosbag_objects_data.append((stamp, msg))
+ if topic == ego_odom_topic:
+ self.rosbag_ego_odom_data.append((stamp, msg))
+ if topic == traffic_signals_topic:
+ self.rosbag_traffic_signals_data.append((stamp, msg))
+ self.is_auto_traffic_signals = (
+ "autoware_auto_perception_msgs" in type(msg).__module__
+ )
+
+ def kill_online_perception_node(self):
+ # kill node if required
+ kill_process_name = None
+ if self.args.detected_object:
+ kill_process_name = "dummy_perception_publisher_node"
+ elif self.args.tracked_object:
+ kill_process_name = "multi_object_tracker"
+ else:
+ kill_process_name = "map_based_prediction"
+ if kill_process_name:
+ try:
+ pid = check_output(["pidof", kill_process_name])
+ process = psutil.Process(int(pid[:-1]))
+ process.terminate()
+ except CalledProcessError:
+ pass
+
+ def binary_search(self, data, timestamp):
+ if data[-1][0] < timestamp:
+ return data[-1][1]
+ elif data[0][0] > timestamp:
+ return data[0][1]
+
+ low, high = 0, len(data) - 1
+
+ while low <= high:
+ mid = (low + high) // 2
+ if data[mid][0] < timestamp:
+ low = mid + 1
+ elif data[mid][0] > timestamp:
+ high = mid - 1
+ else:
+ return data[mid][1]
+
+ # Return the next timestamp's data if available
+ if low < len(data):
+ return data[low][1]
+ return None
+
+ def find_topics_by_timestamp(self, timestamp):
+ objects_data = self.binary_search(self.rosbag_objects_data, timestamp)
+ traffic_signals_data = self.binary_search(self.rosbag_traffic_signals_data, timestamp)
+ return objects_data, traffic_signals_data
+
+ def find_ego_odom_by_timestamp(self, timestamp):
+ return self.binary_search(self.rosbag_ego_odom_data, timestamp)
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
new file mode 100644
index 0000000000000..7e8e0a18b4b7c
--- /dev/null
+++ b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python3
+
+# Copyright 2023 TIER IV, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import math
+import time
+
+from geometry_msgs.msg import Quaternion
+import numpy as np
+import rosbag2_py
+from sensor_msgs.msg import PointCloud2
+from sensor_msgs.msg import PointField
+from tf_transformations import euler_from_quaternion
+from tf_transformations import quaternion_from_euler
+
+
+def get_rosbag_options(path, serialization_format="cdr"):
+ storage_options = rosbag2_py.StorageOptions(uri=path, storage_id="sqlite3")
+
+ converter_options = rosbag2_py.ConverterOptions(
+ input_serialization_format=serialization_format,
+ output_serialization_format=serialization_format,
+ )
+
+ return storage_options, converter_options
+
+
+def open_reader(path: str):
+ storage_options, converter_options = get_rosbag_options(path)
+ reader = rosbag2_py.SequentialReader()
+ reader.open(storage_options, converter_options)
+ return reader
+
+
+def calc_squared_distance(p1, p2):
+ return math.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2)
+
+
+def create_empty_pointcloud(timestamp):
+ pointcloud_msg = PointCloud2()
+ pointcloud_msg.header.stamp = timestamp
+ pointcloud_msg.header.frame_id = "map"
+ pointcloud_msg.height = 1
+ pointcloud_msg.is_dense = True
+ pointcloud_msg.point_step = 16
+ field_name_vec = ["x", "y", "z"]
+ offset_vec = [0, 4, 8]
+ for field_name, offset in zip(field_name_vec, offset_vec):
+ field = PointField()
+ field.name = field_name
+ field.offset = offset
+ field.datatype = 7
+ field.count = 1
+ pointcloud_msg.fields.append(field)
+ return pointcloud_msg
+
+
+def get_yaw_from_quaternion(orientation):
+ orientation_list = [
+ orientation.x,
+ orientation.y,
+ orientation.z,
+ orientation.w,
+ ]
+ return euler_from_quaternion(orientation_list)[2]
+
+
+def get_quaternion_from_yaw(yaw):
+ q = quaternion_from_euler(0, 0, yaw)
+ orientation = Quaternion()
+ orientation.x = q[0]
+ orientation.y = q[1]
+ orientation.z = q[2]
+ orientation.w = q[3]
+ return orientation
+
+
+def translate_objects_coordinate(ego_pose, log_ego_pose, objects_msg):
+ log_ego_yaw = get_yaw_from_quaternion(log_ego_pose.orientation)
+ log_ego_pose_trans_mat = np.array(
+ [
+ [
+ math.cos(log_ego_yaw),
+ -math.sin(log_ego_yaw),
+ log_ego_pose.position.x,
+ ],
+ [math.sin(log_ego_yaw), math.cos(log_ego_yaw), log_ego_pose.position.y],
+ [0.0, 0.0, 1.0],
+ ]
+ )
+
+ ego_yaw = get_yaw_from_quaternion(ego_pose.orientation)
+ ego_pose_trans_mat = np.array(
+ [
+ [math.cos(ego_yaw), -math.sin(ego_yaw), ego_pose.position.x],
+ [math.sin(ego_yaw), math.cos(ego_yaw), ego_pose.position.y],
+ [0.0, 0.0, 1.0],
+ ]
+ )
+
+ for o in objects_msg.objects:
+ log_object_pose = o.kinematics.pose_with_covariance.pose
+ log_object_yaw = get_yaw_from_quaternion(log_object_pose.orientation)
+ log_object_pos_vec = np.array([log_object_pose.position.x, log_object_pose.position.y, 1.0])
+
+ # translate object pose from ego pose in log to ego pose in simulation
+ object_pos_vec = np.linalg.inv(ego_pose_trans_mat).dot(
+ log_ego_pose_trans_mat.dot(log_object_pos_vec.T)
+ )
+
+ object_pose = o.kinematics.pose_with_covariance.pose
+ object_pose.position.x = object_pos_vec[0]
+ object_pose.position.y = object_pos_vec[1]
+ object_pose.orientation = get_quaternion_from_yaw(log_object_yaw + log_ego_yaw - ego_yaw)
+
+
+class StopWatch:
+ def __init__(self, verbose):
+ # A dictionary to store the starting times
+ self.start_times = {}
+ self.verbose = verbose
+
+ def tic(self, name):
+ """Store the current time with the given name."""
+ self.start_times[name] = time.perf_counter()
+
+ def toc(self, name):
+ """Print the elapsed time since the last call to tic() with the same name."""
+ if name not in self.start_times:
+ print(f"No start time found for {name}!")
+ return
+
+ elapsed_time = (
+ time.perf_counter() - self.start_times[name]
+ ) * 1000 # Convert to milliseconds
+ if self.verbose:
+ print(f"Time for {name}: {elapsed_time: .2f} ms")
+
+ # Reset the starting time for the name
+ del self.start_times[name]
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer_common.py b/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer_common.py
index 7bf54c0278a27..dbdf1f14e5a20 100644
--- a/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer_common.py
+++ b/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer_common.py
@@ -22,8 +22,8 @@
from autoware_auto_perception_msgs.msg import DetectedObjects
from autoware_auto_perception_msgs.msg import PredictedObjects
from autoware_auto_perception_msgs.msg import TrackedObjects
-from autoware_auto_perception_msgs.msg import TrafficSignalArray as AutoTrafficSignalArray
from autoware_perception_msgs.msg import TrafficSignalArray
+from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import PoseWithCovarianceStamped
from nav_msgs.msg import Odometry
import psutil
@@ -32,6 +32,7 @@
from rosbag2_py import StorageFilter
from rosidl_runtime_py.utilities import get_message
from sensor_msgs.msg import PointCloud2
+from utils import get_starting_time
from utils import open_reader
@@ -40,11 +41,10 @@ def __init__(self, args, name):
super().__init__(name)
self.args = args
- self.ego_pose = None
+ self.ego_odom = None
self.rosbag_objects_data = []
self.rosbag_ego_odom_data = []
self.rosbag_traffic_signals_data = []
- self.is_auto_traffic_signals = False
# subscriber
self.sub_odom = self.create_subscription(
@@ -76,31 +76,33 @@ def __init__(self, args, name):
Odometry, "/perception_reproducer/rosbag_ego_odom", 1
)
+ self.goal_pose_publisher = self.create_publisher(
+ PoseStamped, "/planning/mission_planning/goal", 1
+ )
+
+ self.traffic_signals_pub = self.create_publisher(
+ TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
+ )
+
# load rosbag
print("Stared loading rosbag")
if os.path.isdir(args.bag):
- for bag_file in sorted(os.listdir(args.bag)):
- if bag_file.endswith(self.args.rosbag_format):
- self.load_rosbag(args.bag + "/" + bag_file)
+ bags = [
+ os.path.join(args.bag, base_name)
+ for base_name in os.listdir(args.bag)
+ if base_name.endswith(args.rosbag_format)
+ ]
+ for bag_file in sorted(bags, key=get_starting_time):
+ self.load_rosbag(bag_file)
else:
self.load_rosbag(args.bag)
print("Ended loading rosbag")
- # temporary support old auto msgs
- if self.is_auto_traffic_signals:
- self.auto_traffic_signals_pub = self.create_publisher(
- AutoTrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
- )
- else:
- self.traffic_signals_pub = self.create_publisher(
- TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
- )
-
# wait for ready to publish/subscribe
time.sleep(1.0)
def on_odom(self, odom):
- self.ego_pose = odom.pose.pose
+ self.ego_odom = odom
def load_rosbag(self, rosbag2_path: str):
reader = open_reader(str(rosbag2_path))
@@ -126,14 +128,14 @@ def load_rosbag(self, rosbag2_path: str):
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
if topic == objects_topic:
+ assert isinstance(msg, self.objects_pub.msg_type),f"Unsupported conversion from {type(msg)}"
self.rosbag_objects_data.append((stamp, msg))
if topic == ego_odom_topic:
+ assert isinstance(msg, Odometry),f"Unsupported conversion from {type(msg)}"
self.rosbag_ego_odom_data.append((stamp, msg))
if topic == traffic_signals_topic:
+ assert isinstance(msg, TrafficSignalArray),f"Unsupported conversion from {type(msg)}"
self.rosbag_traffic_signals_data.append((stamp, msg))
- self.is_auto_traffic_signals = (
- "autoware_auto_perception_msgs" in type(msg).__module__
- )
def kill_online_perception_node(self):
# kill node if required
@@ -153,6 +155,9 @@ def kill_online_perception_node(self):
pass
def binary_search(self, data, timestamp):
+ if not data:
+ return None
+
if data[-1][0] < timestamp:
return data[-1][1]
elif data[0][0] > timestamp:
@@ -180,4 +185,4 @@ def find_topics_by_timestamp(self, timestamp):
return objects_data, traffic_signals_data
def find_ego_odom_by_timestamp(self, timestamp):
- return self.binary_search(self.rosbag_ego_odom_data, timestamp)
+ return self.binary_search(self.rosbag_ego_odom_data, timestamp)
\ No newline at end of file
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer.py b/planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer.py
index b2b6a3c0ef38e..c796ed684309f 100755
--- a/planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer.py
+++ b/planning/planning_debug_tools/scripts/perception_replayer/perception_reproducer.py
@@ -15,8 +15,10 @@
# limitations under the License.
import argparse
+from collections import deque
import pickle
+from geometry_msgs.msg import PoseWithCovarianceStamped
import numpy as np
from perception_replayer_common import PerceptionReplayerCommon
import rclpy
@@ -27,16 +29,44 @@
class PerceptionReproducer(PerceptionReplayerCommon):
def __init__(self, args):
+ self.rosbag_ego_odom_search_radius = args.search_radius
+ self.ego_odom_search_radius = self.rosbag_ego_odom_search_radius
+ self.reproduce_cool_down = args.reproduce_cool_down if args.search_radius != 0.0 else 0.0
+
super().__init__(args, "perception_reproducer")
- self.prev_traffic_signals_msg = None
self.stopwatch = StopWatch(self.args.verbose) # for debug
+ # refresh cool down for setting initial pose in psim.
+ self.sub_init_pos = self.create_subscription(
+ PoseWithCovarianceStamped, "/initialpose", lambda msg: self.cool_down_indices.clear(), 1
+ )
+
# to make some data to accelerate computation
self.preprocess_data()
+ self.reproduce_sequence_indices = deque() # contains ego_odom_idx
+ self.cool_down_indices = deque() # contains ego_odom_idx
+ self.ego_odom_id2last_published_timestamp = {} # for checking last published timestamp.
+ self.last_sequenced_ego_pose = None
+
+ pose_timestamp, self.prev_ego_odom_msg = self.rosbag_ego_odom_data[0]
+ self.perv_objects_msg, self.prev_traffic_signals_msg = self.find_topics_by_timestamp(
+ pose_timestamp
+ )
+ self.memorized_original_objects_msg = (
+ self.memorized_noised_objects_msg
+ ) = self.perv_objects_msg
+
# start main timer callback
- self.timer = self.create_timer(0.1, self.on_timer)
+
+ average_ego_odom_interval = np.mean(
+ [
+ (self.rosbag_ego_odom_data[i][0] - self.rosbag_ego_odom_data[i - 1][0]) / 1e9
+ for i in range(1, len(self.rosbag_ego_odom_data))
+ ]
+ )
+ self.timer = self.create_timer(average_ego_odom_interval, self.on_timer)
# kill perception process to avoid a conflict of the perception topics
self.timer_check_perception_process = self.create_timer(3.0, self.on_timer_kill_perception)
@@ -60,84 +90,202 @@ def on_timer(self):
print("\n-- on_timer start --")
self.stopwatch.tic("total on_timer")
- timestamp = self.get_clock().now().to_msg()
+ timestamp = self.get_clock().now()
+ timestamp_msg = timestamp.to_msg()
if self.args.detected_object:
- pointcloud_msg = create_empty_pointcloud(timestamp)
+ pointcloud_msg = create_empty_pointcloud(timestamp_msg)
self.pointcloud_pub.publish(pointcloud_msg)
- if not self.ego_pose:
- print("No ego pose found.")
+ if not self.ego_odom:
+ print("No ego odom found.")
return
- # find nearest ego odom by simulation observation
- self.stopwatch.tic("find_nearest_ego_odom_by_observation")
- ego_odom = self.find_nearest_ego_odom_by_observation(self.ego_pose)
- pose_timestamp = ego_odom[0]
- log_ego_pose = ego_odom[1].pose.pose
- self.stopwatch.toc("find_nearest_ego_odom_by_observation")
+ ego_pose = self.ego_odom.pose.pose
+ dist_moved = (
+ np.sqrt(
+ (ego_pose.position.x - self.last_sequenced_ego_pose.position.x) ** 2
+ + (ego_pose.position.y - self.last_sequenced_ego_pose.position.y) ** 2
+ )
+ if self.last_sequenced_ego_pose
+ else 999
+ )
- # extract message by the nearest ego odom timestamp
- self.stopwatch.tic("find_topics_by_timestamp")
- msgs_orig = self.find_topics_by_timestamp(pose_timestamp)
- self.stopwatch.toc("find_topics_by_timestamp")
+ # Update the reproduce sequence if the distance moved is greater than the search radius.
+ if dist_moved > self.ego_odom_search_radius:
+ self.last_sequenced_ego_pose = ego_pose
- # copy the messages
- self.stopwatch.tic("message deepcopy")
- if self.args.detected_object:
- msgs = pickle.loads(pickle.dumps(msgs_orig)) # this is x5 faster than deepcopy
- objects_msg = msgs[0]
- traffic_signals_msg = msgs[1]
+ # find the nearest ego odom by simulation observation
+ self.stopwatch.tic("find_nearest_ego_odom_by_observation")
+ nearby_ego_odom_indies = self.find_nearby_ego_odom_indies(
+ [ego_pose], self.ego_odom_search_radius
+ )
+ nearby_ego_odom_indies = [
+ self.rosbag_ego_odom_data[idx][1].pose.pose for idx in nearby_ego_odom_indies
+ ]
+ if not nearby_ego_odom_indies:
+ nearest_ego_odom_ind = self.find_nearest_ego_odom_index(ego_pose)
+ nearby_ego_odom_indies += [
+ self.rosbag_ego_odom_data[nearest_ego_odom_ind][1].pose.pose
+ ]
+ self.stopwatch.toc("find_nearest_ego_odom_by_observation")
+
+ # find a list of ego odom around the nearest_ego_odom_pos.
+ self.stopwatch.tic("find_nearby_ego_odom_indies")
+ ego_odom_indices = self.find_nearby_ego_odom_indies(
+ nearby_ego_odom_indies, self.rosbag_ego_odom_search_radius
+ )
+ self.stopwatch.toc("find_nearby_ego_odom_indies")
+
+ # update reproduce_sequence with those data not in cool down list.
+ while self.cool_down_indices:
+ last_timestamp = self.ego_odom_id2last_published_timestamp[
+ self.cool_down_indices[0]
+ ]
+ if (timestamp - last_timestamp).nanoseconds / 1e9 > self.reproduce_cool_down:
+ self.cool_down_indices.popleft()
+ else:
+ break
+
+ self.stopwatch.tic("update reproduce_sequence")
+ ego_odom_indices = [
+ idx for idx in ego_odom_indices if idx not in self.cool_down_indices
+ ]
+ ego_odom_indices = sorted(ego_odom_indices)
+ self.reproduce_sequence_indices = deque(ego_odom_indices)
+ self.stopwatch.toc("update reproduce_sequence")
+
+ if self.args.verbose:
+ print("reproduce_sequence_indices: ", list(self.reproduce_sequence_indices)[:20])
+
+ # Get messages
+ repeat_flag = len(self.reproduce_sequence_indices) == 0
+
+ # Add an additional constraint to avoid publishing too fast when there is a speed gap between the ego and the rosbag's ego when ego is departing/stopping while rosbag's ego is moving.
+ if not repeat_flag:
+ ego_speed = np.sqrt(
+ self.ego_odom.twist.twist.linear.x**2 + self.ego_odom.twist.twist.linear.y**2
+ )
+ ego_odom_idx = self.reproduce_sequence_indices[0]
+ _, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
+ ego_rosbag_speed = np.sqrt(
+ ego_odom_msg.twist.twist.linear.x**2 + ego_odom_msg.twist.twist.linear.y**2
+ )
+
+ ego_rosbag_dist = np.sqrt(
+ (ego_pose.position.x - ego_odom_msg.pose.pose.position.x) ** 2
+ + (ego_pose.position.y - ego_odom_msg.pose.pose.position.y) ** 2
+ )
+ repeat_flag = ego_rosbag_speed > ego_speed * 5 and ego_rosbag_dist > 1.0
+ # set the speed threshold to many (5) times then ego_speed because this constraint is mainly for departing/stopping (ego speed is close to 0).
+
+ if not repeat_flag:
+ self.stopwatch.tic("find_topics_by_timestamp")
+ ego_odom_idx = self.reproduce_sequence_indices.popleft()
+ # extract messages by the nearest ego odom timestamp
+ pose_timestamp, ego_odom_msg = self.rosbag_ego_odom_data[ego_odom_idx]
+ objects_msg, traffic_signals_msg = self.find_topics_by_timestamp(pose_timestamp)
+ self.stopwatch.toc("find_topics_by_timestamp")
+ # update cool down info.
+ self.ego_odom_id2last_published_timestamp[ego_odom_idx] = timestamp
+ self.cool_down_indices.append(ego_odom_idx)
else:
- # NOTE: No need to deepcopy since only timestamp will be changed and it will be changed every time correctly.
- objects_msg = msgs_orig[0]
- traffic_signals_msg = msgs_orig[1]
- self.stopwatch.toc("message deepcopy")
+ ego_odom_msg = self.prev_ego_odom_msg
+ objects_msg = self.perv_objects_msg
+ traffic_signals_msg = self.prev_traffic_signals_msg
+ # Transform and publish messages.
self.stopwatch.tic("transform and publish")
+ # ego odom
+ if ego_odom_msg:
+ self.prev_ego_odom_msg = ego_odom_msg
+ self.recorded_ego_pub.publish(ego_odom_msg)
# objects
+ objects_msg = objects_msg if objects_msg else self.perv_objects_msg
if objects_msg:
- objects_msg.header.stamp = timestamp
+ self.perv_objects_msg = objects_msg
+ objects_msg.header.stamp = timestamp_msg
+
+ # add noise to repeat published objects
+ if repeat_flag and self.args.noise:
+ objects_msg = self.add_perception_noise(objects_msg)
+
if self.args.detected_object:
- translate_objects_coordinate(self.ego_pose, log_ego_pose, objects_msg)
- self.objects_pub.publish(objects_msg)
+ objects_msg = self.copy_message(objects_msg)
+ translate_objects_coordinate(ego_pose, ego_odom_msg.pose.pose, objects_msg)
- # ego odom
- self.recorded_ego_pub.publish(ego_odom[1])
+ self.objects_pub.publish(objects_msg)
# traffic signals
- # temporary support old auto msgs
+ traffic_signals_msg = (
+ traffic_signals_msg if traffic_signals_msg else self.prev_traffic_signals_msg
+ )
if traffic_signals_msg:
- if self.is_auto_traffic_signals:
- traffic_signals_msg.header.stamp = timestamp
- self.auto_traffic_signals_pub.publish(traffic_signals_msg)
- else:
- traffic_signals_msg.stamp = timestamp
- self.traffic_signals_pub.publish(traffic_signals_msg)
+ traffic_signals_msg.stamp = timestamp_msg
self.prev_traffic_signals_msg = traffic_signals_msg
- elif self.prev_traffic_signals_msg:
- if self.is_auto_traffic_signals:
- self.prev_traffic_signals_msg.header.stamp = timestamp
- self.auto_traffic_signals_pub.publish(self.prev_traffic_signals_msg)
- else:
- self.prev_traffic_signals_msg.stamp = timestamp
- self.traffic_signals_pub.publish(self.prev_traffic_signals_msg)
+ self.traffic_signals_pub.publish(traffic_signals_msg)
+
self.stopwatch.toc("transform and publish")
self.stopwatch.toc("total on_timer")
- def find_nearest_ego_odom_by_observation(self, ego_pose):
+ def find_nearest_ego_odom_index(self, ego_pose):
# nearest search with numpy format is much (~ x100) faster than regular for loop
self_pose = np.array([ego_pose.position.x, ego_pose.position.y])
dists_squared = np.sum((self.rosbag_ego_odom_data_numpy - self_pose) ** 2, axis=1)
nearest_idx = np.argmin(dists_squared)
+ return nearest_idx
+
+ def find_nearby_ego_odom_indies(self, ego_poses: list, search_radius: float):
+ ego_poses_np = np.array([[pose.position.x, pose.position.y] for pose in ego_poses])
+ dists_squared = np.sum(
+ (self.rosbag_ego_odom_data_numpy[:, None] - ego_poses_np) ** 2, axis=2
+ )
+ nearby_indices = np.where(np.any(dists_squared <= search_radius**2, axis=1))[0]
+
+ return nearby_indices
- return self.rosbag_ego_odom_data[nearest_idx]
+ def copy_message(self, msg):
+ self.stopwatch.tic("message deepcopy")
+ objects_msg_copied = pickle.loads(pickle.dumps(msg)) # this is x5 faster than deepcopy
+ self.stopwatch.toc("message deepcopy")
+ return objects_msg_copied
+
+ def add_perception_noise(
+ self, objects_msg, update_rate=0.03, x_noise_std=0.1, y_noise_std=0.05
+ ):
+ if self.memorized_original_objects_msg != objects_msg:
+ self.memorized_noised_objects_msg = self.memorized_original_objects_msg = objects_msg
+
+ if np.random.rand() < update_rate:
+ self.stopwatch.tic("add noise")
+ self.memorized_noised_objects_msg = self.copy_message(
+ self.memorized_original_objects_msg
+ )
+ for obj in self.memorized_noised_objects_msg.objects:
+ noise_x = np.random.normal(0, x_noise_std)
+ noise_y = np.random.normal(0, y_noise_std)
+ if self.args.detected_object or self.args.tracked_object:
+ obj.kinematics.pose_with_covariance.pose.position.x += noise_x
+ obj.kinematics.pose_with_covariance.pose.position.y += noise_y
+ else:
+ obj.kinematics.initial_pose_with_covariance.pose.position.x += noise_x
+ obj.kinematics.initial_pose_with_covariance.pose.position.y += noise_y
+ self.stopwatch.toc("add noise")
+
+ return self.memorized_noised_objects_msg
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--bag", help="rosbag", default=None)
+ parser.add_argument(
+ "-n",
+ "--noise",
+ help="apply perception noise to the objects when publishing repeated messages",
+ action="store_true",
+ default=True,
+ )
parser.add_argument(
"-d", "--detected-object", help="publish detected object", action="store_true"
)
@@ -150,6 +298,21 @@ def find_nearest_ego_odom_by_observation(self, ego_pose):
parser.add_argument(
"-v", "--verbose", help="output debug data", action="store_true", default=False
)
+ parser.add_argument(
+ "-r",
+ "--search-radius",
+ help="the search radius for searching rosbag's ego odom messages around the nearest ego odom pose (default is 1.5 meters), if the search radius is set to 0, it will always publish the closest message, just as the old reproducer did.",
+ type=float,
+ default=1.5,
+ )
+ parser.add_argument(
+ "-c",
+ "--reproduce-cool-down",
+ help="The cool down time for republishing published messages (default is 80.0 seconds), please make sure that it's greater than the ego's stopping time.",
+ type=float,
+ default=80.0,
+ )
+
args = parser.parse_args()
rclpy.init()
@@ -161,4 +324,4 @@ def find_nearest_ego_odom_by_observation(self, ego_pose):
pass
finally:
node.destroy_node()
- rclpy.shutdown()
+ rclpy.shutdown()
\ No newline at end of file
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/utils.py b/planning/planning_debug_tools/scripts/perception_replayer/utils.py
index 7e8e0a18b4b7c..1b121f6c8efd0 100644
--- a/planning/planning_debug_tools/scripts/perception_replayer/utils.py
+++ b/planning/planning_debug_tools/scripts/perception_replayer/utils.py
@@ -26,6 +26,11 @@
from tf_transformations import quaternion_from_euler
+def get_starting_time(uri: str):
+ info = rosbag2_py.Info().read_metadata(uri, "sqlite3")
+ return info.starting_time
+
+
def get_rosbag_options(path, serialization_format="cdr"):
storage_options = rosbag2_py.StorageOptions(uri=path, storage_id="sqlite3")
@@ -146,7 +151,7 @@ def toc(self, name):
time.perf_counter() - self.start_times[name]
) * 1000 # Convert to milliseconds
if self.verbose:
- print(f"Time for {name}: {elapsed_time: .2f} ms")
+ print(f"Time for {name}: {elapsed_time:.2f} ms")
# Reset the starting time for the name
- del self.start_times[name]
+ del self.start_times[name]
\ No newline at end of file
From 74121987564b8c7671fbcd85384ccdec6652808a Mon Sep 17 00:00:00 2001
From: "temkei.kem" <1041084556@qq.com>
Date: Fri, 27 Sep 2024 18:55:49 +0900
Subject: [PATCH 2/3] move files
---
.../{old_reproducer => }/perception_replayer.py | 0
1 file changed, 0 insertions(+), 0 deletions(-)
rename planning/planning_debug_tools/scripts/perception_replayer/{old_reproducer => }/perception_replayer.py (100%)
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer.py b/planning/planning_debug_tools/scripts/perception_replayer/perception_replayer.py
similarity index 100%
rename from planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer.py
rename to planning/planning_debug_tools/scripts/perception_replayer/perception_replayer.py
From bc0823580e9cad716ccb0fbb3028262712772a97 Mon Sep 17 00:00:00 2001
From: "temkei.kem" <1041084556@qq.com>
Date: Mon, 30 Sep 2024 12:04:05 +0900
Subject: [PATCH 3/3] remove old files.
---
.../perception_replayer_common.py | 185 ------------------
.../old_reproducer/utils.py | 152 --------------
2 files changed, 337 deletions(-)
delete mode 100644 planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
delete mode 100644 planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
deleted file mode 100644
index 22a73fab5199a..0000000000000
--- a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/perception_replayer_common.py
+++ /dev/null
@@ -1,185 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright 2023 TIER IV, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-from subprocess import CalledProcessError
-from subprocess import check_output
-import time
-
-from autoware_auto_perception_msgs.msg import DetectedObjects
-from autoware_auto_perception_msgs.msg import PredictedObjects
-from autoware_auto_perception_msgs.msg import TrackedObjects
-from autoware_auto_perception_msgs.msg import TrafficSignalArray as AutoTrafficSignalArray
-from autoware_perception_msgs.msg import TrafficSignalArray
-from geometry_msgs.msg import PoseWithCovarianceStamped
-from nav_msgs.msg import Odometry
-import psutil
-from rclpy.node import Node
-from rclpy.serialization import deserialize_message
-from rosbag2_py import StorageFilter
-from rosidl_runtime_py.utilities import get_message
-from sensor_msgs.msg import PointCloud2
-from utils import open_reader
-
-
-class PerceptionReplayerCommon(Node):
- def __init__(self, args, name):
- super().__init__(name)
- self.args = args
-
- self.ego_pose = None
- self.rosbag_objects_data = []
- self.rosbag_ego_odom_data = []
- self.rosbag_traffic_signals_data = []
- self.is_auto_traffic_signals = False
-
- # subscriber
- self.sub_odom = self.create_subscription(
- Odometry, "/localization/kinematic_state", self.on_odom, 1
- )
-
- # publisher
- if self.args.detected_object:
- self.objects_pub = self.create_publisher(
- DetectedObjects, "/perception/object_recognition/detection/objects", 1
- )
- elif self.args.tracked_object:
- self.objects_pub = self.create_publisher(
- TrackedObjects, "/perception/object_recognition/tracking/objects", 1
- )
- else:
- self.objects_pub = self.create_publisher(
- PredictedObjects, "/perception/object_recognition/objects", 1
- )
-
- self.pointcloud_pub = self.create_publisher(
- PointCloud2, "/perception/obstacle_segmentation/pointcloud", 1
- )
- self.recorded_ego_pub_as_initialpose = self.create_publisher(
- PoseWithCovarianceStamped, "/initialpose", 1
- )
-
- self.recorded_ego_pub = self.create_publisher(
- Odometry, "/perception_reproducer/rosbag_ego_odom", 1
- )
-
- # load rosbag
- print("Stared loading rosbag")
- if os.path.isdir(args.bag):
- for bag_file in sorted(os.listdir(args.bag)):
- if bag_file.endswith(self.args.rosbag_format):
- self.load_rosbag(args.bag + "/" + bag_file)
- else:
- self.load_rosbag(args.bag)
- print("Ended loading rosbag")
-
- # temporary support old auto msgs
- if self.is_auto_traffic_signals:
- self.auto_traffic_signals_pub = self.create_publisher(
- AutoTrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
- )
- else:
- self.traffic_signals_pub = self.create_publisher(
- TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
- )
-
- # wait for ready to publish/subscribe
- time.sleep(1.0)
-
- def on_odom(self, odom):
- self.ego_pose = odom.pose.pose
-
- def load_rosbag(self, rosbag2_path: str):
- reader = open_reader(str(rosbag2_path))
-
- topic_types = reader.get_all_topics_and_types()
- # Create a map for quicker lookup
- type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))}
-
- objects_topic = (
- "/perception/object_recognition/detection/objects"
- if self.args.detected_object
- else "/perception/object_recognition/tracking/objects"
- if self.args.tracked_object
- else "/perception/object_recognition/objects"
- )
- ego_odom_topic = "/localization/kinematic_state"
- traffic_signals_topic = "/perception/traffic_light_recognition/traffic_signals"
- topic_filter = StorageFilter(topics=[objects_topic, ego_odom_topic, traffic_signals_topic])
- reader.set_filter(topic_filter)
-
- while reader.has_next():
- (topic, data, stamp) = reader.read_next()
- msg_type = get_message(type_map[topic])
- msg = deserialize_message(data, msg_type)
- # import pdb; pdb.set_trace()
- print(type(msg))
- if topic == objects_topic:
- self.rosbag_objects_data.append((stamp, msg))
- if topic == ego_odom_topic:
- self.rosbag_ego_odom_data.append((stamp, msg))
- if topic == traffic_signals_topic:
- self.rosbag_traffic_signals_data.append((stamp, msg))
- self.is_auto_traffic_signals = (
- "autoware_auto_perception_msgs" in type(msg).__module__
- )
-
- def kill_online_perception_node(self):
- # kill node if required
- kill_process_name = None
- if self.args.detected_object:
- kill_process_name = "dummy_perception_publisher_node"
- elif self.args.tracked_object:
- kill_process_name = "multi_object_tracker"
- else:
- kill_process_name = "map_based_prediction"
- if kill_process_name:
- try:
- pid = check_output(["pidof", kill_process_name])
- process = psutil.Process(int(pid[:-1]))
- process.terminate()
- except CalledProcessError:
- pass
-
- def binary_search(self, data, timestamp):
- if data[-1][0] < timestamp:
- return data[-1][1]
- elif data[0][0] > timestamp:
- return data[0][1]
-
- low, high = 0, len(data) - 1
-
- while low <= high:
- mid = (low + high) // 2
- if data[mid][0] < timestamp:
- low = mid + 1
- elif data[mid][0] > timestamp:
- high = mid - 1
- else:
- return data[mid][1]
-
- # Return the next timestamp's data if available
- if low < len(data):
- return data[low][1]
- return None
-
- def find_topics_by_timestamp(self, timestamp):
- objects_data = self.binary_search(self.rosbag_objects_data, timestamp)
- traffic_signals_data = self.binary_search(self.rosbag_traffic_signals_data, timestamp)
- return objects_data, traffic_signals_data
-
- def find_ego_odom_by_timestamp(self, timestamp):
- return self.binary_search(self.rosbag_ego_odom_data, timestamp)
diff --git a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py b/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
deleted file mode 100644
index 7e8e0a18b4b7c..0000000000000
--- a/planning/planning_debug_tools/scripts/perception_replayer/old_reproducer/utils.py
+++ /dev/null
@@ -1,152 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright 2023 TIER IV, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import math
-import time
-
-from geometry_msgs.msg import Quaternion
-import numpy as np
-import rosbag2_py
-from sensor_msgs.msg import PointCloud2
-from sensor_msgs.msg import PointField
-from tf_transformations import euler_from_quaternion
-from tf_transformations import quaternion_from_euler
-
-
-def get_rosbag_options(path, serialization_format="cdr"):
- storage_options = rosbag2_py.StorageOptions(uri=path, storage_id="sqlite3")
-
- converter_options = rosbag2_py.ConverterOptions(
- input_serialization_format=serialization_format,
- output_serialization_format=serialization_format,
- )
-
- return storage_options, converter_options
-
-
-def open_reader(path: str):
- storage_options, converter_options = get_rosbag_options(path)
- reader = rosbag2_py.SequentialReader()
- reader.open(storage_options, converter_options)
- return reader
-
-
-def calc_squared_distance(p1, p2):
- return math.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2)
-
-
-def create_empty_pointcloud(timestamp):
- pointcloud_msg = PointCloud2()
- pointcloud_msg.header.stamp = timestamp
- pointcloud_msg.header.frame_id = "map"
- pointcloud_msg.height = 1
- pointcloud_msg.is_dense = True
- pointcloud_msg.point_step = 16
- field_name_vec = ["x", "y", "z"]
- offset_vec = [0, 4, 8]
- for field_name, offset in zip(field_name_vec, offset_vec):
- field = PointField()
- field.name = field_name
- field.offset = offset
- field.datatype = 7
- field.count = 1
- pointcloud_msg.fields.append(field)
- return pointcloud_msg
-
-
-def get_yaw_from_quaternion(orientation):
- orientation_list = [
- orientation.x,
- orientation.y,
- orientation.z,
- orientation.w,
- ]
- return euler_from_quaternion(orientation_list)[2]
-
-
-def get_quaternion_from_yaw(yaw):
- q = quaternion_from_euler(0, 0, yaw)
- orientation = Quaternion()
- orientation.x = q[0]
- orientation.y = q[1]
- orientation.z = q[2]
- orientation.w = q[3]
- return orientation
-
-
-def translate_objects_coordinate(ego_pose, log_ego_pose, objects_msg):
- log_ego_yaw = get_yaw_from_quaternion(log_ego_pose.orientation)
- log_ego_pose_trans_mat = np.array(
- [
- [
- math.cos(log_ego_yaw),
- -math.sin(log_ego_yaw),
- log_ego_pose.position.x,
- ],
- [math.sin(log_ego_yaw), math.cos(log_ego_yaw), log_ego_pose.position.y],
- [0.0, 0.0, 1.0],
- ]
- )
-
- ego_yaw = get_yaw_from_quaternion(ego_pose.orientation)
- ego_pose_trans_mat = np.array(
- [
- [math.cos(ego_yaw), -math.sin(ego_yaw), ego_pose.position.x],
- [math.sin(ego_yaw), math.cos(ego_yaw), ego_pose.position.y],
- [0.0, 0.0, 1.0],
- ]
- )
-
- for o in objects_msg.objects:
- log_object_pose = o.kinematics.pose_with_covariance.pose
- log_object_yaw = get_yaw_from_quaternion(log_object_pose.orientation)
- log_object_pos_vec = np.array([log_object_pose.position.x, log_object_pose.position.y, 1.0])
-
- # translate object pose from ego pose in log to ego pose in simulation
- object_pos_vec = np.linalg.inv(ego_pose_trans_mat).dot(
- log_ego_pose_trans_mat.dot(log_object_pos_vec.T)
- )
-
- object_pose = o.kinematics.pose_with_covariance.pose
- object_pose.position.x = object_pos_vec[0]
- object_pose.position.y = object_pos_vec[1]
- object_pose.orientation = get_quaternion_from_yaw(log_object_yaw + log_ego_yaw - ego_yaw)
-
-
-class StopWatch:
- def __init__(self, verbose):
- # A dictionary to store the starting times
- self.start_times = {}
- self.verbose = verbose
-
- def tic(self, name):
- """Store the current time with the given name."""
- self.start_times[name] = time.perf_counter()
-
- def toc(self, name):
- """Print the elapsed time since the last call to tic() with the same name."""
- if name not in self.start_times:
- print(f"No start time found for {name}!")
- return
-
- elapsed_time = (
- time.perf_counter() - self.start_times[name]
- ) * 1000 # Convert to milliseconds
- if self.verbose:
- print(f"Time for {name}: {elapsed_time: .2f} ms")
-
- # Reset the starting time for the name
- del self.start_times[name]