feat: add crc32c_checksum argument to download_chunks_concurrently #1138

Merged: 9 commits, Oct 11, 2023
158 changes: 142 additions & 16 deletions google/cloud/storage/transfer_manager.py
@@ -22,6 +22,8 @@
import warnings
import pickle
import copyreg
import struct
import base64
import functools

from google.api_core import exceptions
@@ -32,9 +34,11 @@
from google.cloud.storage._helpers import _api_core_retry_to_resumable_media_retry
from google.cloud.storage.retry import DEFAULT_RETRY

import google_crc32c

from google.resumable_media.requests.upload import XMLMPUContainer
from google.resumable_media.requests.upload import XMLMPUPart

from google.resumable_media.common import DataCorruption

warnings.warn(
"The module `transfer_manager` is a preview feature. Functionality and API "
@@ -44,6 +48,7 @@

TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
DEFAULT_MAX_WORKERS = 8
MAX_CRC32C_ZERO_ARRAY_SIZE = 4 * 1024 * 1024
METADATA_HEADER_TRANSLATION = {
"cacheControl": "Cache-Control",
"contentDisposition": "Content-Disposition",
@@ -57,6 +62,20 @@
PROCESS = "process"
THREAD = "thread"

DOWNLOAD_CRC32C_MISMATCH_TEMPLATE = """\
Checksum mismatch while downloading:

{}

The object metadata indicated a crc32c checksum of:

{}

but the actual crc32c checksum of the downloaded contents was:

{}
"""


_cached_clients = {}

@@ -732,6 +751,8 @@ def download_chunks_concurrently(
deadline=None,
worker_type=PROCESS,
max_workers=DEFAULT_MAX_WORKERS,
*,
crc32c_checksum=True,
):
"""Download a single file in chunks, concurrently.

@@ -744,9 +765,6 @@
performance under normal circumstances due to Python interpreter threading
behavior. The default is therefore to use processes instead of threads.


:param bucket:
The bucket which contains the blobs to be downloaded

@@ -768,10 +786,13 @@
:param download_kwargs:
A dictionary of keyword arguments to pass to the download method. Refer
to the documentation for blob.download_to_file() or
blob.download_to_filename() for more information. The dict is directly
passed into the download methods and is not validated by this function.

Keyword arguments "start" and "end" are not supported and will
cause a ValueError if present.
cause a ValueError if present. The key "checksum" is also not supported
in download_kwargs, but see the argument "crc32c_checksum" (which does
not go in download_kwargs) below.

:type deadline: int
:param deadline:
@@ -811,15 +832,33 @@
and the default is a conservative number that should work okay in most
cases without consuming excessive resources.

:type crc32c_checksum: bool
:param crc32c_checksum:
Whether to compute a checksum of the downloaded contents, using the
crc32c algorithm, and validate it against the checksum in the object's
metadata. As the checksums for each chunk must be combined using a
feature of crc32c that is not available for md5, md5 is not supported.

:raises:
:exc:`concurrent.futures.TimeoutError`
if deadline is exceeded.
:exc:`google.resumable_media.common.DataCorruption` if the download's
checksum doesn't agree with the server-computed checksum. The
`google.resumable_media` exception is used here for consistency
with other download methods despite the exception originating
elsewhere.
"""
client = blob.client

if download_kwargs is None:
download_kwargs = {}
if "start" in download_kwargs or "end" in download_kwargs:
raise ValueError(
"Download arguments 'start' and 'end' are not supported by download_chunks_concurrently."
)
if "checksum" in download_kwargs:
raise ValueError(
"'checksum' is in download_kwargs, but is not supported because sliced downloads have a different checksum mechanism from regular downloads. Use the 'crc32c_checksum' argument on download_chunks_concurrently instead."
)

download_kwargs["command"] = "tm.download_sharded"

@@ -851,16 +890,42 @@
start=start,
end=cursor - 1,
download_kwargs=download_kwargs,
crc32c_checksum=crc32c_checksum,
)
)

concurrent.futures.wait(
futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
)

# Raise any exceptions; combine checksums.
results = []
for future in futures:
results.append(future.result())

if crc32c_checksum and results:
crc_digest = _digest_ordered_checksum_and_size_pairs(results)
actual_checksum = base64.b64encode(crc_digest).decode("utf-8")
expected_checksum = blob.crc32c
if actual_checksum != expected_checksum:
# For consistency with other download methods we will use
# "google.resumable_media.common.DataCorruption" despite the error
# not originating inside google.resumable_media.
download_url = blob._get_download_url(
client,
if_generation_match=download_kwargs.get("if_generation_match"),
if_generation_not_match=download_kwargs.get("if_generation_not_match"),
if_metageneration_match=download_kwargs.get("if_metageneration_match"),
if_metageneration_not_match=download_kwargs.get(
"if_metageneration_not_match"
),
)
raise DataCorruption(
None,
DOWNLOAD_CRC32C_MISMATCH_TEMPLATE.format(
download_url, expected_checksum, actual_checksum
),
)
return None
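
For context, here is a minimal usage sketch of the new argument (the bucket name, object name, and local path are hypothetical, not taken from this PR):

```python
from google.cloud.storage import Client, transfer_manager

if __name__ == "__main__":  # guard needed because the default worker_type is PROCESS
    client = Client()
    blob = client.bucket("my-bucket").blob("large-object.bin")  # hypothetical names
    blob.reload()  # populate size and crc32c metadata

    # Downloads the object in concurrent chunks, combines the per-chunk
    # crc32c values, and compares the result against blob.crc32c; raises
    # google.resumable_media.common.DataCorruption on a mismatch.
    transfer_manager.download_chunks_concurrently(
        blob,
        "/tmp/large-object.bin",
        crc32c_checksum=True,  # the default; pass False to skip verification
    )
```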


@@ -1118,23 +1183,58 @@ def _headers_from_metadata(metadata):


def _download_and_write_chunk_in_place(
maybe_pickled_blob, filename, start, end, download_kwargs, crc32c_checksum
):
"""Helper function that runs inside a thread or subprocess.

`maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
Blob (for processes) because the default pickling mangles Client objects
which are attached to Blobs.

Returns a crc if configured (or None) and the size written.
"""

if isinstance(maybe_pickled_blob, Blob):
blob = maybe_pickled_blob
else:
blob = pickle.loads(maybe_pickled_blob)

with _ChecksummingSparseFileWrapper(filename, start, crc32c_checksum) as f:
blob._prep_and_do_download(f, start=start, end=end, **download_kwargs)
return (f.crc, (end - start) + 1)


class _ChecksummingSparseFileWrapper:
"""A file wrapper that writes to a sparse file and optionally checksums.

This wrapper only implements write() and does not inherit from `io` module
base classes.
"""

def __init__(self, filename, start_position, crc32c_enabled):
# Open in mixed read/write mode to avoid truncating or appending
self.f = open(filename, "rb+")
self.f.seek(start_position)
self._crc = None
self._crc32c_enabled = crc32c_enabled

def write(self, chunk):
if self._crc32c_enabled:
if self._crc is None:
self._crc = google_crc32c.value(chunk)
else:
self._crc = google_crc32c.extend(self._crc, chunk)
self.f.write(chunk)

@property
def crc(self):
return self._crc

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, tb):
self.f.close()
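
A standalone sketch (mine, not part of the PR) of the incremental-hashing pattern the wrapper relies on: `google_crc32c.value` starts a checksum and `google_crc32c.extend` continues it, so hashing a stream piece by piece, as successive write() calls do, yields the same result as hashing all of it at once:

```python
import google_crc32c

data = b"0123456789" * 1000
one_shot = google_crc32c.value(data)

# Feed the same bytes in 256-byte pieces, mimicking a series of write() calls.
running = None
for i in range(0, len(data), 256):
    piece = data[i : i + 256]
    running = (
        google_crc32c.value(piece)
        if running is None
        else google_crc32c.extend(running, piece)
    )

assert running == one_shot
```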


def _call_method_on_maybe_pickled_blob(
@@ -1208,6 +1308,32 @@ def _get_pool_class_and_requirements(worker_type):
)


def _digest_ordered_checksum_and_size_pairs(checksum_and_size_pairs):
base_crc = None
zeroes = bytes(MAX_CRC32C_ZERO_ARRAY_SIZE)
for part_crc, size in checksum_and_size_pairs:
if not base_crc:
base_crc = part_crc
else:
base_crc ^= 0xFFFFFFFF # precondition

# Zero pad base_crc32c. To conserve memory, do so with only
# MAX_CRC32C_ZERO_ARRAY_SIZE at a time. Reuse the zeroes array where
# possible.
padded = 0
while padded < size:
desired_zeroes_size = min((size - padded), MAX_CRC32C_ZERO_ARRAY_SIZE)
base_crc = google_crc32c.extend(base_crc, zeroes[:desired_zeroes_size])
padded += desired_zeroes_size

base_crc ^= 0xFFFFFFFF # postcondition
base_crc ^= part_crc
crc_digest = struct.pack(
">L", base_crc
) # https://cloud.google.com/storage/docs/json_api/v1/objects#crc32c
return crc_digest
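
As a sanity check (again mine, not from the PR), the zero-padding combination can be exercised directly: folding per-part checksums and sizes through this helper should reproduce the crc32c digest of the concatenated bytes:

```python
import struct

import google_crc32c

parts = [b"hello, ", b"concurrent ", b"world"]
pairs = [(google_crc32c.value(p), len(p)) for p in parts]

combined = _digest_ordered_checksum_and_size_pairs(pairs)
direct = struct.pack(">L", google_crc32c.value(b"".join(parts)))
assert combined == direct
```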


class _LazyClient:
"""An object that will transform into either a cached or a new Client"""

3 changes: 3 additions & 0 deletions samples/snippets/snippets_test.py
@@ -213,6 +213,7 @@ def test_list_blobs_with_prefix(test_blob, capsys):
def test_upload_blob(test_bucket):
with tempfile.NamedTemporaryFile() as source_file:
source_file.write(b"test")
source_file.flush()

storage_upload_file.upload_blob(
test_bucket.name, source_file.name, "test_upload_blob"
@@ -243,6 +244,7 @@ def test_upload_blob_with_kms(test_bucket):
blob_name = f"test_upload_with_kms_{uuid.uuid4().hex}"
with tempfile.NamedTemporaryFile() as source_file:
source_file.write(b"test")
source_file.flush()
storage_upload_with_kms_key.upload_blob_with_kms(
test_bucket.name,
source_file.name,
@@ -779,6 +781,7 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):

with tempfile.NamedTemporaryFile() as file:
file.write(b"test")
file.flush()

storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)

1 change: 1 addition & 0 deletions setup.py
@@ -33,6 +33,7 @@
"google-cloud-core >= 2.3.0, < 3.0dev",
"google-resumable-media >= 2.6.0",
"requests >= 2.18.0, < 3.0.0dev",
"google-crc32c >= 1.0, < 2.0dev",
]
extras = {"protobuf": ["protobuf<5.0.0dev"]}

13 changes: 12 additions & 1 deletion tests/system/test_transfer_manager.py
@@ -172,8 +172,19 @@ def test_download_chunks_concurrently(shared_bucket, file_data):
with open(trailing_chunk_filename, "rb") as file_obj:
assert _base64_md5hash(file_obj) == source_file["hash"]

# And for a case where there is only one chunk.
trailing_chunk_filename = os.path.join(tempdir, "chunky_file_3")
transfer_manager.download_chunks_concurrently(
download_blob,
trailing_chunk_filename,
chunk_size=size,
deadline=DEADLINE,
)
with open(trailing_chunk_filename, "rb") as file_obj:
assert _base64_md5hash(file_obj) == source_file["hash"]

# Also test threaded mode.
threaded_filename = os.path.join(tempdir, "chunky_file_4")
transfer_manager.download_chunks_concurrently(
download_blob,
threaded_filename,