Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: immediate retries on HTTP 412 #580

Merged
merged 5 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 7 additions & 21 deletions mapillary_tools/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@


def authenticate(
user_name: str = None,
user_email: str = None,
user_password: str = None,
jwt: str = None,
user_name: T.Optional[str] = None,
user_email: T.Optional[str] = None,
user_password: T.Optional[str] = None,
jwt: T.Optional[str] = None,
):
if user_name:
user_name = user_name.strip()
Expand Down Expand Up @@ -43,20 +43,6 @@ def authenticate(
config.update_config(user_name, user_items)


class HTTPError(requests.HTTPError):
    """HTTP error type returned by ``wrap_http_exception`` in this module."""


def wrap_http_exception(ex: requests.HTTPError):
    """Build an ``HTTPError`` with a readable, multi-line message.

    The message consists of three lines: the request method and response
    URL, the HTTP status code, and the response body text.

    Args:
        ex: The ``requests.HTTPError`` to describe.

    Returns:
        A new ``HTTPError`` carrying the formatted message. It is returned,
        not raised, so the caller decides how to raise it.
    """
    resp = ex.response
    request_line = f"{ex.request.method} {resp.url}"
    status_line = f"> HTTP Status: {ex.response.status_code}"
    body_line = f"{ex.response.text}"
    return HTTPError("\n".join((request_line, status_line, body_line)))


def prompt_user_for_user_items(user_name: str) -> types.UserItem:
print(f"Sign in for user {user_name}")
user_email = input("Enter your Mapillary user email: ")
Expand All @@ -71,12 +57,12 @@ def prompt_user_for_user_items(user_name: str) -> types.UserItem:
if subcode in [1348028, 1348092, 3404005, 1348131]:
title = r.get("error", {}).get("error_user_title")
message = r.get("error", {}).get("error_user_msg")
LOG.error(f"{title}: {message}")
LOG.error(f"%s: %s", title, message)
return prompt_user_for_user_items(user_name)
else:
raise wrap_http_exception(ex)
raise ex
else:
raise wrap_http_exception(ex)
raise ex

data = resp.json()
upload_token = T.cast(str, data.get("access_token"))
Expand Down
1 change: 1 addition & 0 deletions mapillary_tools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# GPS fix values read from a comma-separated environment value (default "2,3")
GOPRO_GPS_FIXES: T.Set[int] = {
    int(fix) for fix in os.getenv(_ENV_PREFIX + "GOPRO_GPS_FIXES", "2,3").split(",")
}
# Maximum number of upload retries, overridable via the environment (default 200)
MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200))

# GPS precision, in meters, is used to filter outliers
GOPRO_GPS_PRECISION = float(os.getenv(_ENV_PREFIX + "GOPRO_GPS_PRECISION", 15))
32 changes: 27 additions & 5 deletions mapillary_tools/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,20 @@ def __init__(self, inner_ex) -> None:
super().__init__(str(inner_ex))


class UploadHTTPError(Exception):
    """Exception whose message describes a failed HTTP request made during upload."""


def wrap_http_exception(ex: requests.HTTPError):
    """Build an ``UploadHTTPError`` with a readable, multi-line message.

    The message consists of three lines: the request method and response
    URL, the HTTP status code, and the raw response content (bytes, as
    rendered by string formatting).

    Args:
        ex: The ``requests.HTTPError`` to describe.

    Returns:
        A new ``UploadHTTPError`` carrying the formatted message. It is
        returned, not raised, so the caller decides how to raise it.
    """
    resp = ex.response
    request_line = f"{ex.request.method} {resp.url}"
    status_line = f"> HTTP Status: {ex.response.status_code}"
    body_line = f"{resp.content}"
    return UploadHTTPError("\n".join((request_line, status_line, body_line)))


def read_image_descriptions(desc_path: str) -> T.List[types.ImageDescriptionFile]:
descs: T.List[types.ImageDescriptionFile] = []

Expand Down Expand Up @@ -129,15 +143,18 @@ def fetch_user_items(
f"Found multiple Mapillary accounts. Please specify one with --user_name"
)
else:
user_items = authenticate.authenticate_user(user_name)
try:
user_items = authenticate.authenticate_user(user_name)
except requests.HTTPError as exc:
raise wrap_http_exception(exc) from exc

if organization_key is not None:
try:
resp = api_v4.fetch_organization(
user_items["user_upload_token"], organization_key
)
except requests.HTTPError as ex:
raise authenticate.wrap_http_exception(ex) from ex
raise wrap_http_exception(ex) from ex
org = resp.json()
LOG.info("Uploading to organization: %s", json.dumps(org))
user_items = T.cast(
Expand Down Expand Up @@ -440,7 +457,7 @@ def _api_logging_finished(user_items: types.UserItem, summary: T.Dict):
LOG.warning(
"Error from API Logging for action %s",
action,
exc_info=uploader.upload_api_v4.wrap_http_exception(exc),
exc_info=wrap_http_exception(exc),
)
except:
LOG.warning("Error from API Logging for action %s", action, exc_info=True)
Expand All @@ -460,7 +477,7 @@ def _api_logging_failed(user_items: types.UserItem, payload: T.Dict, exc: Except
payload_with_reason,
)
except requests.HTTPError as exc:
wrapped_exc = uploader.upload_api_v4.wrap_http_exception(exc)
wrapped_exc = wrap_http_exception(exc)
LOG.warning(
"Error from API Logging for action %s",
action,
Expand Down Expand Up @@ -625,7 +642,10 @@ def upload(
except UploadError as ex:
if not dry_run:
_api_logging_failed(mly_uploader.user_items, _summarize(stats), ex.inner_ex)
raise ex
if isinstance(ex.inner_ex, requests.HTTPError):
raise wrap_http_exception(ex.inner_ex) from ex.inner_ex
else:
raise ex

if stats:
if not dry_run:
Expand Down Expand Up @@ -686,6 +706,7 @@ def _convert_and_upload_camm(
)
except Exception as ex:
raise UploadError(ex) from ex

LOG.debug(f"Uploaded to cluster: %s", cluster_id)


Expand Down Expand Up @@ -782,6 +803,7 @@ def _upload_raw_camm(
)
except Exception as ex:
raise UploadError(ex) from ex

LOG.debug(f"Uploaded to cluster: %s", cluster_id)


Expand Down
14 changes: 0 additions & 14 deletions mapillary_tools/upload_api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,6 @@
FileType = Literal["zip", "mly_blackvue_video", "mly_camm_video"]


class UploadHTTPError(Exception):
    """Exception whose message describes a failed HTTP request to the upload API."""


def wrap_http_exception(ex: requests.HTTPError):
    """Build an ``UploadHTTPError`` with a readable, multi-line message.

    The message consists of three lines: the request method and response
    URL, the HTTP status code, and the response body text.

    Args:
        ex: The ``requests.HTTPError`` to describe.

    Returns:
        A new ``UploadHTTPError`` carrying the formatted message. It is
        returned, not raised, so the caller decides how to raise it.
    """
    resp = ex.response
    request_line = f"{ex.request.method} {resp.url}"
    status_line = f"> HTTP Status: {ex.response.status_code}"
    body_line = f"{ex.response.text}"
    return UploadHTTPError("\n".join((request_line, status_line, body_line)))


def _sanitize_headers(headers: T.Dict):
return {
k: v
Expand Down
68 changes: 26 additions & 42 deletions mapillary_tools/uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
else:
from typing_extensions import Literal

from . import exif_write, types, upload_api_v4, utils
from . import constants, exif_write, types, upload_api_v4, utils


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -350,6 +350,16 @@ def _zip_sequence_fp(
return _hash_zipfile(ziph)


def is_immediate_retry(ex: Exception):
    """Return True when a failed upload should be retried without backoff.

    Only an HTTP 412 response whose JSON body flags the error as retriable
    qualifies, for example::

        {"debug_info": {"retriable": true, "type": "OffsetInvalidError",
                        "message": "Request starting offset is invalid"}}

    Args:
        ex: The exception raised by the upload attempt.

    Returns:
        True if ``ex`` is a ``requests.HTTPError`` with status code 412 and
        ``debug_info.retriable`` in the JSON body is truthy. False in every
        other case, including a missing response, a body that is not valid
        JSON, or a body that is not a JSON object of the expected shape.
    """
    if not isinstance(ex, requests.HTTPError):
        return False

    response = ex.response
    # An HTTPError can be constructed without a response attached
    if response is None or response.status_code != 412:
        return False

    try:
        payload = response.json()
    except ValueError:
        # The stdlib, simplejson and requests JSON decode errors all derive
        # from ValueError, so this works whichever backend requests uses
        return False

    # This runs inside an exception handler, so never raise on an
    # unexpected payload shape (e.g. a JSON list, or "debug_info": null)
    if not isinstance(payload, dict):
        return False
    debug_info = payload.get("debug_info")
    if not isinstance(debug_info, dict):
        return False
    return bool(debug_info.get("retriable", False))


def is_retriable_exception(ex: Exception):
if isinstance(ex, (requests.ConnectionError, requests.Timeout)):
return True
Expand Down Expand Up @@ -398,11 +408,9 @@ def _reset_retries(_, __):
if emitter:
emitter.emit("upload_start", mutable_payload)

MAX_RETRIES = 200

while True:
begin_offset: T.Optional[int] = None
fp.seek(0, io.SEEK_SET)
begin_offset: T.Optional[int] = None
try:
begin_offset = upload_service.fetch_offset()
upload_service.callbacks = [_reset_retries]
Expand All @@ -415,64 +423,40 @@ def _reset_retries(_, __):
)
file_handle = upload_service.upload(fp, offset=begin_offset)
except Exception as ex:
if retries < MAX_RETRIES and is_retriable_exception(ex):
# for logging and debugging so we keep it quiet
try:
server_offset = upload_service.fetch_offset()
except Exception:
server_offset = None

# throw fatal error if the offset fetched from the server does not move as expected
expected_offset = mutable_payload.get("offset")
if expected_offset is not None and server_offset is not None:
# we don't expect expected_offset == server_offset
# because the chunk might be partially uploaded (that's too bad)
if not (expected_offset <= server_offset):
LOG.fatal(
"Upload server offset %s does not match expected offset %s",
server_offset,
expected_offset,
)
if isinstance(ex, requests.HTTPError):
raise upload_api_v4.wrap_http_exception(ex) from ex
else:
raise ex

if retries < constants.MAX_UPLOAD_RETRIES and is_retriable_exception(ex):
if emitter:
emitter.emit("upload_interrupted", mutable_payload)

LOG.warning(
# use %s instead of %d because offset could be None
f"Error uploading chunk_size %d at server_offset %s (begin_offset %s): %s: %s",
f"Error uploading chunk_size %d at begin_offset %s: %s: %s",
upload_service.chunk_size,
server_offset,
begin_offset,
ex.__class__.__name__,
str(ex),
)

retries += 1
sleep_for = min(2**retries, 16)
if is_immediate_retry(ex):
sleep_for = 0
else:
sleep_for = min(2**retries, 16)
LOG.info(
"Retrying in %d seconds (%d/%d)", sleep_for, retries, MAX_RETRIES
"Retrying in %d seconds (%d/%d)",
sleep_for,
retries,
constants.MAX_UPLOAD_RETRIES,
)
time.sleep(sleep_for)
if sleep_for:
time.sleep(sleep_for)
else:
if isinstance(ex, requests.HTTPError):
raise upload_api_v4.wrap_http_exception(ex) from ex
else:
raise ex
raise ex
else:
break

if emitter:
emitter.emit("upload_end", mutable_payload)

# TODO: retry here
try:
cluster_id = upload_service.finish(file_handle)
except requests.HTTPError as ex:
raise upload_api_v4.wrap_http_exception(ex) from ex
cluster_id = upload_service.finish(file_handle)

if emitter:
mutable_payload["cluster_id"] = cluster_id
Expand Down
16 changes: 11 additions & 5 deletions tests/cli/upload_api_v4.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@
import tqdm
from mapillary_tools import upload

from mapillary_tools.upload_api_v4 import (
DEFAULT_CHUNK_SIZE,
UploadService,
wrap_http_exception,
)
from mapillary_tools.upload_api_v4 import DEFAULT_CHUNK_SIZE, UploadService


LOG = logging.getLogger("mapillary_tools")


def wrap_http_exception(ex: requests.HTTPError):
    """Build a plain ``Exception`` with a readable, multi-line message.

    The message consists of three lines: the request method and response
    URL, the HTTP status code, and the raw response content (bytes, as
    rendered by string formatting).

    Args:
        ex: The ``requests.HTTPError`` to describe.

    Returns:
        A new ``Exception`` carrying the formatted message. It is returned,
        not raised, so the caller decides how to raise it.
    """
    resp = ex.response
    request_line = f"{ex.request.method} {resp.url}"
    status_line = f"> HTTP Status: {ex.response.status_code}"
    body_line = f"{resp.content}"
    return Exception("\n".join((request_line, status_line, body_line)))


def configure_logger(logger: logging.Logger, stream=None) -> None:
formatter = logging.Formatter("%(asctime)s - %(levelname)-7s - %(message)s")
handler = logging.StreamHandler(stream)
Expand Down