diff --git a/.gitignore b/.gitignore index 6fa3f196..98d7b5d7 100644 --- a/.gitignore +++ b/.gitignore @@ -10,5 +10,6 @@ __pycache__ /build/ /dist/ /venv/ +/.pyre/ .DS_Store -*.log \ No newline at end of file +*.log diff --git a/README.md b/README.md index 845f50d8..d6c6f797 100644 --- a/README.md +++ b/README.md @@ -292,6 +292,92 @@ mapillary_tools video_process MY_VIDEO_DIR \ --video_sample_distance -1 --video_sample_interval 2 ``` +## New video geotagging features (experimental) + +As experimental features, mapillary_tools can now: +* Geotag videos from tracks recorded in GPX and NMEA files +* Invoke `exiftool` internally, if available on the system. `exiftool` can extract geolocation data from a wide +range of video formats, allowing us to support more cameras with less trouble for the end user. +* Try several geotagging sources sequentially, until proper data is found. + +These features apply to the `process` command, which analyzes the video for direct upload instead of sampling it +into images. They are experimental and will be subject to change in future releases. + +### Usage + +The new video processing is triggered with the `--video_geotag_source SOURCE` option. It can be specified multiple times: +In this case, each source will be tried in turn, until one returns good quality data. + +`SOURCE` can be: +1. the plain name of the source - one of `video, camm, gopro, blackvue, gpx, nmea, exiftool_xml, exiftool_runtime` +2. a JSON object that includes optional parameters +```json5 +{ + "source": "SOURCE", + "pattern": "FILENAME_PATTERN" // optional +} +``` + +PATTERN specifies how the data source file is named, starting from the video filename: +* `%f`: the full video filename +* `%g`: the video filename without extension +* `%e`: the video filename extension + +Supported sources and their default pattern are: + +* `video`: parse the video, in order, as `camm, gopro, blackvue`. Pattern: `%f` +* `camm`: parse the video looking for a CAMM track. Pattern: `%f` +* `gopro`: parse the video looking for geolocation in GoPro format. Pattern: `%f` +* `blackvue`: parse the video looking for geolocation in BlackVue format. Pattern: `%f` +* `gpx`: external GPX file. Pattern: `%g.gpx` +* `nmea`: external NMEA file. Pattern: `%g.nmea` +* `exiftool_xml`: external XML file generated by exiftool. Pattern: `%g.xml` +* `exiftool_runtime`: execute exiftool on the video file. Pattern: `%f` + +Notes: +* `exiftool_runtime` only works if exiftool is installed in the system. If exiftool is not in the default +execution path, it is possible to specify its location by setting the environment variable `EXIFTOOL_PATH`. +* pattern are case-sensitive or not depending on the filesystem - in Windows, `%g.gpx` will match both `basename.gpx` +and `basename.GPX`, in MacOs, Linux or other Unix systems no. +* If both `--video_geotag_source` and `--geotag_source` are specified, `--video_geotag_source` will apply to video files +and `--geotag_source` to image files. + +### Examples + +#### Generic supported videos + +Process all videos in a directory, trying to parse them as CAMM, GoPro or BlackVue: +```sh +mapillary_tools process --video_geotag_source video VIDEO_DIR/ +``` + +#### External GPX + +Process all videos in a directory, taking geolocation data from GPX files. A video named `foo.mp4` will be associated +with a GPX file called `foo.gpx`. +```sh +mapillary_tools process --video_geotag_source gpx VIDEO_DIR/ +``` + +#### Insta360 stitched videos + +The videos to process have been stitched by Insta360 Studio; the geolocation data is in the original +videos in the parent directory, and there may be GPX files alongside the stitched video. +First look for GPX, then fallback to running exiftool against the original videos. +```sh +mapillary_tools process \ +--video_geotag_source gpx \ +--video_geotag_source '{"source": "exiftool_runtime", "pattern": "../%g.insv"}' \ +VIDEO_DIR/ +``` + +### Limitations of `--video_geotag_source` + +**External geolocation sources will be aligned with the start of video, there is +currently no way of applying offsets or scaling the time.** This means, for instance, that GPX tracks must begin precisely +at the instant of the video, and that timelapse videos are supported only for sources `camm, gopro, blackvue`. + + ## Authenticate The command `authenticate` will update the user credentials stored in the config file. diff --git a/mapillary_tools/commands/__main__.py b/mapillary_tools/commands/__main__.py index 93e5a102..53329a8e 100644 --- a/mapillary_tools/commands/__main__.py +++ b/mapillary_tools/commands/__main__.py @@ -63,14 +63,14 @@ def add_general_arguments(parser, command): elif command in ["upload"]: parser.add_argument( "import_path", - help="Path to your images.", + help="Paths to your images or videos.", nargs="+", type=Path, ) elif command in ["process", "process_and_upload"]: parser.add_argument( "import_path", - help="Path to your images.", + help="Paths to your images or videos.", nargs="+", type=Path, ) diff --git a/mapillary_tools/commands/process.py b/mapillary_tools/commands/process.py index 15b9ab7c..c93d57fb 100644 --- a/mapillary_tools/commands/process.py +++ b/mapillary_tools/commands/process.py @@ -171,6 +171,13 @@ def add_basic_arguments(self, parser: argparse.ArgumentParser): required=False, type=Path, ) + group_geotagging.add_argument( + "--video_geotag_source", + help="Name of the video data extractor and optional arguments. Can be specified multiple times. See the documentation for details. [Experimental, subject to change]", + action="append", + default=[], + required=False, + ) group_geotagging.add_argument( "--interpolation_use_gpx_start_time", help=f"If supplied, the first image will use the first GPX point time for interpolation, which means the image location will be interpolated to the first GPX point too. Only works for geotagging from {', '.join(geotag_gpx_based_sources)}.", @@ -261,13 +268,14 @@ def run(self, vars_args: dict): vars_args["duplicate_angle"] = 360 metadatas = process_geotag_properties( + vars_args=vars_args, **( { k: v for k, v in vars_args.items() if k in inspect.getfullargspec(process_geotag_properties).args } - ) + ), ) metadatas = process_import_meta_properties( diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index 0ec09ed7..6f6831cb 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -18,6 +18,7 @@ VIDEO_DURATION_RATIO = float(os.getenv(_ENV_PREFIX + "VIDEO_DURATION_RATIO", 1)) FFPROBE_PATH: str = os.getenv(_ENV_PREFIX + "FFPROBE_PATH", "ffprobe") FFMPEG_PATH: str = os.getenv(_ENV_PREFIX + "FFMPEG_PATH", "ffmpeg") +EXIFTOOL_PATH: str = os.getenv(_ENV_PREFIX + "EXIFTOOL_PATH", "exiftool") IMAGE_DESCRIPTION_FILENAME = os.getenv( _ENV_PREFIX + "IMAGE_DESCRIPTION_FILENAME", "mapillary_image_description.json" ) diff --git a/mapillary_tools/process_geotag_properties.py b/mapillary_tools/process_geotag_properties.py index efde26c0..03c6f63c 100644 --- a/mapillary_tools/process_geotag_properties.py +++ b/mapillary_tools/process_geotag_properties.py @@ -20,7 +20,10 @@ geotag_videos_from_exiftool_video, geotag_videos_from_video, ) -from .types import FileType +from .types import FileType, VideoMetadataOrError + +from .video_data_extraction.cli_options import CliOptions, CliParserOptions +from .video_data_extraction.extract_video_data import VideoDataExtractor LOG = logging.getLogger(__name__) @@ -30,6 +33,17 @@ "gopro_videos", "blackvue_videos", "camm", "exif", "gpx", "nmea", "exiftool" ] +VideoGeotagSource = T.Literal[ + "video", + "camm", + "gopro", + "blackvue", + "gpx", + "nmea", + "exiftool_xml", + "exiftool_runtime", +] + def _process_images( image_paths: T.Sequence[Path], @@ -40,7 +54,7 @@ def _process_images( interpolation_offset_time: float = 0.0, num_processes: T.Optional[int] = None, skip_subfolders=False, -) -> T.List[types.ImageMetadataOrError]: +) -> T.Sequence[types.ImageMetadataOrError]: geotag: geotag_from_generic.GeotagImagesFromGeneric if video_import_path is not None: @@ -124,6 +138,37 @@ def _process_images( return geotag.to_description() +def _process_videos( + geotag_source: str, + geotag_source_path: T.Optional[Path], + video_paths: T.Sequence[Path], + num_processes: T.Optional[int], + filetypes: T.Optional[T.Set[FileType]], +) -> T.Sequence[VideoMetadataOrError]: + geotag: geotag_from_generic.GeotagVideosFromGeneric + if geotag_source == "exiftool": + if geotag_source_path is None: + raise exceptions.MapillaryFileNotFoundError( + "Geotag source path (--geotag_source_path) is required" + ) + if not geotag_source_path.exists(): + raise exceptions.MapillaryFileNotFoundError( + f"Geotag source file not found: {geotag_source_path}" + ) + geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( + video_paths, + geotag_source_path, + num_processes=num_processes, + ) + else: + geotag = geotag_videos_from_video.GeotagVideosFromVideo( + video_paths, + filetypes=filetypes, + num_processes=num_processes, + ) + return geotag.to_description() + + def _normalize_import_paths( import_path: T.Union[Path, T.Sequence[Path]] ) -> T.Sequence[Path]: @@ -137,6 +182,7 @@ def _normalize_import_paths( def process_geotag_properties( + vars_args: T.Dict, # Hello, I'm a hack import_path: T.Union[Path, T.Sequence[Path]], filetypes: T.Set[FileType], geotag_source: GeotagSource, @@ -170,51 +216,43 @@ def process_geotag_properties( skip_subfolders=skip_subfolders, check_file_suffix=check_file_suffix, ) - image_metadatas = _process_images( - image_paths, - geotag_source=geotag_source, - geotag_source_path=geotag_source_path, - video_import_path=video_import_path, - interpolation_use_gpx_start_time=interpolation_use_gpx_start_time, - interpolation_offset_time=interpolation_offset_time, - num_processes=num_processes, - skip_subfolders=skip_subfolders, - ) - metadatas.extend(image_metadatas) - - if ( - FileType.CAMM in filetypes - or FileType.GOPRO in filetypes - or FileType.BLACKVUE in filetypes - or FileType.VIDEO in filetypes - ): - video_paths = utils.find_videos( - import_paths, - skip_subfolders=skip_subfolders, - check_file_suffix=check_file_suffix, - ) - geotag: geotag_from_generic.GeotagVideosFromGeneric - if geotag_source == "exiftool": - if geotag_source_path is None: - raise exceptions.MapillaryFileNotFoundError( - "Geotag source path (--geotag_source_path) is required" - ) - if not geotag_source_path.exists(): - raise exceptions.MapillaryFileNotFoundError( - f"Geotag source file not found: {geotag_source_path}" - ) - geotag = geotag_videos_from_exiftool_video.GeotagVideosFromExifToolVideo( - video_paths, - geotag_source_path, + if image_paths: + image_metadatas = _process_images( + image_paths, + geotag_source=geotag_source, + geotag_source_path=geotag_source_path, + video_import_path=video_import_path, + interpolation_use_gpx_start_time=interpolation_use_gpx_start_time, + interpolation_offset_time=interpolation_offset_time, num_processes=num_processes, + skip_subfolders=skip_subfolders, ) - else: - geotag = geotag_videos_from_video.GeotagVideosFromVideo( - video_paths, - filetypes=filetypes, - num_processes=num_processes, + metadatas.extend(image_metadatas) + + # --video_geotag_source is still experimental, for videos execute it XOR the legacy code + if vars_args["video_geotag_source"]: + metadatas.extend(_process_videos_beta(vars_args)) + else: + if ( + FileType.CAMM in filetypes + or FileType.GOPRO in filetypes + or FileType.BLACKVUE in filetypes + or FileType.VIDEO in filetypes + ): + video_paths = utils.find_videos( + import_paths, + skip_subfolders=skip_subfolders, + check_file_suffix=check_file_suffix, ) - metadatas.extend(geotag.to_description()) + if video_paths: + video_metadata = _process_videos( + geotag_source, + geotag_source_path, + video_paths, + num_processes, + filetypes, + ) + metadatas.extend(video_metadata) # filenames should be deduplicated in utils.find_images/utils.find_videos assert len(metadatas) == len( @@ -224,6 +262,38 @@ def process_geotag_properties( return metadatas +def _process_videos_beta(vars_args: T.Dict): + geotag_sources = vars_args["video_geotag_source"] + geotag_sources_opts: T.List[CliParserOptions] = [] + for source in geotag_sources: + parsed_opts: CliParserOptions = {} + try: + parsed_opts = json.loads(source) + except ValueError: + if source not in T.get_args(VideoGeotagSource): + raise exceptions.MapillaryBadParameterError( + "Unknown beta source %s or invalid JSON", source + ) + parsed_opts = {"source": source} + + if "source" not in parsed_opts: + raise exceptions.MapillaryBadParameterError("Missing beta source name") + + geotag_sources_opts.append(parsed_opts) + + options: CliOptions = { + "paths": vars_args["import_path"], + "recursive": vars_args["skip_subfolders"] == False, + "geotag_sources_options": geotag_sources_opts, + "geotag_source_path": vars_args["geotag_source_path"], + "num_processes": vars_args["num_processes"], + "device_make": vars_args["device_make"], + "device_model": vars_args["device_model"], + } + extractor = VideoDataExtractor(options) + return extractor.process() + + def _apply_offsets( metadatas: T.Iterable[types.ImageMetadata], offset_time: float = 0.0, diff --git a/mapillary_tools/video_data_extraction/cli_options.py b/mapillary_tools/video_data_extraction/cli_options.py new file mode 100644 index 00000000..dbcd064f --- /dev/null +++ b/mapillary_tools/video_data_extraction/cli_options.py @@ -0,0 +1,22 @@ +import typing as T +from pathlib import Path + + +known_parser_options = ["source", "pattern", "exiftool_path"] + + +class CliParserOptions(T.TypedDict, total=False): + source: str + pattern: T.Optional[str] + exiftool_path: T.Optional[Path] + + +class CliOptions(T.TypedDict, total=False): + paths: T.Sequence[Path] + recursive: bool + geotag_sources_options: T.Sequence[CliParserOptions] + geotag_source_path: Path + exiftool_path: Path + num_processes: int + device_make: T.Optional[str] + device_model: T.Optional[str] diff --git a/mapillary_tools/video_data_extraction/extract_video_data.py b/mapillary_tools/video_data_extraction/extract_video_data.py new file mode 100644 index 00000000..4c5a9915 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extract_video_data.py @@ -0,0 +1,190 @@ +import logging +import typing as T +from multiprocessing import Pool +from pathlib import Path + +import tqdm + +import mapillary_tools.geotag.utils as video_utils + +from mapillary_tools import exceptions, geo, utils +from mapillary_tools.geotag import gpmf_gps_filter +from mapillary_tools.types import ( + ErrorMetadata, + FileType, + MetadataOrError, + VideoMetadata, + VideoMetadataOrError, +) +from mapillary_tools.video_data_extraction import video_data_parser_factory +from mapillary_tools.video_data_extraction.cli_options import CliOptions +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +LOG = logging.getLogger(__name__) + + +class VideoDataExtractor: + options: CliOptions + + def __init__(self, options: CliOptions) -> None: + self.options = options + + def process(self) -> T.List[MetadataOrError]: + paths = self.options["paths"] + self._check_paths(paths) + video_files = utils.find_videos(paths) + self._check_sources_cardinality(video_files) + + num_processes = self.options["num_processes"] or None + with Pool(processes=num_processes) as pool: + if num_processes == 1: + iter: T.Iterator[VideoMetadataOrError] = map( + self.process_file, video_files + ) + else: + iter = pool.imap(self.process_file, video_files) + + video_metadata_or_errors = list( + tqdm.tqdm( + iter, + desc="Extracting GPS tracks", + unit="videos", + disable=LOG.getEffectiveLevel() <= logging.DEBUG, + total=len(video_files), + ) + ) + + return video_metadata_or_errors + + def process_file(self, file: Path) -> VideoMetadataOrError: + parsers = video_data_parser_factory.make_parsers(file, self.options) + points: T.Sequence[geo.Point] = [] + make = self.options["device_make"] + model = self.options["device_model"] + + ex: T.Optional[Exception] + for parser in parsers: + log_vars = { + "filename": file, + "parser": parser.parser_label, + "source": parser.geotag_source_path, + } + try: + if not points: + points = self._extract_points(parser, log_vars) + if not model: + model = parser.extract_model() + if not make: + make = parser.extract_make() + except Exception as e: + ex = e + LOG.warn( + '%(filename)s: Exception for parser %(parser)s while processing source %(source)s: "%(e)s"', + {**log_vars, "e": e}, + ) + + # After trying all parsers, return the points if we found any, otherwise + # the last exception thrown or a default one. + # Note that if we have points, we return them, regardless of exceptions + # with make or model. + if points: + video_metadata = VideoMetadata( + filename=file, + filetype=FileType.VIDEO, + md5sum=None, + points=points, + make=make, + model=model, + ) + video_metadata.update_md5sum() + return video_metadata + else: + return ErrorMetadata( + filename=file, + error=ex + if ex + else exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found from the video" + ), + filetype=FileType.VIDEO, + ) + + def _extract_points( + self, parser: BaseParser, log_vars: T.Dict + ) -> T.Sequence[geo.Point]: + points = parser.extract_points() + if points: + LOG.debug( + "%(filename)s: %(points)d points extracted by parser %(parser)s from file %(source)s}", + {**log_vars, "points": len(points)}, + ) + + points = self._sanitize_points(points) + + if parser.must_rebase_times_to_zero: + points = self._rebase_times(points) + + return points + + @staticmethod + def _check_paths(import_paths: T.Sequence[Path]): + for path in import_paths: + if not path.is_file() and not path.is_dir(): + raise exceptions.MapillaryFileNotFoundError( + f"Import file or directory not found: {path}" + ) + + def _check_sources_cardinality(self, files: T.Sequence[Path]): + if len(files) > 1: + for parser_opts in self.options["geotag_sources_options"]: + pattern = parser_opts.get("pattern") + if pattern and "%" not in pattern: + raise exceptions.MapillaryUserError( + "Multiple video files found: Geotag source pattern for source %s must include filename placeholders", + parser_opts["source"], + ) + + @staticmethod + def _sanitize_points(points: T.Sequence[geo.Point]) -> T.Sequence[geo.Point]: + """ + Deduplicates points, when possible removes noisy ones, and checks + against stationary videos + """ + + if not points: + raise exceptions.MapillaryVideoGPSNotFoundError( + "No GPS data found in the given sources" + ) + + points = geo.extend_deduplicate_points(points) + + if all(isinstance(p, geo.PointWithFix) for p in points): + points = T.cast( + T.Sequence[geo.Point], + gpmf_gps_filter.remove_noisy_points( + T.cast(T.Sequence[geo.PointWithFix], points) + ), + ) + if not points: + raise exceptions.MapillaryGPSNoiseError("GPS is too noisy") + + stationary = video_utils.is_video_stationary( + geo.get_max_distance_from_start([(p.lat, p.lon) for p in points]) + ) + + if stationary: + raise exceptions.MapillaryStationaryVideoError("Stationary video") + + return points + + @staticmethod + def _rebase_times(points: T.Sequence[geo.Point]): + """ + Make point times start from 0 + """ + if points: + first_timestamp = points[0].time + for p in points: + p.time = p.time - first_timestamp + return points diff --git a/mapillary_tools/video_data_extraction/extractors/base_parser.py b/mapillary_tools/video_data_extraction/extractors/base_parser.py new file mode 100644 index 00000000..d0a29ee6 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/base_parser.py @@ -0,0 +1,73 @@ +import abc +import functools +import logging +import os +import sys +import typing as T +from pathlib import Path + +from mapillary_tools import geo +from mapillary_tools.video_data_extraction.cli_options import ( + CliOptions, + CliParserOptions, +) + +LOG = logging.getLogger(__name__) + + +class BaseParser(metaclass=abc.ABCMeta): + videoPath: Path + options: CliOptions + parserOptions: CliParserOptions + + def __init__( + self, video_path: Path, options: CliOptions, parser_options: CliParserOptions + ) -> None: + self.videoPath = video_path + self.options = options + self.parserOptions = parser_options + + @property + @abc.abstractmethod + def default_source_pattern(self) -> str: + raise NotImplementedError + + @property + @abc.abstractmethod + def parser_label(self) -> str: + raise NotImplementedError + + @property + @abc.abstractmethod + def must_rebase_times_to_zero(self) -> bool: + raise NotImplementedError + + @abc.abstractmethod + def extract_points(self) -> T.Sequence[geo.Point]: + raise NotImplementedError + + @abc.abstractmethod + def extract_make(self) -> T.Optional[str]: + raise NotImplementedError + + @abc.abstractmethod + def extract_model(self) -> T.Optional[str]: + raise NotImplementedError + + @functools.cached_property + def geotag_source_path(self) -> T.Optional[Path]: + video_dir = self.videoPath.parent.resolve() + video_filename = self.videoPath.name + video_basename, video_ext = os.path.splitext(video_filename) + pattern = self.parserOptions.get("pattern") or self.default_source_pattern + + replaced = Path( + pattern.replace("%f", video_filename) + .replace("%g", video_basename) + .replace("%e", video_ext) + ) + abs_path = ( + replaced if replaced.is_absolute() else Path.joinpath(video_dir, replaced) + ).resolve() + + return abs_path if abs_path.is_file() else None diff --git a/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py new file mode 100644 index 00000000..7f088677 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/blackvue_parser.py @@ -0,0 +1,33 @@ +import typing as T + +from mapillary_tools import geo +from mapillary_tools.geotag import blackvue_parser, simple_mp4_parser +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class BlackVueParser(BaseParser): + default_source_pattern = "%f" + must_rebase_times_to_zero = False + parser_label = "blackvue" + + pointsFound: bool = False + + def extract_points(self) -> T.Sequence[geo.Point]: + source_path = self.geotag_source_path + if not source_path: + return [] + with source_path.open("rb") as fp: + try: + points = blackvue_parser.extract_points(fp) or [] + self.pointsFound = len(points) > 0 + return points + except simple_mp4_parser.ParsingError: + return [] + + def extract_make(self) -> T.Optional[str]: + # If no points were found, assume this is not a BlackVue + return "Blackvue" if self.pointsFound else None + + def extract_model(self) -> T.Optional[str]: + with self.videoPath.open("rb") as fp: + return blackvue_parser.extract_camera_model(fp) or None diff --git a/mapillary_tools/video_data_extraction/extractors/camm_parser.py b/mapillary_tools/video_data_extraction/extractors/camm_parser.py new file mode 100644 index 00000000..98e0b8d6 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/camm_parser.py @@ -0,0 +1,41 @@ +import functools +import typing as T + +from mapillary_tools import geo +from mapillary_tools.geotag import camm_parser, simple_mp4_parser +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class CammParser(BaseParser): + default_source_pattern = "%f" + must_rebase_times_to_zero = False + parser_label = "camm" + + @functools.cached_property + def __camera_info(self) -> T.Tuple[str, str]: + with self.videoPath.open("rb") as fp: + return camm_parser.extract_camera_make_and_model(fp) + + def extract_points(self) -> T.Sequence[geo.Point]: + source_path = self.geotag_source_path + if not source_path: + return [] + with source_path.open("rb") as fp: + try: + return camm_parser.extract_points(fp) or [] + except simple_mp4_parser.ParsingError: + return [] + + def extract_make(self) -> T.Optional[str]: + source_path = self.geotag_source_path + if not source_path: + return None + with source_path.open("rb") as fp: + return self.__camera_info[0] or None + + def extract_model(self) -> T.Optional[str]: + source_path = self.geotag_source_path + if not source_path: + return None + with source_path.open("rb") as fp: + return self.__camera_info[1] or None diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py new file mode 100644 index 00000000..33f3ff98 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_runtime_parser.py @@ -0,0 +1,50 @@ +import subprocess +import typing as T +from pathlib import Path + +from mapillary_tools import constants, geo +from mapillary_tools.video_data_extraction.cli_options import ( + CliOptions, + CliParserOptions, +) +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser +from mapillary_tools.video_data_extraction.extractors.exiftool_xml_parser import ( + ExiftoolXmlParser, +) + + +class ExiftoolRuntimeParser(BaseParser): + """ + Wrapper around ExiftoolRdfParser that executes exiftool + """ + + exiftoolXmlParser: ExiftoolXmlParser + + default_source_pattern = "%f" + must_rebase_times_to_zero = True + parser_label = "exiftool_runtime" + + def __init__( + self, video_path: Path, options: CliOptions, parser_options: CliParserOptions + ): + super().__init__(video_path, options, parser_options) + + args = ( + f"{constants.EXIFTOOL_PATH} -q -r -n -ee -api LargeFileSupport=1 -X {self.geotag_source_path}" + ).split(" ") + xml_content = subprocess.run(args, capture_output=True, text=True).stdout + + self.exiftoolXmlParser = ExiftoolXmlParser( + video_path, options, parser_options, xml_content + ) + + def extract_points(self) -> T.Sequence[geo.Point]: + return self.exiftoolXmlParser.extract_points() if self.exiftoolXmlParser else [] + + def extract_make(self) -> T.Optional[str]: + return self.exiftoolXmlParser.extract_make() if self.exiftoolXmlParser else None + + def extract_model(self) -> T.Optional[str]: + return ( + self.exiftoolXmlParser.extract_model() if self.exiftoolXmlParser else None + ) diff --git a/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py new file mode 100644 index 00000000..64b83c28 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/exiftool_xml_parser.py @@ -0,0 +1,51 @@ +import typing as T +import xml.etree.ElementTree as ET + +from pathlib import Path + +from mapillary_tools import geo +from mapillary_tools.exiftool_read import EXIFTOOL_NAMESPACES +from mapillary_tools.exiftool_read_video import ExifToolReadVideo +from mapillary_tools.geotag.geotag_videos_from_exiftool_video import _DESCRIPTION_TAG +from mapillary_tools.video_data_extraction.cli_options import ( + CliOptions, + CliParserOptions, +) +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class ExiftoolXmlParser(BaseParser): + default_source_pattern = "%g.xml" + must_rebase_times_to_zero = True + parser_label = "exiftool_xml" + + exifToolReadVideo: ExifToolReadVideo + + def __init__( + self, + video_path: Path, + options: CliOptions, + parser_options: CliParserOptions, + xml_content: T.Optional[str] = None, + ) -> None: + super().__init__(video_path, options, parser_options) + + if xml_content: + etree = ET.fromstring(xml_content) + else: + xml_path = self.geotag_source_path + if not xml_path: + return None + etree = ET.parse(xml_path).getroot() + + element = next(etree.iterfind(_DESCRIPTION_TAG, namespaces=EXIFTOOL_NAMESPACES)) + self.exifToolReadVideo = ExifToolReadVideo(ET.ElementTree(element)) + + def extract_points(self) -> T.Sequence[geo.Point]: + return self.exifToolReadVideo.extract_gps_track() + + def extract_make(self) -> T.Optional[str]: + return self.exifToolReadVideo.extract_make() + + def extract_model(self) -> T.Optional[str]: + return self.exifToolReadVideo.extract_model() diff --git a/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py b/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py new file mode 100644 index 00000000..d48418b4 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/generic_video_parser.py @@ -0,0 +1,57 @@ +import typing as T +from pathlib import Path + +from mapillary_tools import geo +from mapillary_tools.video_data_extraction.cli_options import ( + CliOptions, + CliParserOptions, +) +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser +from mapillary_tools.video_data_extraction.extractors.blackvue_parser import ( + BlackVueParser, +) +from mapillary_tools.video_data_extraction.extractors.camm_parser import CammParser +from mapillary_tools.video_data_extraction.extractors.gopro_parser import GoProParser + + +class GenericVideoParser(BaseParser): + """ + Wrapper around the three native video parsers. It will try to execute them + in the order camm-gopro-blackvue, like the previous implementation + """ + + parsers: T.Sequence[BaseParser] = [] + + default_source_pattern = "%f" + must_rebase_times_to_zero = False + parser_label = "video" + + def __init__( + self, video_path: Path, options: CliOptions, parser_options: CliParserOptions + ) -> None: + super().__init__(video_path, options, parser_options) + camm_parser = CammParser(video_path, options, parser_options) + gopro_parser = GoProParser(video_path, options, parser_options) + blackvue_parser = BlackVueParser(video_path, options, parser_options) + self.parsers = [camm_parser, gopro_parser, blackvue_parser] + + def extract_points(self) -> T.Sequence[geo.Point]: + for parser in self.parsers: + points = parser.extract_points() + if points: + return points + return [] + + def extract_make(self) -> T.Optional[str]: + for parser in self.parsers: + make = parser.extract_make() + if make: + return make + return None + + def extract_model(self) -> T.Optional[str]: + for parser in self.parsers: + model = parser.extract_model() + if model: + return model + return None diff --git a/mapillary_tools/video_data_extraction/extractors/gopro_parser.py b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py new file mode 100644 index 00000000..3a4c3efd --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/gopro_parser.py @@ -0,0 +1,36 @@ +import typing as T + +from mapillary_tools import geo +from mapillary_tools.geotag import gpmf_parser, simple_mp4_parser +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class GoProParser(BaseParser): + default_source_pattern = "%f" + must_rebase_times_to_zero = False + parser_label = "gopro" + + pointsFound: bool = False + + def extract_points(self) -> T.Sequence[geo.Point]: + source_path = self.geotag_source_path + if not source_path: + return [] + with source_path.open("rb") as fp: + try: + points = gpmf_parser.extract_points(fp) or [] + self.pointsFound = len(points) > 0 + return points + except simple_mp4_parser.ParsingError: + return [] + + def extract_make(self) -> T.Optional[str]: + # If no points were found, assume this is not a GoPro + return "GoPro" if self.pointsFound else None + + def extract_model(self) -> T.Optional[str]: + source_path = self.geotag_source_path + if not source_path: + return None + with source_path.open("rb") as fp: + return gpmf_parser.extract_camera_model(fp) or None diff --git a/mapillary_tools/video_data_extraction/extractors/gpx_parser.py b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py new file mode 100644 index 00000000..d38347b8 --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/gpx_parser.py @@ -0,0 +1,29 @@ +import typing as T + +from mapillary_tools import geo +from mapillary_tools.geotag import geotag_images_from_gpx_file +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class GpxParser(BaseParser): + default_source_pattern = "%g.gpx" + must_rebase_times_to_zero = True + parser_label = "gpx" + + def extract_points(self) -> T.Sequence[geo.Point]: + path = self.geotag_source_path + if not path: + return [] + try: + tracks = geotag_images_from_gpx_file.parse_gpx(path) + except Exception as e: + return [] + + points: T.Sequence[geo.Point] = sum(tracks, []) + return points + + def extract_make(self) -> T.Optional[str]: + return None + + def extract_model(self) -> T.Optional[str]: + return None diff --git a/mapillary_tools/video_data_extraction/extractors/nmea_parser.py b/mapillary_tools/video_data_extraction/extractors/nmea_parser.py new file mode 100644 index 00000000..22805dea --- /dev/null +++ b/mapillary_tools/video_data_extraction/extractors/nmea_parser.py @@ -0,0 +1,24 @@ +import typing as T + +from mapillary_tools import geo +from mapillary_tools.geotag import geotag_images_from_nmea_file +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + + +class NmeaParser(BaseParser): + default_source_pattern = "%g.nmea" + must_rebase_times_to_zero = True + parser_label = "nmea" + + def extract_points(self) -> T.Sequence[geo.Point]: + source_path = self.geotag_source_path + if not source_path: + return [] + points = geotag_images_from_nmea_file.get_lat_lon_time_from_nmea(source_path) + return points + + def extract_make(self) -> T.Optional[str]: + return None + + def extract_model(self) -> T.Optional[str]: + return None diff --git a/mapillary_tools/video_data_extraction/video_data_parser_factory.py b/mapillary_tools/video_data_extraction/video_data_parser_factory.py new file mode 100644 index 00000000..df509ea3 --- /dev/null +++ b/mapillary_tools/video_data_extraction/video_data_parser_factory.py @@ -0,0 +1,46 @@ +import typing as T +from pathlib import Path + +from mapillary_tools.video_data_extraction.cli_options import CliOptions + +from mapillary_tools.video_data_extraction.extractors.base_parser import BaseParser + +from mapillary_tools.video_data_extraction.extractors.blackvue_parser import ( + BlackVueParser, +) +from mapillary_tools.video_data_extraction.extractors.camm_parser import CammParser +from mapillary_tools.video_data_extraction.extractors.exiftool_runtime_parser import ( + ExiftoolRuntimeParser, +) +from mapillary_tools.video_data_extraction.extractors.exiftool_xml_parser import ( + ExiftoolXmlParser, +) +from mapillary_tools.video_data_extraction.extractors.generic_video_parser import ( + GenericVideoParser, +) +from mapillary_tools.video_data_extraction.extractors.gopro_parser import GoProParser +from mapillary_tools.video_data_extraction.extractors.gpx_parser import GpxParser +from mapillary_tools.video_data_extraction.extractors.nmea_parser import NmeaParser + + +known_parsers = { + "gpx": GpxParser, + "nmea": NmeaParser, + "exiftool_xml": ExiftoolXmlParser, + "exiftool_runtime": ExiftoolRuntimeParser, + "camm": CammParser, + "blackvue": BlackVueParser, + "gopro": GoProParser, + "video": GenericVideoParser, +} + + +def make_parsers(file: Path, options: CliOptions) -> T.Sequence[BaseParser]: + src_options = options["geotag_sources_options"] + parsers = [ + known_parsers[s["source"]](file, options, s) + for s in src_options + if s["source"] in known_parsers + ] + + return parsers diff --git a/setup.py b/setup.py index 64dae3da..74b9a348 100644 --- a/setup.py +++ b/setup.py @@ -41,8 +41,14 @@ def readme(): url="https://github.com/mapillary/mapillary_tools", author="Mapillary", license="BSD", - python_requires=">=3.6", - packages=["mapillary_tools", "mapillary_tools.commands", "mapillary_tools.geotag"], + python_requires=">=3.8", + packages=[ + "mapillary_tools", + "mapillary_tools.commands", + "mapillary_tools.geotag", + "mapillary_tools.video_data_extraction", + "mapillary_tools.video_data_extraction.extractors", + ], entry_points=""" [console_scripts] mapillary_tools=mapillary_tools.commands.__main__:main