From 6a389b15b736f3074a1cd71cd548ff1cfba7143d Mon Sep 17 00:00:00 2001 From: adaickalavan Date: Mon, 29 May 2023 16:07:33 -0400 Subject: [PATCH 1/4] Fix dist to dest. --- docs/examples/drive.rst | 6 + docs/examples/rl_model.rst | 1 + docs/spelling_wordlist.txt | 4 +- examples/rl/drive/train/reward.py | 3 - .../v2023/metric_formula_drive.py | 4 + smarts/core/plan.py | 35 ++-- smarts/core/sensor.py | 4 - smarts/env/gymnasium/wrappers/metric/costs.py | 150 ++++++++++++++---- .../env/gymnasium/wrappers/metric/metrics.py | 51 +++--- 9 files changed, 191 insertions(+), 67 deletions(-) create mode 100644 docs/examples/drive.rst diff --git a/docs/examples/drive.rst b/docs/examples/drive.rst new file mode 100644 index 0000000000..98f53b54c2 --- /dev/null +++ b/docs/examples/drive.rst @@ -0,0 +1,6 @@ +.. _drive: + +Drive +===== + +This example was developed in conjunction with the :ref:`Driving SMARTS 2023.1 & 2023.2 ` benchmark, hence refer to it for details. diff --git a/docs/examples/rl_model.rst b/docs/examples/rl_model.rst index 2bf218dd22..a0c5646326 100644 --- a/docs/examples/rl_model.rst +++ b/docs/examples/rl_model.rst @@ -6,4 +6,5 @@ RL Model .. toctree:: :maxdepth: 1 + drive.rst platoon.rst \ No newline at end of file diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index ffeaf749cb..6764359c4b 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1,3 +1,4 @@ +θ π Δx Δy @@ -44,9 +45,10 @@ laner lidar lookahead natively +ndarray +nomask np numpy -ndarray param pre quaternion diff --git a/examples/rl/drive/train/reward.py b/examples/rl/drive/train/reward.py index 04aec0026a..cee2825e26 100644 --- a/examples/rl/drive/train/reward.py +++ b/examples/rl/drive/train/reward.py @@ -33,9 +33,6 @@ def step(self, action): if terminated[agent_id] == True: if agent_obs["events"]["reached_goal"]: print(f"{agent_id}: Hooray! Reached goal.") - raise Exception( - f"{agent_id}: Goal has been leaked to the ego agent!" - ) elif agent_obs["events"]["reached_max_episode_steps"]: print(f"{agent_id}: Reached max episode steps.") elif ( diff --git a/smarts/benchmark/driving_smarts/v2023/metric_formula_drive.py b/smarts/benchmark/driving_smarts/v2023/metric_formula_drive.py index 2feb9b0de5..8c06a81b01 100644 --- a/smarts/benchmark/driving_smarts/v2023/metric_formula_drive.py +++ b/smarts/benchmark/driving_smarts/v2023/metric_formula_drive.py @@ -28,6 +28,7 @@ from smarts.env.gymnasium.wrappers.metric.formula import FormulaBase, Score, avg_costs from smarts.env.gymnasium.wrappers.metric.params import ( Comfort, + DistToDestination, DistToObstacles, JerkLinear, Params, @@ -53,6 +54,9 @@ def params(self) -> Params: comfort=Comfort( active=True, ), + dist_to_destination=DistToDestination( + active=True, + ), dist_to_obstacles=DistToObstacles( active=False, ), diff --git a/smarts/core/plan.py b/smarts/core/plan.py index e4b96ed302..255d49180b 100644 --- a/smarts/core/plan.py +++ b/smarts/core/plan.py @@ -338,16 +338,25 @@ def road_map(self) -> RoadMap: """The road map this plan is relative to.""" return self._road_map - def create_route(self, mission: Mission, radius: Optional[float] = None): + def create_route( + self, + mission: Mission, + start_lane_radius: Optional[float] = None, + end_lane_radius: Optional[float] = None, + ): """Generates a route that conforms to a mission. Args: mission (Mission): A mission the agent should follow. Defaults to endless if `None`. - radius (Optional[float]): + start_lane_radius (Optional[float]): Radius (meter) to find the nearest starting lane for the given - mission. Defaults to `_default_lane_width` of the underlying - road_map. + mission. Defaults to a function of `_default_lane_width` of the + underlying road_map. + end_lane_radius (Optional[float]): + Radius (meter) to find the nearest ending lane for the given + mission. Defaults to a function of `_default_lane_width` of the + underlying road_map. """ assert not self._route or not len( self._route.road_ids @@ -363,7 +372,7 @@ def create_route(self, mission: Mission, radius: Optional[float] = None): start_lanes = self._road_map.nearest_lanes( self._mission.start.point, include_junctions=True, - radius=radius, + radius=start_lane_radius, ) if not start_lanes: self._mission = Mission.endless_mission(Pose.origin()) @@ -371,20 +380,24 @@ def create_route(self, mission: Mission, radius: Optional[float] = None): via_roads = [self._road_map.road_by_id(via) for via in self._mission.route_vias] - end_lane = self._road_map.nearest_lane( + end_lanes = self._road_map.nearest_lanes( self._mission.goal.position, include_junctions=False, + radius=end_lane_radius, ) - assert end_lane is not None, "route must end in a lane" + assert end_lanes is not None, "No end lane found. Route must end in a lane." # When an agent is in an intersection, the `nearest_lanes` method might # not return the correct road as the first choice. Hence, nearest # starting lanes are tried in sequence until a route is found or until # all nearby starting lane options are exhausted. - for start_lane, _ in start_lanes: - self._route = self._road_map.generate_routes( - start_lane, end_lane, via_roads, 1 - )[0] + for end_lane, _ in end_lanes: + for start_lane, _ in start_lanes: + self._route = self._road_map.generate_routes( + start_lane, end_lane, via_roads, 1 + )[0] + if self._route.road_length > 0: + break if self._route.road_length > 0: break diff --git a/smarts/core/sensor.py b/smarts/core/sensor.py index d1e7070358..6e29be5946 100644 --- a/smarts/core/sensor.py +++ b/smarts/core/sensor.py @@ -34,16 +34,12 @@ from smarts.core.masks import RenderMasks from smarts.core.observations import ( DrivableAreaGridMap, - EgoVehicleObservation, GridMapMetadata, - Observation, OccupancyGridMap, RoadWaypoints, SignalObservation, TopDownRGB, - VehicleObservation, ViaPoint, - Vias, ) from smarts.core.plan import Plan from smarts.core.renderer_base import RendererBase diff --git a/smarts/env/gymnasium/wrappers/metric/costs.py b/smarts/env/gymnasium/wrappers/metric/costs.py index 14443f9282..01c5bb1ef7 100644 --- a/smarts/env/gymnasium/wrappers/metric/costs.py +++ b/smarts/env/gymnasium/wrappers/metric/costs.py @@ -22,11 +22,11 @@ import warnings from collections import deque from dataclasses import dataclass -from typing import Callable, Dict, List, NewType +from typing import Callable, Dict, List, NewType, Optional, Tuple import numpy as np -from smarts.core.coordinates import Heading, Point +from smarts.core.coordinates import Heading, Point, RefLinePoint from smarts.core.observations import Observation from smarts.core.plan import Mission, Plan, PositionalGoal, Start from smarts.core.road_map import RoadMap @@ -108,28 +108,91 @@ def func( def _dist_to_destination( - end_pos: Point, dist_tot: float + end_pos: Point, + dist_tot: float, + route: RoadMap.Route, + prev_route_lane: RoadMap.Lane, + prev_route_lane_point: Point, + prev_route_displacement: float, ) -> Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]: mean = 0 step = 0 end_pos = end_pos dist_tot = dist_tot + route = route + prev_route_lane = prev_route_lane + prev_route_lane_point = prev_route_lane_point + prev_route_displacement = prev_route_displacement + prev_dist_travelled = 0 + tot_dist_travelled = 0 def func( road_map: RoadMap, vehicle_index: VehicleIndex, done: Done, obs: Observation ) -> Costs: - nonlocal mean, step, end_pos, dist_tot + nonlocal mean, step, end_pos, dist_tot, route, prev_route_lane, prev_route_lane_point, prev_route_displacement, prev_dist_travelled, tot_dist_travelled + + tot_dist_travelled += obs.distance_travelled if not done: + cur_pos = Point(*obs.ego_vehicle_state.position) + cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( + road_map=road_map, route=route, pos=cur_pos + ) + + if cur_on_route: + prev_route_lane = cur_route_lane + prev_route_lane_point = cur_route_lane_point + prev_route_displacement = cur_route_displacement + prev_dist_travelled = tot_dist_travelled + return Costs(dist_to_destination=-np.inf) elif obs.events.reached_goal: return Costs(dist_to_destination=0) else: cur_pos = Point(*obs.ego_vehicle_state.position) - dist_remainder = get_dist( - road_map=road_map, point_a=cur_pos, point_b=end_pos + cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( + road_map=road_map, route=route, pos=cur_pos + ) + + # Step 1: Compute the last off-route distance driven by the vehicle, if any + if not cur_on_route: + off_route_dist = tot_dist_travelled - prev_dist_travelled + assert off_route_dist >= 0 + off_route_dist += prev_route_displacement + last_route_lane = prev_route_lane + last_route_pos = prev_route_lane_point + else: + off_route_dist = cur_route_displacement + last_route_lane = cur_route_lane + last_route_pos = cur_route_lane_point + + # Step 2: Compute the remaining route distance from the last recorded on-route position + on_route_dist = route.distance_between( + start=RoadMap.Route.RoutePoint(pt=last_route_pos), + end=RoadMap.Route.RoutePoint(pt=end_pos), ) + + # Step 3: Remaining `on_route_dist` can become negative when agent overshoots the end position while + # remaining outside the goal capture radius at all times. + on_route_dist = abs(on_route_dist) + + # Step 4: Compute lane error penalty if vehicle is in the same road as goal, but in a different lane. + # TODO: Lane error penalty is not computed because the end lane of a SUMO traffic vehicle of interest + # is currently not easily accessible. + lane_error_dist = 0 + # end_lane = route.end_lane + # if last_route_lane.road == end_lane.road: + # lane_error = abs(last_route_lane.index - end_lane.index) + # end_offset = end_lane.offset_along_lane(world_point=end_pos) + # lane_width, _ = end_lane.width_at_offset(end_offset) + # lane_error_dist = lane_error * lane_width + + # Step 5: Total distance to destination + dist_remainder = off_route_dist + on_route_dist + lane_error_dist + + # Step 6: Cap distance to destination dist_remainder_capped = min(dist_remainder, dist_tot) + return Costs(dist_to_destination=dist_remainder_capped / dist_tot) return func @@ -484,7 +547,7 @@ class CostFuncsBase: # fmt: off collisions: Callable[[], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _collisions comfort: Callable[[], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _comfort - dist_to_destination: Callable[[Point,float], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _dist_to_destination + dist_to_destination: Callable[[Point,float,RoadMap.Route,RoadMap.Lane,Point,float], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _dist_to_destination dist_to_obstacles: Callable[[List[str]], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _dist_to_obstacles jerk_linear: Callable[[], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _jerk_linear lane_center_offset: Callable[[], Callable[[RoadMap, VehicleIndex, Done, Observation], Costs]] = _lane_center_offset @@ -531,7 +594,9 @@ class CostError(Exception): pass -def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float: +def get_dist( + road_map: RoadMap, point_a: Point, point_b: Point, tolerate: bool = False +) -> Tuple[float, RoadMap.Route]: """ Computes the shortest route distance from point_a to point_b in the road map. Both points should lie on a road in the road map. Key assumption about @@ -542,9 +607,13 @@ def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float: road_map: Scenario road map. point_a: A point, in world-map coordinates, which lies on a road. point_b: A point, in world-map coordinates, which lies on a road. + tolerate: If False, raises an error when distance is negative due to + route being computed in reverse direction from point_b to point_a. + Defaults to False. Returns: float: Shortest road distance between two points in the road map. + RoadMap.Route: Planned route between point_a and point_b. """ mission = Mission( @@ -559,7 +628,7 @@ def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float: ), ) plan = Plan(road_map=road_map, mission=mission, find_route=False) - plan.create_route(mission=mission, radius=20) + plan.create_route(mission=mission, start_lane_radius=3, end_lane_radius=0.5) assert isinstance(plan.route, RoadMap.Route) from_route_point = RoadMap.Route.RoutePoint(pt=point_a) to_route_point = RoadMap.Route.RoutePoint(pt=point_b) @@ -567,22 +636,47 @@ def get_dist(road_map: RoadMap, point_a: Point, point_b: Point) -> float: dist_tot = plan.route.distance_between(start=from_route_point, end=to_route_point) if dist_tot == None: raise CostError("Unable to find road on route near given points.") - elif dist_tot < 0: - # This happens when agent overshoots the goal position while - # remaining outside the goal capture radius at all times. Default - # positional goal radius is 2m. - dist_tot = abs(dist_tot) - - # Account for agent ending in a different lane but in the same road as - # the goal position. - start_lane = plan.route.start_lane - end_lane = plan.route.end_lane - lane_error = abs(start_lane.index - end_lane.index) - if len(plan.route.roads) == 1 and lane_error > 0: - assert start_lane.road == end_lane.road - end_offset = end_lane.offset_along_lane(world_point=point_b) - lane_width, _ = end_lane.width_at_offset(end_offset) - lane_error_dist = lane_error * lane_width - dist_tot += lane_error_dist - - return dist_tot + elif dist_tot < 0 and not tolerate: + raise CostError( + "Route computed in reverse direction from point_b to " + f"point_a resulting in negative distance: {dist_tot}." + ) + + return dist_tot, plan.route + + +def on_route( + road_map: RoadMap, route: RoadMap.Route, pos: Point, radius: float = 7 +) -> Tuple[bool, Optional[RoadMap.Lane], Optional[Point], Optional[float]]: + """ + Computes whether point `pos` is within the search `radius` distance from + any lane in the `route`. + + Args: + road_map (RoadMap): Road map. + route (RoadMap.Route): Route consisting of a set of roads. + pos (smarts.core.coordinates.Point): A world-coordinate point. + radius (float): Search radius. + + Returns: + Tuple[bool, Optional[RoadMap.Lane], Optional[smarts.core.coordinates.Point], Optional[float]]: + True if `pos` is nearby any road in `route`, else False. If true, + additionally returns the (i) nearest lane in route, (ii) its + nearest lane center point, and (iii) displacement between + `pos` and lane center point. + """ + lanes = road_map.nearest_lanes( + point=pos, + radius=radius, + include_junctions=True, + ) + + route_roads = route.roads + for lane, _ in lanes: + if lane.road in route_roads: + offset = lane.offset_along_lane(world_point=pos) + lane_point = lane.from_lane_coord(RefLinePoint(s=offset)) + displacement = np.linalg.norm(lane_point.as_np_array - pos.as_np_array) + return True, lane, lane_point, displacement + + return False, None, None, None diff --git a/smarts/env/gymnasium/wrappers/metric/metrics.py b/smarts/env/gymnasium/wrappers/metric/metrics.py index 22999a8add..aeca6bbef1 100644 --- a/smarts/env/gymnasium/wrappers/metric/metrics.py +++ b/smarts/env/gymnasium/wrappers/metric/metrics.py @@ -42,6 +42,7 @@ Done, get_dist, make_cost_funcs, + on_route, ) from smarts.env.gymnasium.wrappers.metric.formula import FormulaBase, Score from smarts.env.gymnasium.wrappers.metric.params import Params @@ -205,20 +206,16 @@ def reset(self, **kwargs): for agent_name in self._cur_agents: cost_funcs_kwargs = {} if self._params.dist_to_destination.active: - interest_criteria = self.env.agent_interfaces[ - agent_name - ].done_criteria.interest + interest_criteria = self.env.agent_interfaces[agent_name].done_criteria.interest if interest_criteria == None: end_pos = self._scen.missions[agent_name].goal.position - dist_tot = get_dist( + dist_tot, route = get_dist( road_map=self._road_map, point_a=Point(*self._scen.missions[agent_name].start.position), point_b=end_pos, ) - elif isinstance(interest_criteria, InterestDoneCriteria) and ( - interest_actor is not None - ): - end_pos, dist_tot = _get_end_and_dist( + elif isinstance(interest_criteria, InterestDoneCriteria) and (interest_actor is not None): + end_pos, dist_tot, route = _get_end_and_dist( interest_actor=interest_actor, vehicle_index=self._vehicle_index, traffic_sims=self.env.smarts.traffic_sims, @@ -237,8 +234,22 @@ def reset(self, **kwargs): raise MetricsError( "Unsupported configuration for distance-to-destination cost function." ) + start_pos = Point(*self._scen.missions[agent_name].start.position) + cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( + road_map=self._road_map, route=route, pos=start_pos + ) + assert cur_on_route, f"{agent_name} does not start nearby the desired route." cost_funcs_kwargs.update( - {"dist_to_destination": {"end_pos": end_pos, "dist_tot": dist_tot}} + { + "dist_to_destination": { + "end_pos": end_pos, + "dist_tot": dist_tot, + "route": route, + "prev_route_lane": cur_route_lane, + "prev_route_lane_point": cur_route_lane_point, + "prev_route_displacement": cur_route_displacement, + } + } ) cost_funcs_kwargs.update( @@ -324,7 +335,7 @@ def _get_end_and_dist( traffic_sims: List[TrafficProvider], scenario: Scenario, road_map: RoadMap, -) -> Tuple[Point, float]: +) -> Tuple[Point, float, RoadMap.Route]: """Computes the end point and route distance for a given vehicle of interest. Args: @@ -335,7 +346,7 @@ def _get_end_and_dist( road_map (RoadMap): Underlying road map. Returns: - Tuple[Point, float]: End point and route distance. + Tuple[Point, float, RoadMap.Route]: End point, route distance, and planned route. """ # Check if the interest vehicle is a social agent. interest_social_missions = [ @@ -358,21 +369,21 @@ def _get_end_and_dist( goal = interest_social_mission.goal assert isinstance(goal, PositionalGoal) end_pos = goal.position - dist_tot = get_dist( + dist_tot, route = get_dist( road_map=road_map, point_a=Point(*interest_social_mission.start.position), point_b=end_pos, ) else: interest_traffic_sim = interest_traffic_sims[0] - end_pos, dist_tot = _get_traffic_end_and_dist( + end_pos, dist_tot, route = _get_traffic_end_and_dist( vehicle_name=interest_actor, vehicle_index=vehicle_index, traffic_sim=interest_traffic_sim, road_map=road_map, ) - return end_pos, dist_tot + return end_pos, dist_tot, route def _get_traffic_end_and_dist( @@ -380,7 +391,7 @@ def _get_traffic_end_and_dist( vehicle_index: VehicleIndex, traffic_sim: TrafficProvider, road_map: RoadMap, -) -> Tuple[Point, float]: +) -> Tuple[Point, float, RoadMap.Route]: """Computes the end point and route distance of a (i) SUMO traffic, (ii) SMARTS traffic, or (iii) history traffic vehicle specified by `vehicle_name`. @@ -392,7 +403,7 @@ def _get_traffic_end_and_dist( road_map (RoadMap): Underlying road map. Returns: - Tuple[Point, float]: End point and route distance. + Tuple[Point, float, RoadMap.Route]: End point, route distance, and planned route. """ if isinstance(traffic_sim, (SumoTrafficSimulation, LocalTrafficProvider)): @@ -403,8 +414,8 @@ def _get_traffic_end_and_dist( .lane_at_index(0) .from_lane_coord(RefLinePoint(s=np.inf)) ) - dist_tot = get_dist(road_map=road_map, point_a=start_pos, point_b=end_pos) - return end_pos, dist_tot + dist_tot, route = get_dist(road_map=road_map, point_a=start_pos, point_b=end_pos) + return end_pos, dist_tot, route elif isinstance(traffic_sim, TrafficHistoryProvider): history = traffic_sim.vehicle_history_window(vehicle_id=vehicle_name) start_pos = Point(x=history.start_position_x, y=history.start_position_y) @@ -414,8 +425,8 @@ def _get_traffic_end_and_dist( # roads traversed by the history vehicle in complex maps. Ideally we # should use the actual road ids traversed by the history vehicle to # compute the distance. - dist_tot = get_dist(road_map=road_map, point_a=start_pos, point_b=end_pos) - return end_pos, dist_tot + dist_tot, route = get_dist(road_map=road_map, point_a=start_pos, point_b=end_pos) + return end_pos, dist_tot, route else: raise MetricsError(f"Unsupported traffic provider {traffic_sim.source_str}.") From 394157f78e0f7d9702ac550cb39cd138a0bd53f3 Mon Sep 17 00:00:00 2001 From: adaickalavan Date: Mon, 29 May 2023 16:47:26 -0400 Subject: [PATCH 2/4] Add changelog. --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8a32ddcbe6..972229beab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,7 +31,7 @@ Copy and pasting the git commit messages is __NOT__ enough. - `info` returned by `hiway-v1` in `reset()` and `step()` methods are unified. - Changed instances of `hiway-v0` and `gym` to use `hiway-v1` and `gymnasium`, respectively. - `RoadMap.Route` now optionally stores the start and end lanes of the route. -- `DistToDestination` metric now adds lane error penalty when agent terminates in different lane but same road as the goal position. +- `DistToDestination` metric is now computed by summing the (i) off-route distance driven by the vehicle from its last on-route position, and (ii) the distance to goal from the vehicle's last on-route position. ### Deprecated - `visdom` is set to be removed from the SMARTS object parameters. - Deprecated `start_time` on missions. From b7cb7fcda095d4f1941f4e6512fd21cfaa0d1f54 Mon Sep 17 00:00:00 2001 From: adaickalavan Date: Tue, 30 May 2023 08:56:50 -0400 Subject: [PATCH 3/4] Address review. --- smarts/env/gymnasium/wrappers/metric/costs.py | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/smarts/env/gymnasium/wrappers/metric/costs.py b/smarts/env/gymnasium/wrappers/metric/costs.py index 01c5bb1ef7..575fe579d8 100644 --- a/smarts/env/gymnasium/wrappers/metric/costs.py +++ b/smarts/env/gymnasium/wrappers/metric/costs.py @@ -154,7 +154,7 @@ def func( road_map=road_map, route=route, pos=cur_pos ) - # Step 1: Compute the last off-route distance driven by the vehicle, if any + # Step 1: Compute the last off-route distance driven by the vehicle, if any. if not cur_on_route: off_route_dist = tot_dist_travelled - prev_dist_travelled assert off_route_dist >= 0 @@ -166,19 +166,21 @@ def func( last_route_lane = cur_route_lane last_route_pos = cur_route_lane_point - # Step 2: Compute the remaining route distance from the last recorded on-route position + # Step 2: Compute the remaining route distance from the last recorded on-route position. on_route_dist = route.distance_between( start=RoadMap.Route.RoutePoint(pt=last_route_pos), end=RoadMap.Route.RoutePoint(pt=end_pos), ) - # Step 3: Remaining `on_route_dist` can become negative when agent overshoots the end position while + # Step 3: Compute absolute `on_route_dist` because it could be + # negative when an agent overshoots the end position while # remaining outside the goal capture radius at all times. on_route_dist = abs(on_route_dist) # Step 4: Compute lane error penalty if vehicle is in the same road as goal, but in a different lane. - # TODO: Lane error penalty is not computed because the end lane of a SUMO traffic vehicle of interest - # is currently not easily accessible. + # TODO: Lane error penalty should be computed. It is not computed + # currently because the end lane of a SUMO traffic vehicle of + # interest is currently not accessible. lane_error_dist = 0 # end_lane = route.end_lane # if last_route_lane.road == end_lane.road: @@ -187,10 +189,10 @@ def func( # lane_width, _ = end_lane.width_at_offset(end_offset) # lane_error_dist = lane_error * lane_width - # Step 5: Total distance to destination + # Step 5: Total distance to destination. dist_remainder = off_route_dist + on_route_dist + lane_error_dist - # Step 6: Cap distance to destination + # Step 6: Cap distance to destination. dist_remainder_capped = min(dist_remainder, dist_tot) return Costs(dist_to_destination=dist_remainder_capped / dist_tot) @@ -646,27 +648,27 @@ def get_dist( def on_route( - road_map: RoadMap, route: RoadMap.Route, pos: Point, radius: float = 7 + road_map: RoadMap, route: RoadMap.Route, point: Point, radius: float = 7 ) -> Tuple[bool, Optional[RoadMap.Lane], Optional[Point], Optional[float]]: """ - Computes whether point `pos` is within the search `radius` distance from - any lane in the `route`. + Computes whether `point` is within the search `radius` distance from any + lane in the `route`. Args: road_map (RoadMap): Road map. route (RoadMap.Route): Route consisting of a set of roads. - pos (smarts.core.coordinates.Point): A world-coordinate point. + point (smarts.core.coordinates.Point): A world-coordinate point. radius (float): Search radius. Returns: Tuple[bool, Optional[RoadMap.Lane], Optional[smarts.core.coordinates.Point], Optional[float]]: - True if `pos` is nearby any road in `route`, else False. If true, - additionally returns the (i) nearest lane in route, (ii) its - nearest lane center point, and (iii) displacement between - `pos` and lane center point. + True if `point` is nearby any road in `route`, else False. If true, + additionally returns the (i) nearest lane in route, (ii) its + nearest lane center point, and (iii) displacement between `point` + and lane center point. """ lanes = road_map.nearest_lanes( - point=pos, + point=point, radius=radius, include_junctions=True, ) @@ -674,9 +676,9 @@ def on_route( route_roads = route.roads for lane, _ in lanes: if lane.road in route_roads: - offset = lane.offset_along_lane(world_point=pos) + offset = lane.offset_along_lane(world_point=point) lane_point = lane.from_lane_coord(RefLinePoint(s=offset)) - displacement = np.linalg.norm(lane_point.as_np_array - pos.as_np_array) + displacement = np.linalg.norm(lane_point.as_np_array - point.as_np_array) return True, lane, lane_point, displacement return False, None, None, None From 0ca7592447cdbffa0fbb3263d66d5d06e5c69ee1 Mon Sep 17 00:00:00 2001 From: adaickalavan Date: Tue, 30 May 2023 10:05:30 -0400 Subject: [PATCH 4/4] Use built-in point method. --- smarts/env/gymnasium/wrappers/metric/costs.py | 8 +-- .../env/gymnasium/wrappers/metric/metrics.py | 51 +++++++++---------- 2 files changed, 27 insertions(+), 32 deletions(-) diff --git a/smarts/env/gymnasium/wrappers/metric/costs.py b/smarts/env/gymnasium/wrappers/metric/costs.py index 575fe579d8..59d7f131fd 100644 --- a/smarts/env/gymnasium/wrappers/metric/costs.py +++ b/smarts/env/gymnasium/wrappers/metric/costs.py @@ -136,7 +136,7 @@ def func( if not done: cur_pos = Point(*obs.ego_vehicle_state.position) cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( - road_map=road_map, route=route, pos=cur_pos + road_map=road_map, route=route, point=cur_pos ) if cur_on_route: @@ -151,7 +151,7 @@ def func( else: cur_pos = Point(*obs.ego_vehicle_state.position) cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( - road_map=road_map, route=route, pos=cur_pos + road_map=road_map, route=route, point=cur_pos ) # Step 1: Compute the last off-route distance driven by the vehicle, if any. @@ -179,7 +179,7 @@ def func( # Step 4: Compute lane error penalty if vehicle is in the same road as goal, but in a different lane. # TODO: Lane error penalty should be computed. It is not computed - # currently because the end lane of a SUMO traffic vehicle of + # currently because the end lane of a SUMO traffic vehicle of # interest is currently not accessible. lane_error_dist = 0 # end_lane = route.end_lane @@ -663,7 +663,7 @@ def on_route( Returns: Tuple[bool, Optional[RoadMap.Lane], Optional[smarts.core.coordinates.Point], Optional[float]]: True if `point` is nearby any road in `route`, else False. If true, - additionally returns the (i) nearest lane in route, (ii) its + additionally returns the (i) nearest lane in route, (ii) its nearest lane center point, and (iii) displacement between `point` and lane center point. """ diff --git a/smarts/env/gymnasium/wrappers/metric/metrics.py b/smarts/env/gymnasium/wrappers/metric/metrics.py index aeca6bbef1..c92902fbbe 100644 --- a/smarts/env/gymnasium/wrappers/metric/metrics.py +++ b/smarts/env/gymnasium/wrappers/metric/metrics.py @@ -202,6 +202,7 @@ def reset(self, **kwargs): "actors of interest." ) + # fmt: off # Refresh the cost functions for every episode. for agent_name in self._cur_agents: cost_funcs_kwargs = {} @@ -211,7 +212,7 @@ def reset(self, **kwargs): end_pos = self._scen.missions[agent_name].goal.position dist_tot, route = get_dist( road_map=self._road_map, - point_a=Point(*self._scen.missions[agent_name].start.position), + point_a=self._scen.missions[agent_name].start.point, point_b=end_pos, ) elif isinstance(interest_criteria, InterestDoneCriteria) and (interest_actor is not None): @@ -234,36 +235,29 @@ def reset(self, **kwargs): raise MetricsError( "Unsupported configuration for distance-to-destination cost function." ) - start_pos = Point(*self._scen.missions[agent_name].start.position) cur_on_route, cur_route_lane, cur_route_lane_point, cur_route_displacement = on_route( - road_map=self._road_map, route=route, pos=start_pos + road_map=self._road_map, route=route, point=self._scen.missions[agent_name].start.point ) assert cur_on_route, f"{agent_name} does not start nearby the desired route." - cost_funcs_kwargs.update( - { - "dist_to_destination": { - "end_pos": end_pos, - "dist_tot": dist_tot, - "route": route, - "prev_route_lane": cur_route_lane, - "prev_route_lane_point": cur_route_lane_point, - "prev_route_displacement": cur_route_displacement, - } + cost_funcs_kwargs.update({ + "dist_to_destination": { + "end_pos": end_pos, + "dist_tot": dist_tot, + "route": route, + "prev_route_lane": cur_route_lane, + "prev_route_lane_point": cur_route_lane_point, + "prev_route_displacement": cur_route_displacement, } - ) - - cost_funcs_kwargs.update( - { - "dist_to_obstacles": { - "ignore": self._params.dist_to_obstacles.ignore - }, - "steps": { - "max_episode_steps": self.env.agent_interfaces[ - agent_name - ].max_episode_steps - }, - } - ) + }) + + cost_funcs_kwargs.update({ + "dist_to_obstacles": { + "ignore": self._params.dist_to_obstacles.ignore + }, + "steps": { + "max_episode_steps": self.env.agent_interfaces[agent_name].max_episode_steps + }, + }) self._cost_funcs[agent_name] = make_cost_funcs( params=self._params, **cost_funcs_kwargs ) @@ -279,6 +273,7 @@ def reset(self, **kwargs): } return result + # fmt: on def records(self) -> Dict[str, Dict[str, Record]]: """ @@ -371,7 +366,7 @@ def _get_end_and_dist( end_pos = goal.position dist_tot, route = get_dist( road_map=road_map, - point_a=Point(*interest_social_mission.start.position), + point_a=interest_social_mission.start.point, point_b=end_pos, ) else: