Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: safely resume interrupted downloads #294

Merged
merged 16 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions google/resumable_media/_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def __init__(
media_url, stream=stream, start=start, end=end, headers=headers
)
self.checksum = checksum
self._bytes_downloaded = 0
self._expected_checksum = None
self._checksum_object = None
self._object_generation = None

def _prepare_request(self):
"""Prepare the contents of an HTTP request.
Expand Down
67 changes: 67 additions & 0 deletions google/resumable_media/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import random
import warnings

from urllib.parse import parse_qs
from urllib.parse import urlencode
from urllib.parse import urlsplit
from urllib.parse import urlunsplit

from google.resumable_media import common


Expand All @@ -33,6 +38,7 @@
"implementation. Python 3 has a faster implementation, `google-crc32c`, "
"which will be used if it is installed."
)
_GENERATION_HEADER = "x-goog-generation"
_HASH_HEADER = "x-goog-hash"
_MISSING_CHECKSUM = """\
No {checksum_type} checksum was returned from the service while downloading {}
Expand Down Expand Up @@ -302,6 +308,67 @@ def _get_checksum_object(checksum_type):
raise ValueError("checksum must be ``'md5'``, ``'crc32c'`` or ``None``")


def _parse_generation_header(response, get_headers):
"""Parses the generation header from an ``X-Goog-Generation`` value.

Args:
response (~requests.Response): The HTTP response object.
get_headers (callable: response->dict): returns response headers.

Returns:
Optional[long]: The object generation from the response, if it
can be detected from the ``X-Goog-Generation`` header; otherwise, None.
"""
headers = get_headers(response)
object_generation = headers.get(_GENERATION_HEADER, None)

if object_generation is None:
return None
else:
return int(object_generation)


def _get_generation_from_url(media_url):
"""Retrieve the object generation query param specified in the media url.

Args:
media_url (str): The URL containing the media to be downloaded.

Returns:
long: The object generation from the media url if exists; otherwise, None.
"""

_, _, _, query, _ = urlsplit(media_url)
query_params = parse_qs(query)
object_generation = query_params.get("generation", None)

if object_generation is None:
return None
else:
return int(object_generation[0])


def add_query_parameters(media_url, query_params):
"""Add query parameters to a base url.

Args:
media_url (str): The URL containing the media to be downloaded.
query_params (dict): Names and values of the query parameters to add.

Returns:
str: URL with additional query strings appended.
"""

if len(query_params) == 0:
return media_url

scheme, netloc, path, query, frag = urlsplit(media_url)
params = parse_qs(query)
new_params = {**params, **query_params}
query = urlencode(new_params, doseq=True)
return urlunsplit((scheme, netloc, path, query, frag))


class _DoNothingHash(object):
"""Do-nothing hash object.

Expand Down
126 changes: 102 additions & 24 deletions google/resumable_media/requests/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,22 @@ def _write_to_stream(self, response):
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What causes this case to happen? Transcoding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is due to retried requests being range requests. For range requests, as noted here, there's no way to detect data corruption for that byte range alone.

Therefore, here we retrieve the expected checksum/checksum object only once for the initial download request. Then we calculate and validate the checksum when the download completes.

# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
# NOTE: In order to handle compressed streams gracefully, we try
Expand All @@ -104,6 +114,7 @@ def _write_to_stream(self, response):
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
local_checksum_object.update(chunk)

if expected_checksum is not None:
Expand Down Expand Up @@ -150,7 +161,7 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
Expand All @@ -160,10 +171,39 @@ def consume(
if self._stream is not None:
request_kwargs["stream"] = True

# Assign object generation if generation is specified in the media url.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this happen via a user specifying a generation on the object? Were we not respecting this previously?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep this would happen via a user specifying a generation on the object. Previously, we've been respecting that only through download.media_url

A property download._object_generation is added. It records the object generation from either (1) generation query param from the media_url, or (2) the object generation from the initial response header. This specific line of code does (1) and retrieves it from the media_url

P.S. It's tricky in how limited information is passed from python-storage to resumable-media-python. A resumable-media-python download instance only knows the specified object generation from its media_url, and the "object" itself isn't pertained in a download.

if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = {"generation": self._object_generation}
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

if self._stream is not None:
Expand Down Expand Up @@ -223,20 +263,30 @@ def _write_to_stream(self, response):
~google.resumable_media.common.DataCorruption: If the download's
checksum doesn't agree with server-computed checksum.
"""

# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
# Retrieve the expected checksum only once for the download request,
# then compute and validate the checksum when the full download completes.
# Retried requests are range requests, and there's no way to detect
# data corruption for that byte range alone.
if self._expected_checksum is None and self._checksum_object is None:
# `_get_expected_checksum()` may return None even if a checksum was
# requested, in which case it will emit an info log _MISSING_CHECKSUM.
# If an invalid checksum type is specified, this will raise ValueError.
expected_checksum, checksum_object = _helpers._get_expected_checksum(
response, self._get_headers, self.media_url, checksum_type=self.checksum
)
self._expected_checksum = expected_checksum
self._checksum_object = checksum_object
else:
expected_checksum = self._expected_checksum
checksum_object = self._checksum_object

with response:
body_iter = response.raw.stream(
_request_helpers._SINGLE_GET_CHUNK_SIZE, decode_content=False
)
for chunk in body_iter:
self._stream.write(chunk)
self._bytes_downloaded += len(chunk)
checksum_object.update(chunk)
response._content_consumed = True

Expand Down Expand Up @@ -285,19 +335,47 @@ def consume(
ValueError: If the current :class:`Download` has already
finished.
"""
method, url, payload, headers = self._prepare_request()
method, _, payload, headers = self._prepare_request()
# NOTE: We assume "payload is None" but pass it along anyway.
request_kwargs = {
"data": payload,
"headers": headers,
"timeout": timeout,
"stream": True,
}

# Assign object generation if generation is specified in the media url.
if self._object_generation is None:
self._object_generation = _helpers._get_generation_from_url(self.media_url)

# Wrap the request business logic in a function to be retried.
def retriable_request():
# NOTE: We assume "payload is None" but pass it along anyway.
result = transport.request(
method,
url,
data=payload,
headers=headers,
stream=True,
timeout=timeout,
)
url = self.media_url

# To restart an interrupted download, read from the offset of last byte
# received using a range request, and set object generation query param.
if self._bytes_downloaded > 0:
_download.add_bytes_range(
self._bytes_downloaded, self.end, self._headers
)
request_kwargs["headers"] = self._headers

# Set object generation query param to ensure the same object content is requested.
if (
self._object_generation is not None
and _helpers._get_generation_from_url(self.media_url) is None
):
query_param = {"generation": self._object_generation}
url = _helpers.add_query_parameters(self.media_url, query_param)

result = transport.request(method, url, **request_kwargs)

# If a generation hasn't been specified, and this is the first response we get, let's record the
# generation. In future requests we'll specify the generation query param to avoid data races.
if self._object_generation is None:
self._object_generation = _helpers._parse_generation_header(
result, self._get_headers
)

self._process_response(result)

Expand Down
Loading