diff --git a/mapillary_tools/authenticate.py b/mapillary_tools/authenticate.py index a7ac8f2b..c22c0635 100644 --- a/mapillary_tools/authenticate.py +++ b/mapillary_tools/authenticate.py @@ -12,10 +12,10 @@ def authenticate( - user_name: str = None, - user_email: str = None, - user_password: str = None, - jwt: str = None, + user_name: T.Optional[str] = None, + user_email: T.Optional[str] = None, + user_password: T.Optional[str] = None, + jwt: T.Optional[str] = None, ): if user_name: user_name = user_name.strip() @@ -43,20 +43,6 @@ def authenticate( config.update_config(user_name, user_items) -class HTTPError(requests.HTTPError): - pass - - -def wrap_http_exception(ex: requests.HTTPError): - resp = ex.response - lines = [ - f"{ex.request.method} {resp.url}", - f"> HTTP Status: {ex.response.status_code}", - f"{ex.response.text}", - ] - return HTTPError("\n".join(lines)) - - def prompt_user_for_user_items(user_name: str) -> types.UserItem: print(f"Sign in for user {user_name}") user_email = input("Enter your Mapillary user email: ") @@ -71,12 +57,12 @@ def prompt_user_for_user_items(user_name: str) -> types.UserItem: if subcode in [1348028, 1348092, 3404005, 1348131]: title = r.get("error", {}).get("error_user_title") message = r.get("error", {}).get("error_user_msg") - LOG.error(f"{title}: {message}") + LOG.error(f"%s: %s", title, message) return prompt_user_for_user_items(user_name) else: - raise wrap_http_exception(ex) + raise ex else: - raise wrap_http_exception(ex) + raise ex data = resp.json() upload_token = T.cast(str, data.get("access_token")) diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index 81cf09fb..5dd3830a 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -32,6 +32,7 @@ GOPRO_GPS_FIXES: T.Set[int] = set( int(fix) for fix in os.getenv(_ENV_PREFIX + "GOPRO_GPS_FIXES", "2,3").split(",") ) +MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200)) # GPS precision, in meters, is used to filter outliers GOPRO_GPS_PRECISION = float(os.getenv(_ENV_PREFIX + "GOPRO_GPS_PRECISION", 15)) diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index a20f19f1..a79458fc 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -63,6 +63,20 @@ def __init__(self, inner_ex) -> None: super().__init__(str(inner_ex)) +class UploadHTTPError(Exception): + pass + + +def wrap_http_exception(ex: requests.HTTPError): + resp = ex.response + lines = [ + f"{ex.request.method} {resp.url}", + f"> HTTP Status: {ex.response.status_code}", + f"{resp.content}", + ] + return UploadHTTPError("\n".join(lines)) + + def read_image_descriptions(desc_path: str) -> T.List[types.ImageDescriptionFile]: descs: T.List[types.ImageDescriptionFile] = [] @@ -129,7 +143,10 @@ def fetch_user_items( f"Found multiple Mapillary accounts. Please specify one with --user_name" ) else: - user_items = authenticate.authenticate_user(user_name) + try: + user_items = authenticate.authenticate_user(user_name) + except requests.HTTPError as exc: + raise wrap_http_exception(exc) from exc if organization_key is not None: try: @@ -137,7 +154,7 @@ def fetch_user_items( user_items["user_upload_token"], organization_key ) except requests.HTTPError as ex: - raise authenticate.wrap_http_exception(ex) from ex + raise wrap_http_exception(ex) from ex org = resp.json() LOG.info("Uploading to organization: %s", json.dumps(org)) user_items = T.cast( @@ -440,7 +457,7 @@ def _api_logging_finished(user_items: types.UserItem, summary: T.Dict): LOG.warning( "Error from API Logging for action %s", action, - exc_info=uploader.upload_api_v4.wrap_http_exception(exc), + exc_info=wrap_http_exception(exc), ) except: LOG.warning("Error from API Logging for action %s", action, exc_info=True) @@ -460,7 +477,7 @@ def _api_logging_failed(user_items: types.UserItem, payload: T.Dict, exc: Except payload_with_reason, ) except requests.HTTPError as exc: - wrapped_exc = uploader.upload_api_v4.wrap_http_exception(exc) + wrapped_exc = wrap_http_exception(exc) LOG.warning( "Error from API Logging for action %s", action, @@ -625,7 +642,10 @@ def upload( except UploadError as ex: if not dry_run: _api_logging_failed(mly_uploader.user_items, _summarize(stats), ex.inner_ex) - raise ex + if isinstance(ex.inner_ex, requests.HTTPError): + raise wrap_http_exception(ex.inner_ex) from ex.inner_ex + else: + raise ex if stats: if not dry_run: @@ -686,6 +706,7 @@ def _convert_and_upload_camm( ) except Exception as ex: raise UploadError(ex) from ex + LOG.debug(f"Uploaded to cluster: %s", cluster_id) @@ -782,6 +803,7 @@ def _upload_raw_camm( ) except Exception as ex: raise UploadError(ex) from ex + LOG.debug(f"Uploaded to cluster: %s", cluster_id) diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 414b64f6..81a43f21 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -33,20 +33,6 @@ FileType = Literal["zip", "mly_blackvue_video", "mly_camm_video"] -class UploadHTTPError(Exception): - pass - - -def wrap_http_exception(ex: requests.HTTPError): - resp = ex.response - lines = [ - f"{ex.request.method} {resp.url}", - f"> HTTP Status: {ex.response.status_code}", - f"{ex.response.text}", - ] - return UploadHTTPError("\n".join(lines)) - - def _sanitize_headers(headers: T.Dict): return { k: v diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 3df62272..7ab5ee99 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -19,7 +19,7 @@ else: from typing_extensions import Literal -from . import exif_write, types, upload_api_v4, utils +from . import constants, exif_write, types, upload_api_v4, utils LOG = logging.getLogger(__name__) @@ -350,6 +350,16 @@ def _zip_sequence_fp( return _hash_zipfile(ziph) +def is_immediate_retry(ex: Exception): + if isinstance(ex, requests.HTTPError) and ex.response.status_code == 412: + try: + resp = ex.response.json() + except json.JSONDecodeError: + return False + # resp: {"debug_info":{"retriable":true,"type":"OffsetInvalidError","message":"Request starting offset is invalid"}} + return resp.get("debug_info", {}).get("retriable", False) + + def is_retriable_exception(ex: Exception): if isinstance(ex, (requests.ConnectionError, requests.Timeout)): return True @@ -398,11 +408,9 @@ def _reset_retries(_, __): if emitter: emitter.emit("upload_start", mutable_payload) - MAX_RETRIES = 200 - while True: - begin_offset: T.Optional[int] = None fp.seek(0, io.SEEK_SET) + begin_offset: T.Optional[int] = None try: begin_offset = upload_service.fetch_offset() upload_service.callbacks = [_reset_retries] @@ -415,53 +423,32 @@ def _reset_retries(_, __): ) file_handle = upload_service.upload(fp, offset=begin_offset) except Exception as ex: - if retries < MAX_RETRIES and is_retriable_exception(ex): - # for logging and degugging so we keep it quiet - try: - server_offset = upload_service.fetch_offset() - except Exception: - server_offset = None - - # throw fatal error if the offset fetched from the server does not move as expected - expected_offset = mutable_payload.get("offset") - if expected_offset is not None and server_offset is not None: - # we don't expect expected_offset == server_offset - # because the chunk might be partially uploaded (that's too bad) - if not (expected_offset <= server_offset): - LOG.fatal( - "Upload server offset %s does not match expected offset %s", - server_offset, - expected_offset, - ) - if isinstance(ex, requests.HTTPError): - raise upload_api_v4.wrap_http_exception(ex) from ex - else: - raise ex - + if retries < constants.MAX_UPLOAD_RETRIES and is_retriable_exception(ex): if emitter: emitter.emit("upload_interrupted", mutable_payload) - LOG.warning( # use %s instead of %d because offset could be None - f"Error uploading chunk_size %d at server_offset %s (begin_offset %s): %s: %s", + f"Error uploading chunk_size %d at begin_offset %s: %s: %s", upload_service.chunk_size, - server_offset, begin_offset, ex.__class__.__name__, str(ex), ) - retries += 1 - sleep_for = min(2**retries, 16) + if is_immediate_retry(ex): + sleep_for = 0 + else: + sleep_for = min(2**retries, 16) LOG.info( - "Retrying in %d seconds (%d/%d)", sleep_for, retries, MAX_RETRIES + "Retrying in %d seconds (%d/%d)", + sleep_for, + retries, + constants.MAX_UPLOAD_RETRIES, ) - time.sleep(sleep_for) + if sleep_for: + time.sleep(sleep_for) else: - if isinstance(ex, requests.HTTPError): - raise upload_api_v4.wrap_http_exception(ex) from ex - else: - raise ex + raise ex else: break @@ -469,10 +456,7 @@ def _reset_retries(_, __): emitter.emit("upload_end", mutable_payload) # TODO: retry here - try: - cluster_id = upload_service.finish(file_handle) - except requests.HTTPError as ex: - raise upload_api_v4.wrap_http_exception(ex) from ex + cluster_id = upload_service.finish(file_handle) if emitter: mutable_payload["cluster_id"] = cluster_id diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index 28cb279e..bd546100 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -9,16 +9,22 @@ import tqdm from mapillary_tools import upload -from mapillary_tools.upload_api_v4 import ( - DEFAULT_CHUNK_SIZE, - UploadService, - wrap_http_exception, -) +from mapillary_tools.upload_api_v4 import DEFAULT_CHUNK_SIZE, UploadService LOG = logging.getLogger("mapillary_tools") +def wrap_http_exception(ex: requests.HTTPError): + resp = ex.response + lines = [ + f"{ex.request.method} {resp.url}", + f"> HTTP Status: {ex.response.status_code}", + f"{resp.content}", + ] + return Exception("\n".join(lines)) + + def configure_logger(logger: logging.Logger, stream=None) -> None: formatter = logging.Formatter("%(asctime)s - %(levelname)-7s - %(message)s") handler = logging.StreamHandler(stream)