Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: parallelize fl annotation processing #182

Merged
merged 4 commits into from
Dec 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 85 additions & 55 deletions perception_dataset/fastlabel_to_t4/fastlabel_2d_to_t4_converter.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, as_completed
import json
import os.path as osp
from pathlib import Path
Expand Down Expand Up @@ -175,67 +176,96 @@ def _format_fastlabel_annotation(self, annotations: Dict[str, List[Dict[str, Any
fl_annotations: Dict[str, Dict[int, List[Dict[str, Any]]]] = {}

for filename, ann_list in sorted(annotations.items()):
dataset_name: str = Path(filename).stem
for ann in ann_list:
filename: str = ann["name"].split("/")[-1]
file_id: int = int(filename.split(".")[0])
frame_no: int = file_id + 1
camera: str = ann["name"].split("/")[-2]
dataset_name = Path(filename).stem
# Process annotations for each file in parallel
file_annotations = self._process_annotations_for_file(ann_list)

if dataset_name not in fl_annotations:
fl_annotations[dataset_name] = defaultdict(list)
# Combine results
if dataset_name not in fl_annotations:
fl_annotations[dataset_name] = defaultdict(list)

width: int = ann["width"]
height: int = ann["height"]
for file_id, ann_list in file_annotations.items():
fl_annotations[dataset_name][file_id].extend(ann_list)

for a in ann["annotations"]:
occlusion_state: str = "occlusion_state.none"
visibility: str = "Not available"
instance_id = ""
for att in a["attributes"]:
if att["key"] == "id":
instance_id = att["value"]
if "occlusion_state" in att["key"]:
for v in att["value"]:
if frame_no in range(v[0], v[1]):
occlusion_state = (
"occlusion_state." + att["key"].split("_")[-1]
)
visibility = self._convert_occlusion_to_visibility(
att["key"].split("_")[-1]
)
break
category_label = self._label_converter.convert_label(a["title"])
label_t4_dict: Dict[str, Any] = {
"category_name": category_label,
"instance_id": instance_id,
"attribute_names": [occlusion_state],
"visibility_name": visibility,
}
if a["type"] == "bbox":
label_t4_dict.update(
{
"two_d_box": a["points"],
"sensor_id": self._camera2idx[camera],
}
return fl_annotations

def process_annotation(self, a, frame_no, camera, width, height):
    """Convert a single FastLabel annotation into a T4-format label dict.

    Args:
        a: one FastLabel annotation entry (keys used: ``attributes``,
            ``title``, ``type``, ``points``).
        frame_no: 1-based frame number used to resolve frame-ranged
            occlusion-state attributes.
        camera: camera channel name, mapped to a sensor id via
            ``self._camera2idx``.
        width: image width in pixels (used for RLE segmentation encoding).
        height: image height in pixels (used for RLE segmentation encoding).

    Returns:
        Dict with category/instance/attribute/visibility fields plus either
        a ``two_d_box`` or ``two_d_segmentation`` payload and its
        ``sensor_id``.
    """
    occlusion_state = "occlusion_state.none"
    visibility = "Not available"
    instance_id = ""

    # Resolve the instance id and the occlusion state that covers this frame.
    for att in a["attributes"]:
        if att["key"] == "id":
            instance_id = att["value"]
        if "occlusion_state" in att["key"]:
            for v in att["value"]:
                # NOTE(review): range() excludes v[1], so the last frame of
                # each interval is treated as outside the occlusion span —
                # confirm this matches the FastLabel interval convention.
                if frame_no in range(v[0], v[1]):
                    occlusion_state = "occlusion_state." + att["key"].split("_")[-1]
                    visibility = self._convert_occlusion_to_visibility(
                        att["key"].split("_")[-1]
                    )
                    break

    # Category and base label generation.
    category_label = self._label_converter.convert_label(a["title"])
    label_t4_dict = {
        "category_name": category_label,
        "instance_id": instance_id,
        "attribute_names": [occlusion_state],
        "visibility_name": visibility,
    }

    # Attach the geometry payload depending on the annotation type.
    if a["type"] == "bbox":
        label_t4_dict.update(
            {
                "two_d_box": a["points"],
                "sensor_id": self._camera2idx[camera],
            }
        )
    elif a["type"] == "segmentation":
        label_t4_dict.update(
            {
                "two_d_segmentation": _rle_from_points(a["points"], width, height),
                "sensor_id": self._camera2idx[camera],
            }
        )
        # Object (non-surface) categories additionally get a bbox derived
        # from the first polygon so downstream 2D-box consumers keep working.
        if (
            self._label_converter.is_object_label(category_label)
            and category_label not in self._surface_categories
        ):
            label_t4_dict["two_d_box"] = _convert_polygon_to_bbox(a["points"][0][0])
            print(f"Converted polygon to bbox for {category_label}")

    return label_t4_dict

def _process_annotations_for_file(self, ann_list):
    """Convert all annotations of one FastLabel file in parallel.

    Each entry in ``ann_list`` carries its frame image path in ``name``
    (``.../<camera>/<file_id>.<ext>``); every drawn shape inside it is
    converted by ``process_annotation`` in a worker process.

    Args:
        ann_list: list of FastLabel annotation entries for a single file.

    Returns:
        ``defaultdict(list)`` mapping ``file_id`` to the list of T4-format
        label dicts produced for that frame (completion order, so the
        per-frame list order is not deterministic).
    """
    file_annotations = defaultdict(list)

    # Parallelize conversion of each annotation in the file.
    with ProcessPoolExecutor() as executor:
        # Record the originating file_id per future: as_completed yields in
        # completion order, so the submission-time loop variable must not be
        # reused afterwards (that would attribute every result to the last
        # submitted file_id).
        future_to_file_id = {}
        for ann in ann_list:
            filename = ann["name"].split("/")[-1]
            file_id = int(filename.split(".")[0])
            frame_no = file_id + 1
            camera = ann["name"].split("/")[-2]
            width = ann["width"]
            height = ann["height"]

            for a in ann["annotations"]:
                future = executor.submit(
                    self.process_annotation, a, frame_no, camera, width, height
                )
                future_to_file_id[future] = file_id

        for future in as_completed(future_to_file_id):
            label_t4_dict = future.result()
            file_annotations[future_to_file_id[future]].append(label_t4_dict)

    return file_annotations


def _rle_from_points(points: Points2DLike, width: int, height: int) -> Dict[str, Any]:
Expand Down
Loading