Skip to content

Commit

Permalink
refactor geotagging (#442)
Browse files Browse the repository at this point in the history
* refactor geotagging

* move camera specific code to GeotagFromBlackVue

* Move camera model/make extration to GeotagFromExif

* Improve GPX interpolation error message

* Add tests for offset_time and offset_angle

* Parsing GoPro GPMF in tempdir
  • Loading branch information
ptpt authored Oct 13, 2021
1 parent 40c4433 commit 0242e19
Show file tree
Hide file tree
Showing 29 changed files with 1,148 additions and 1,209 deletions.
15 changes: 0 additions & 15 deletions mapillary_tools/apply_camera_specific_config.py

This file was deleted.

1 change: 0 additions & 1 deletion mapillary_tools/camera_support/__init__.py

This file was deleted.

30 changes: 0 additions & 30 deletions mapillary_tools/camera_support/prepare_blackvue_videos.py

This file was deleted.

16 changes: 9 additions & 7 deletions mapillary_tools/commands/process.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import inspect
import argparse

from ..insert_MAPJson import insert_MAPJson
from ..process_geotag_properties import process_geotag_properties
from ..process_geotag_properties import process_geotag_properties, insert_MAPJson
from ..process_import_meta_properties import (
process_import_meta_properties,
)
Expand Down Expand Up @@ -218,27 +217,29 @@ def run(self, vars_args: dict):
):
vars_args["duplicate_angle"] = 360

process_import_meta_properties(
descs = process_geotag_properties(
**(
{
k: v
for k, v in vars_args.items()
if k in inspect.getfullargspec(process_import_meta_properties).args
if k in inspect.getfullargspec(process_geotag_properties).args
}
)
)

process_geotag_properties(
descs = process_import_meta_properties(
descs=descs,
**(
{
k: v
for k, v in vars_args.items()
if k in inspect.getfullargspec(process_geotag_properties).args
if k in inspect.getfullargspec(process_import_meta_properties).args
}
)
)

process_sequence_properties(
descs = process_sequence_properties(
descs=descs,
**(
{
k: v
Expand All @@ -249,6 +250,7 @@ def run(self, vars_args: dict):
)

insert_MAPJson(
descs=descs,
**(
{
k: v
Expand Down
6 changes: 2 additions & 4 deletions mapillary_tools/commands/video_process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from .process import Command as ProcessCommand
from .sample_video import Command as SampleCommand
from ..apply_camera_specific_config import apply_camera_specific_config


class Command:
Expand All @@ -12,6 +11,5 @@ def add_basic_arguments(self, parser):
ProcessCommand().add_basic_arguments(parser)

def run(self, args: dict):
vars_args = apply_camera_specific_config(args)
SampleCommand().run(vars_args)
ProcessCommand().run(vars_args)
SampleCommand().run(args)
ProcessCommand().run(args)
4 changes: 4 additions & 0 deletions mapillary_tools/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ class MapillaryGeoTaggingError(MapillaryUserError):
pass


class MapillaryInterpolationError(MapillaryUserError):
pass


class MapillaryStationaryBlackVueError(MapillaryUserError):
pass

Expand Down
12 changes: 4 additions & 8 deletions mapillary_tools/exif_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,24 +252,20 @@ def extract_lon_lat(self) -> Tuple[Optional[float], Optional[float]]:

return None, None

def extract_make(self) -> str:
def extract_make(self) -> Optional[str]:
"""
Extract camera make
"""
fields = ["EXIF LensMake", "Image Make"]
make, _ = self._extract_alternative_fields(
fields, default="none", field_type=str
)
make, _ = self._extract_alternative_fields(fields, field_type=str)
return make

def extract_model(self) -> str:
def extract_model(self) -> Optional[str]:
"""
Extract camera model
"""
fields = ["EXIF LensModel", "Image Model"]
model, _ = self._extract_alternative_fields(
fields, default="none", field_type=str
)
model, _ = self._extract_alternative_fields(fields, field_type=str)
return model

def extract_orientation(self) -> int:
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,23 +1,109 @@
import logging
import typing as T
import datetime
import io
import logging
import os
import re
import typing as T

import pynmea2
from pymp4.parser import Box

from .geo import get_max_distance_from_start
from .types import GPXPoint

"""
Pulls geo data out of a BlackVue video files
"""
from .geotag_from_generic import GeotagFromGeneric
from .geotag_from_gpx import GeotagFromGPX
from .. import image_log, types
from ..error import MapillaryGeoTaggingError, MapillaryStationaryBlackVueError
from ..geo import get_max_distance_from_start

LOG = logging.getLogger(__name__)


def get_points_from_bv(path, use_nmea_stream_timestamp=False) -> T.List[GPXPoint]:
class GeotagFromBlackVue(GeotagFromGeneric):
def __init__(self, image_dir: str, source_path: str):
super().__init__()
self.image_dir = image_dir
if os.path.isdir(source_path):
self.blackvue_videos = image_log.get_video_file_list(
source_path, abs_path=True
)
elif os.path.isfile(source_path):
# it is okay to not suffix with .mp4
self.blackvue_videos = [source_path]
else:
raise RuntimeError(f"The geotag_source_path {source_path} does not exist")
self.source_path = source_path

def to_description(self) -> T.List[types.FinalImageDescriptionOrError]:
descs: T.List[types.FinalImageDescriptionOrError] = []

images = image_log.get_total_file_list(self.image_dir)
for blackvue_video in self.blackvue_videos:
sample_images = filter_video_samples(images, blackvue_video)
if not sample_images:
continue

[points, is_stationary_video] = gpx_from_blackvue(
blackvue_video, use_nmea_stream_timestamp=False
)

if not points:
message = f"Skipping the BlackVue video {blackvue_video} -- no GPS found in the video"
for image in sample_images:
error = types.describe_error(MapillaryGeoTaggingError(message))
descs.append({"error": error, "filename": image})
continue

if is_stationary_video:
message = f"Skipping stationary BlackVue video {blackvue_video}"
for image in sample_images:
error = types.describe_error(
MapillaryStationaryBlackVueError(message)
)
descs.append({"error": error, "filename": image})
continue

geotag = GeotagFromGPX(self.image_dir, sample_images, points)

model = find_camera_model(blackvue_video)
LOG.debug(f"Found BlackVue camera model %s for %s", model, blackvue_video)

for desc in geotag.to_description():
if "error" not in desc:
desc = T.cast(types.ImageDescriptionJSON, desc)
desc["MAPDeviceMake"] = "Blackvue"
if model is not None:
desc["MAPDeviceModel"] = model.decode("utf-8")
descs.append(desc)

return descs


def find_camera_model(video_path) -> T.Optional[bytes]:
with open(video_path, "rb") as fd:
fd.seek(0, io.SEEK_END)
eof = fd.tell()
fd.seek(0)
while fd.tell() < eof:
box = Box.parse_stream(fd)
if box.type.decode("utf-8") == "free":
return box.data[29:39]
return None


def is_sample_of_video(sample_path: str, video_filename: str) -> bool:
abs_sample_path = os.path.abspath(sample_path)
video_basename = os.path.basename(video_filename)
if video_basename == os.path.basename(os.path.dirname(abs_sample_path)):
sample_basename = os.path.basename(sample_path)
root, _ = os.path.splitext(video_basename)
return sample_basename.startswith(root + "_")
return False


def filter_video_samples(images: T.List[str], video_path: str) -> T.List[str]:
return [image for image in images if is_sample_of_video(image, video_path)]


def get_points_from_bv(path, use_nmea_stream_timestamp=False) -> T.List[types.GPXPoint]:
points = []
with open(path, "rb") as fd:
fd.seek(0, io.SEEK_END)
Expand Down Expand Up @@ -173,7 +259,7 @@ def get_points_from_bv(path, use_nmea_stream_timestamp=False) -> T.List[GPXPoint

break

return [GPXPoint(time=p[0], lat=p[1], lon=p[2], alt=p[3]) for p in points]
return [types.GPXPoint(time=p[0], lat=p[1], lon=p[2], alt=p[3]) for p in points]


def is_video_stationary(max_distance_from_start) -> bool:
Expand All @@ -186,8 +272,8 @@ def is_video_stationary(max_distance_from_start) -> bool:


def gpx_from_blackvue(
bv_video, use_nmea_stream_timestamp=False
) -> T.Tuple[T.List[GPXPoint], bool]:
bv_video: str, use_nmea_stream_timestamp=False
) -> T.Tuple[T.List[types.GPXPoint], bool]:
points = get_points_from_bv(bv_video, use_nmea_stream_timestamp)
if not points:
return points, True
Expand Down
66 changes: 66 additions & 0 deletions mapillary_tools/geotag/geotag_from_exif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import os
import typing as T

from .geotag_from_generic import GeotagFromGeneric
from .. import types
from ..exif_read import ExifRead
from ..error import MapillaryGeoTaggingError


class GeotagFromEXIF(GeotagFromGeneric):
def __init__(self, image_dir: str, images: T.List[str]):
self.image_dir = image_dir
self.images = images
super().__init__()

def to_description(self) -> T.List[types.FinalImageDescriptionOrError]:
descs: T.List[types.FinalImageDescriptionOrError] = []

for image in self.images:
image_path = os.path.join(self.image_dir, image)

exif = ExifRead(image_path)

lon, lat = exif.extract_lon_lat()
if lat is None or lon is None:
exc = MapillaryGeoTaggingError(
"Unable to extract GPS Longitude or GPS Latitude from the image"
)
descs.append({"error": types.describe_error(exc), "filename": image})
continue

timestamp = exif.extract_capture_time()
if timestamp is None:
exc = MapillaryGeoTaggingError(
"Unable to extract timestamp from the image"
)
descs.append({"error": types.describe_error(exc), "filename": image})
continue

angle = exif.extract_direction()

desc: types.ImageDescriptionJSON = {
"MAPLatitude": lat,
"MAPLongitude": lon,
"MAPCaptureTime": types.datetime_to_map_capture_time(timestamp),
"filename": image,
}
if angle is not None:
desc["MAPCompassHeading"] = {
"TrueHeading": angle,
"MagneticHeading": angle,
}

desc["MAPOrientation"] = exif.extract_orientation()

make = exif.extract_make()
if make is not None:
desc["MAPDeviceMake"] = make

model = exif.extract_model()
if model is not None:
desc["MAPDeviceModel"] = model

descs.append(desc)

return descs
11 changes: 11 additions & 0 deletions mapillary_tools/geotag/geotag_from_generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import typing as T

from .. import types


class GeotagFromGeneric:
def __init__(self):
pass

def to_description(self) -> T.List[types.FinalImageDescriptionOrError]:
return []
Loading

0 comments on commit 0242e19

Please sign in to comment.