Skip to content

Commit

Permalink
fixed todo review comments -> moved to todo comments in pull request …
Browse files Browse the repository at this point in the history
…comments

removed unused class SingletonTrackDataset
  • Loading branch information
ar0305 committed Sep 18, 2024
1 parent fb2c45f commit 9cc8ce6
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 276 deletions.
11 changes: 2 additions & 9 deletions OTAnalytics/plugin_parser/otvision_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ def __fix_recorded_start_date(
Fix datetime of recorded start date in video metadata
from date string to timestamp.
"""
# TODO Review: Not sure in which version this changed ?
if current_version < self.to_version():
recorded_start_date = metadata[ottrk_format.VIDEO][
ottrk_format.RECORDED_START_DATE
Expand All @@ -251,8 +250,8 @@ def __fix_recorded_start_date(
date.timestamp()
)
except (ValueError, TypeError):
# TODO just for safety in case there are both
# TODO file with timestamp and date string in prior versions
# just for safety in case there are both
# file with timestamp and date string in prior versions
metadata[ottrk_format.VIDEO][ottrk_format.RECORDED_START_DATE] = float(
recorded_start_date
)
Expand Down Expand Up @@ -304,16 +303,12 @@ def __parse_otdet_version(self, content: dict) -> Version:
version = content[ottrk_format.METADATA][ottrk_format.OTDET_VERSION]
return Version.from_str(version)

# TODO find better name instead of name duplication?
def __fix_metadata(self, content: dict) -> dict:
content[ottrk_format.METADATA] = self.fix_metadata(
metadata=content[ottrk_format.METADATA]
)
return content

# TODO Review: fixing metadata alone can be reused in streaming_parser,
# TODO hence I made it public. This might also apply to fixing
# TODO single detection data, when they are parsed lazy (but currently not used).
def fix_metadata(self, metadata: dict) -> dict:
"""
Fix formate changes from older ottrk metadata
Expand Down Expand Up @@ -391,8 +386,6 @@ def __str__(self) -> str:
)


# TODO Review: these methods for creating PythonDetections and PythonTracks are static
# TODO and could live outside a class for reusability in streaming_parser
def parse_python_detection(
metadata_video: dict,
id_generator: TrackIdGenerator,
Expand Down
270 changes: 3 additions & 267 deletions OTAnalytics/plugin_parser/streaming_parser.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,23 @@
import bz2
from abc import ABC, abstractmethod
from bisect import bisect
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Any, Callable, Iterable, Iterator, Optional, Sequence
from typing import Any, Callable, Iterable, Iterator

import ijson
from pygeos import (
contains,
get_coordinates,
intersection,
intersects,
is_empty,
line_locate_point,
points,
)

from OTAnalytics.application.datastore import DetectionMetadata, VideoMetadata
from OTAnalytics.application.state import TracksMetadata, VideosMetadata
from OTAnalytics.domain.geometry import RelativeOffsetCoordinate
from OTAnalytics.domain.progress import ProgressbarBuilder
from OTAnalytics.domain.section import Section, SectionId
from OTAnalytics.domain.track import (
Detection,
Track,
TrackClassificationCalculator,
TrackId,
)
from OTAnalytics.domain.track_dataset import (
IntersectionPoint,
TrackDataset,
TrackDoesNotExistError,
TrackSegmentDataset,
)
from OTAnalytics.plugin_datastore.python_track_store import (
PythonTrackDataset,
PythonTrackSegmentDataset,
create_segment_for,
cut_track_with_section,
)
from OTAnalytics.domain.track_dataset import TrackDataset
from OTAnalytics.plugin_datastore.track_geometry_store.pygeos_store import (
GEOMETRY,
PROJECTION,
PygeosTrackGeometryDataset,
area_section_to_pygeos,
create_pygeos_track,
line_sections_to_pygeos_multi,
)
from OTAnalytics.plugin_datastore.track_store import (
PandasByMaxConfidence,
Expand Down Expand Up @@ -373,241 +344,6 @@ def _sort_files(self, files: set[Path]) -> list[Path]:

def _start_date_metadata(self, file: Path) -> float:
json_events = parse_json_bz2_events(file)
metadata = metadata_from_json_events(
json_events
) # TODO metadata fixer in constructor
metadata = metadata_from_json_events(json_events)
metadata = self._format_fixer.fix_metadata(metadata)
return float(metadata[ottrk_format.VIDEO][ottrk_format.RECORDED_START_DATE])


# TODO review: tis singleton implementation of a track dataset is based on
# TODO implementation of PythonTrackDataset and PygeosTrackGeometryDataset
class SingletonTrackDataset(TrackDataset):
"""A TrackDataSet based on a single track."""

def __init__(self, track: Track) -> None:
self._track = track
self._geometries: dict[RelativeOffsetCoordinate, dict]

def _get_geometry_data_for(self, offset: RelativeOffsetCoordinate) -> Any:
"""Compute (or get cached) track geometry"""
if (geometry_data := self._geometries.get(offset, None)) is None:
geometry = create_pygeos_track(self._track, offset)
projection = [
line_locate_point(geometry, points(p))
for p in get_coordinates(geometry)
]
geometry_data = {
GEOMETRY: geometry,
PROJECTION: projection,
}

self._geometries[offset] = geometry_data
return geometry_data

@property
def track_ids(self) -> frozenset[TrackId]:
"""A set containing the single track id."""
return frozenset((self._track.id,))

@property
def first_occurrence(self) -> datetime | None:
"""Returns first occurrence of single track."""
return self._track.first_detection.occurrence

@property
def last_occurrence(self) -> datetime | None:
"""Returns last occurrence of single track."""
return self._track.last_detection.occurrence

@property
def classifications(self) -> frozenset[str]:
"""A set containing the single track classification."""
return frozenset((self._track.classification,))

@property
def empty(self) -> bool:
"""SingletonTrackDataset always contains exactly one Track."""
return False

def add_all(self, other: Iterable[Track]) -> "TrackDataset":
"""Adding is not allowed!"""
raise NotImplementedError

def get_for(self, id: TrackId) -> Optional[Track]:
"""Returns the single track if the given id matches else None."""
return self._track if id == self._track.id else None

def remove(self, track_id: TrackId) -> "TrackDataset":
"""Removing is not allowed!"""
raise NotImplementedError

def remove_multiple(self, track_ids: set[TrackId]) -> "TrackDataset":
"""Removing is not allowed!"""
raise NotImplementedError

def clear(self) -> "TrackDataset":
"""Returns empty PythonTrackDataset."""
return self._empty_dataset()

def as_list(self) -> list[Track]:
"""Return list with single track."""
return [self._track]

def intersecting_tracks(
self, sections: list[Section], offset: RelativeOffsetCoordinate
) -> set[TrackId]:
"""Return id of single track if it intersects any of sections."""
geometry_data = self._get_geometry_data_for(offset)
section_geoms = line_sections_to_pygeos_multi(sections)

if intersects(geometry_data[GEOMETRY], section_geoms):
return {self._track.id}
else:
return set()

def intersection_points(
self, sections: list[Section], offset: RelativeOffsetCoordinate
) -> dict[TrackId, list[tuple[SectionId, IntersectionPoint]]]:
"""Returns intersectionpints of the single track with all given sections."""
geometry_data = self._get_geometry_data_for(offset)
geometry = geometry_data[GEOMETRY]
projection = geometry_data[PROJECTION]
section_geoms = line_sections_to_pygeos_multi(sections)

section_ips = [
(sections[index].id, ip)
for index, ip in enumerate(intersection(geometry, section_geoms))
if not is_empty(ip)
]

intersections = [
(
_section_id,
IntersectionPoint(
bisect(projection, line_locate_point(geometry, point))
),
)
for _section_id, ip in section_ips
for point in get_coordinates(ip)
]

return {self._track.id: intersections}

def contained_by_sections(
self, sections: list[Section], offset: RelativeOffsetCoordinate
) -> dict[TrackId, list[tuple[SectionId, list[bool]]]]:
"""
Return the single track id + containment mask for the given sections.
If no detection of the track is contained by any section, returns empty set.
"""
geometry_data = self._get_geometry_data_for(offset)
geometry = geometry_data[GEOMETRY]

contains_result: list[tuple[SectionId, list[bool]]] = []
for _section in sections:
section_geom = area_section_to_pygeos(_section)

contains_mask = [
contains(section_geom, points(p))[0] for p in get_coordinates(geometry)
]

if not any(contains_mask):
continue

contains_result.append((_section.id, contains_mask))

return {self._track.id: contains_result}

def split(self, chunks: int) -> Sequence["TrackDataset"]:
"""
Splitting track dataset with only one track
returns sequence containing just itself.
"""
return [self]

def __len__(self) -> int:
"""Length of SingletonTrackDataset is always 1!"""
return 1

def calculate_geometries_for(
self, offsets: Iterable[RelativeOffsetCoordinate]
) -> None:
"""Calculate track geometry of single track for all given offsets."""
for offset in offsets:
if offset not in self._geometries.keys():
self._geometries[offset] = self._get_geometry_data_for(offset)

def get_first_segments(self) -> TrackSegmentDataset:
"""Get first segment of single track."""
start = self._track.get_detection(0)
end = self._track.get_detection(1)
return PythonTrackSegmentDataset(
segments=[create_segment_for(track=self._track, start=start, end=end)]
)

def get_last_segments(self) -> TrackSegmentDataset:
"""Get last segment of single track"""
start = self._track.detections[-2]
end = self._track.last_detection
return PythonTrackSegmentDataset(
segments=[create_segment_for(track=self._track, start=start, end=end)]
)

def cut_with_section(
self, section: Section, offset: RelativeOffsetCoordinate
) -> tuple["TrackDataset", set[TrackId]]:
"""
Cut the single track with the given section.
If they intersect returns TrackDataset with cut parts and id of single track.
Otherwise returns empty TrackDataset and empty TrackId set.
"""
cut_tracks = cut_track_with_section(self._track, section, offset)

if len(cut_tracks) > 0:
return (
PythonTrackDataset.from_list(
cut_tracks,
PygeosTrackGeometryDataset.from_track_dataset,
),
{self._track.id},
# only possible id is the single track of this data set
)
else: # return empty track dataset as no tracks were cut
return (self._empty_dataset(), set())

def filter_by_min_detection_length(self, length: int) -> "TrackDataset":
"""
Check if single track has at least the given amount of detections.
If so, returns the unchanged SingletonTrackDataset.
Otherwise returns empty TrackDataset.
"""
if len(self._track.detections) >= length:
return self
else:
return self._empty_dataset()

def get_max_confidences_for(self, track_ids: list[str]) -> dict[str, float]:
if len(set(track_ids)) != 1:
raise ValueError(
f"Multiple track ids lookup ({track_ids}) in SingletionTrackDataset "
+ f"which only contains one track: {self._track.id}"
)

if len(track_ids) == 0:
return dict()

track_id = track_ids[0]
if track_id != self._track.id:
raise TrackDoesNotExistError(
f"Track {track_id} not found. Only contains track {self._track.id}!"
)

max_confidence = max(
[detection.confidence for detection in self._track.detections]
)

return {track_id: max_confidence}

def _empty_dataset(self) -> TrackDataset:
return PythonTrackDataset(PygeosTrackGeometryDataset.from_track_dataset)

0 comments on commit 9cc8ce6

Please sign in to comment.