-
Notifications
You must be signed in to change notification settings - Fork 5
/
map_conversion.py
241 lines (197 loc) · 8.52 KB
/
map_conversion.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import argparse
from collections import defaultdict
import json
from typing import Dict, Iterable, Any
import numpy as np
from xml.dom.minidom import parseString
import xml.etree.ElementTree as ET
from argoverse.utils.manhattan_search import compute_point_cloud_bbox
from nuscenes.map_expansion.map_api import NuScenesMap
from nuscenes.map_expansion import arcline_path_utils
"""
Converts the nuScenes map into the Argoverse format.
The nuscenes map is a json file, and is converted to an xml for use with argoverse codebase.
"""
filename_to_id = {
"boston-seaport": "BSP_10318",
"singapore-hollandvillage": "SHV_10320",
"singapore-onenorth": "SON_10322",
"singapore-queenstown": "SQT_10324",
}
LANE_DISCRETIZATION_RESOLUTION_M = 0.5 # meters
# Set defaults for fields not provided in the nuscenes map, but required in argoverse maps
DEFAULT_TURN_DIRECTION = "NONE"
DEFAULT_TRAFFIC_CONTROL = "False"
DEFAULT_IS_INTERSECTION = "False"
DEFAULT_L_NEIGHBOR = "None"
DEFAULT_R_NEIGHBOR = "None"
def populate_lane_dict(
nusc_map: NuScenesMap, root: ET.Element, data: Dict[str, Iterable[Any]]
) -> Dict[str, Iterable[int]]:
"""
Return a map that links a nuscenes 'way' token to all the nodes that are a part of that way.
Also adds all these nodes to the xml map.
"""
lane_dict = defaultdict(list)
# Map x,y value of node to the new(argo) node id in the xml
present_nodes = {} # k:tuple, v: node_id
global_id = 0 # New id for nodes in the xml
# Loop over all lanes in the json to populate the lane to node dictionary
for way in data["lane"] + data["lane_connector"]:
lane_record = nusc_map.get_arcline_path(way["token"]) # get arcline associated with lane
poses = arcline_path_utils.discretize_lane(
lane_record, LANE_DISCRETIZATION_RESOLUTION_M
) # discretize the lane to given resolution
for pose in poses:
currNode = (pose[0], pose[1])
if currNode not in present_nodes:
node = ET.SubElement(root, "node")
node.set("id", str(global_id))
present_nodes[currNode] = global_id
global_id += 1
node.set("x", str(pose[0]))
node.set("y", str(pose[1]))
lane_dict[way["token"]].append(present_nodes[currNode])
return lane_dict
def populate_polys(data: Dict[str, Iterable[Any]]) -> Dict[str, np.ndarray]:
"""Create a lookup table of the exterior lane polygon boundary for each lane
Returns:
poly_dict: map from lane token to array of shape (N,2), representing x,y coordinates of their exterior nodes
"""
# Map nodes in the original json to their x, y coordinates
node_dict = {} # k: node_token v: (x,y)
# Loop over all nodes in the json to populate the node_dict
for node in data["node"]:
node_dict[node["token"]] = (node["x"], node["y"])
poly_dict = {} # k: poly_token v: np.ndarray of shape (N,2)
# Loop over all nodes in the json to populate the node_dict
for poly in data["polygon"]:
poly_array = []
for node_id in poly["exterior_node_tokens"]:
poly_array.append(list(node_dict[node_id]))
poly_dict[poly["token"]] = np.array(poly_array)
return poly_dict
def create_lanes_xml(
nusc_map: NuScenesMap,
root: ET.Element,
data: Dict[str, Iterable[Any]],
filename: str,
argo_dir: str,
lane_dict: Dict[str, Iterable[int]],
poly_dict: Dict[str, np.ndarray],
) -> None:
"""
Fill up the xml map file with lane centelines.
Also create the supporting files halluc_bbox_table.npy and tableidx_to_laneid_map.json
"""
# Id to assign to lanes in the new xml map file. We arbitrarily start with 8000000.
# We make up new lane_ids since the original one's are non numerical
global_way_id = 8000000
# Map that links new lane_id in the xml to its original token in the json.
way_to_lane_id = {}
# map lane segment IDs to their index in the table
tableidx_to_laneid_map = {}
# array that holds xmin,ymin,xmax,ymax for each coord
halluc_bbox_table = []
table_idx_counter = 0
## Iterate over the lanes to create the required xml and supporting files
for way in data["lane"] + data["lane_connector"]:
node = ET.SubElement(root, "way")
if way["token"] not in way_to_lane_id:
way_to_lane_id[way["token"]] = global_way_id
global_way_id += 1
curr_id = way_to_lane_id[way["token"]]
node.set("lane_id", str(curr_id))
traffic = ET.SubElement(node, "tag")
traffic.set("k", "has_traffic_control")
traffic.set("v", DEFAULT_TRAFFIC_CONTROL)
turn = ET.SubElement(node, "tag")
turn.set("k", "turn_direction")
turn.set("v", DEFAULT_TURN_DIRECTION)
intersection = ET.SubElement(node, "tag")
intersection.set("k", "is_intersection")
intersection.set("v", DEFAULT_IS_INTERSECTION)
ln = ET.SubElement(node, "tag")
ln.set("k", "l_neighbor_id")
ln.set("v", DEFAULT_L_NEIGHBOR)
rn = ET.SubElement(node, "tag")
rn.set("k", "r_neighbor_id")
rn.set("v", DEFAULT_R_NEIGHBOR)
for waypoint in lane_dict[way["token"]]:
nd = ET.SubElement(node, "nd")
nd.set("ref", str(waypoint))
predecessors = nusc_map.get_incoming_lane_ids(way["token"])
successors = nusc_map.get_outgoing_lane_ids(way["token"])
for pred_id in predecessors:
pre = ET.SubElement(node, "tag")
pre.set("k", "predecessor")
if pred_id not in way_to_lane_id:
way_to_lane_id[pred_id] = global_way_id
global_way_id += 1
int_pred_id = way_to_lane_id[pred_id]
pre.set("v", str(int_pred_id))
for succ_id in successors:
succ = ET.SubElement(node, "tag")
succ.set("k", "successor")
if succ_id not in way_to_lane_id:
way_to_lane_id[succ_id] = global_way_id
global_way_id += 1
int_succ_id = way_to_lane_id[succ_id]
succ.set("v", str(int_succ_id))
lane_id = way_to_lane_id[way["token"]]
tableidx_to_laneid_map[table_idx_counter] = lane_id
table_idx_counter += 1
xmin, ymin, xmax, ymax = compute_point_cloud_bbox(poly_dict[way["polygon_token"]])
halluc_bbox_table += [(xmin, ymin, xmax, ymax)]
halluc_bbox_table = np.array(halluc_bbox_table)
halluc_bbox_dict = {
"tableidx_to_laneid_map": tableidx_to_laneid_map,
"halluc_bbox_table": halluc_bbox_table,
}
np.save(
f"{argo_dir}/{filename_to_id[filename]}_halluc_bbox_table.npy",
halluc_bbox_table,
)
with open(f"{argo_dir}/{filename_to_id[filename]}_tableidx_to_laneid_map.json", "w") as outfile:
json.dump(tableidx_to_laneid_map, outfile)
tree = ET.ElementTree(root)
with open(f"{argo_dir}/pruned_nuscenes_{filename_to_id[filename]}_vector_map.xml", "wb") as files:
tree.write(files)
def convert_map(args: argparse.Namespace) -> None:
"""
Core function of this script. Loads each json file, calls the helper functions to extract relevant information
from the json files, and finally creates xml files in the argoverse format.
"""
# Load json file. Loop over all the json files in directory
for filename in [
"boston-seaport",
"singapore-hollandvillage",
"singapore-onenorth",
"singapore-queenstown",
]:
print(filename)
with open(f"/{args.nuscenes_dir}/maps/expansion/{filename}.json") as f:
data = json.load(f)
nusc_map = NuScenesMap(dataroot=f"/{args.nuscenes_dir}", map_name=filename)
# Create new xml ETree
root = ET.Element("NuScenesMap")
# Map lane token of nuscenes to node_tokens that form the centerline
lane_dict = populate_lane_dict(nusc_map, root, data) # k: token, v: list of node_token
poly_dict = populate_polys(data)
create_lanes_xml(nusc_map, root, data, filename, args.argo_dir, lane_dict, poly_dict)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--nuscenes-dir",
default="/coc/dataset/nuScenes-v1.0/",
type=str,
help="the path to the directory where the NuScenes map is stored",
)
parser.add_argument(
"--argo-dir",
default="output",
type=str,
help="the path to the directory where the converted data should be written",
)
args = parser.parse_args()
convert_map(args)