Fix gsutil interactions with transcoding and compression
This change sends accept-encoding: gzip to the service only
when the requested object has content-encoding: gzip or the
entire range of the object is being requested. This
prevents compressive transcoding by the service.
Compressive transcoding can cause responses to range requests
to instead return all of the object's bytes, which,
while legal according to the HTTP spec, breaks resumability.
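
For reference, the accept-encoding decision is factored into a new helper,
gslib.util.AddAcceptEncodingGzipIfNeeded, which the hunks below import and
call but whose body is not part of the shown diff. A minimal sketch of the
intended behavior, assuming the helper only mutates the supplied headers
dict (parameter names here are illustrative):

```python
def AddAcceptEncodingGzipIfNeeded(headers_dict, compressed_encoding=False):
  """Sketch: advertise gzip only when the stored bytes are already gzipped.

  Sending accept-encoding: gzip for other objects invites the service to
  compress (transcode) the response; a transcoded response to a range
  request may contain the entire object, which breaks resumable downloads.
  """
  if compressed_encoding:
    # The object is stored with content-encoding: gzip; ask for the bytes
    # exactly as stored so ranges and stored hashes remain meaningful.
    headers_dict['accept-encoding'] = 'gzip'
```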

The change also adds cache-control no-transform to objects
that are uploaded (via the -z or -Z options) with gzipped
content-encoding. This ensures that objects that are stored
doubly-compressed are served as-is, without removing the first
layer of compression. This is necessary because removing
the first layer of compression would cause the content served
to the user to no longer match the cloud-stored hashes for the
doubly-compressed object.
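
The gzip check itself is likewise centralized in a new helper,
gslib.util.ObjectIsGzipEncoded, imported by cat_helper.py and
copy_helper.py below; its body is also outside the shown hunks. A
plausible sketch, assuming it simply mirrors the inline contentEncoding
check it replaces in copy_helper.py:

```python
def ObjectIsGzipEncoded(obj_metadata):
  """Sketch: True if the object's stored content-encoding ends with gzip."""
  return bool(obj_metadata.contentEncoding and
              obj_metadata.contentEncoding.lower().endswith('gzip'))
```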

Fixes #324
thobrla committed Mar 7, 2016
1 parent 0866ee6 commit 439573e
Showing 10 changed files with 133 additions and 38 deletions.
8 changes: 5 additions & 3 deletions gslib/boto_translation.py
@@ -84,6 +84,7 @@
from gslib.translation_helper import LifecycleTranslation
from gslib.translation_helper import REMOVE_CORS_CONFIG
from gslib.translation_helper import S3MarkerAclFromObjectMetadata
from gslib.util import AddAcceptEncodingGzipIfNeeded
from gslib.util import ConfigureNoOpAuthIfNeeded
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import GetMaxRetryDelay
@@ -424,15 +425,16 @@ def _CurryDigester(self, digester_object):
def GetObjectMedia(
self, bucket_name, object_name, download_stream, provider=None,
generation=None, object_size=None,
compressed_encoding=False,
download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
start_byte=0, end_byte=None, progress_callback=None,
serialization_data=None, digesters=None):
"""See CloudApi class for function doc strings."""
# This implementation will get the object metadata first if we don't pass it
# in via serialization_data.
headers = self._CreateBaseHeaders()
if 'accept-encoding' not in headers:
headers['accept-encoding'] = 'gzip'
AddAcceptEncodingGzipIfNeeded(
headers, compressed_encoding=compressed_encoding)
if end_byte is not None:
headers['range'] = 'bytes=%s-%s' % (start_byte, end_byte)
elif start_byte > 0:
@@ -582,7 +584,7 @@ def _PerformResumableDownload(self, fp, start_byte, end_byte, key,
fp.flush()
# Download succeeded.
return
except retryable_exceptions, e:
except retryable_exceptions, e: # pylint: disable=catching-non-exception
if debug >= 1:
self.logger.info('Caught exception (%s)', repr(e))
if isinstance(e, IOError) and e.errno == errno.EPIPE:
3 changes: 3 additions & 0 deletions gslib/cat_helper.py
@@ -20,6 +20,7 @@

from gslib.exception import CommandException
from gslib.exception import NO_URLS_MATCHED_TARGET
from gslib.util import ObjectIsGzipEncoded
from gslib.wildcard_iterator import StorageUrlFromString


@@ -70,8 +71,10 @@ def CatUrlStrings(self, url_strings, show_header=False, start_byte=0,
cat_object = blr.root_object
storage_url = StorageUrlFromString(blr.url_string)
if storage_url.IsCloudUrl():
compressed_encoding = ObjectIsGzipEncoded(cat_object)
self.command_obj.gsutil_api.GetObjectMedia(
cat_object.bucket, cat_object.name, cat_outfd,
compressed_encoding=compressed_encoding,
start_byte=start_byte, end_byte=end_byte,
object_size=cat_object.size, generation=storage_url.generation,
provider=storage_url.scheme)
2 changes: 2 additions & 0 deletions gslib/cloud_api.py
@@ -258,6 +258,7 @@ class DownloadStrategy(object):

def GetObjectMedia(self, bucket_name, object_name, download_stream,
provider=None, generation=None, object_size=None,
compressed_encoding=False,
download_strategy=DownloadStrategy.ONE_SHOT, start_byte=0,
end_byte=None, progress_callback=None,
serialization_data=None, digesters=None):
@@ -271,6 +272,7 @@ def GetObjectMedia(self, bucket_name, object_name, download_stream,
class-wide default is used.
generation: Generation of the object to retrieve.
object_size: Total size of the object being downloaded.
compressed_encoding: If true, object is stored with a compressed encoding.
download_strategy: Cloud API download strategy to use for download.
start_byte: Starting point for download (for resumable downloads and
range requests). Can be set to negative to request a range
2 changes: 2 additions & 0 deletions gslib/cloud_api_delegator.py
@@ -217,11 +217,13 @@ def PatchObjectMetadata(self, bucket_name, object_name, metadata,
def GetObjectMedia(
self, bucket_name, object_name, download_stream, provider=None,
generation=None, object_size=None,
compressed_encoding=False,
download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
start_byte=0, end_byte=None, progress_callback=None,
serialization_data=None, digesters=None):
return self._GetApi(provider).GetObjectMedia(
bucket_name, object_name, download_stream,
compressed_encoding=compressed_encoding,
download_strategy=download_strategy, start_byte=start_byte,
end_byte=end_byte, generation=generation, object_size=object_size,
progress_callback=progress_callback,
36 changes: 27 additions & 9 deletions gslib/copy_helper.py
@@ -112,6 +112,7 @@
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import MakeHumanReadable
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import ObjectIsGzipEncoded
from gslib.util import ResumableThreshold
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
@@ -143,7 +144,7 @@

# For debugging purposes; if True, files and objects that fail hash validation
# will be saved with the below suffix appended.
_RENAME_ON_HASH_MISMATCH = False
_RENAME_ON_HASH_MISMATCH = True
_RENAME_ON_HASH_MISMATCH_SUFFIX = '_corrupt'

PARALLEL_UPLOAD_TEMP_NAMESPACE = (
@@ -1081,7 +1082,7 @@ def _ShouldDoParallelCompositeUpload(logger, allow_splitting, src_url, dst_url,
'"parallel_composite_upload_threshold" value in your .boto '
'configuration file. However, note that if you do this large files '
'will be uploaded as '
'`composite objects <https://cloud.google.com/storage/docs/composite-objects>`_,'
'`composite objects <https://cloud.google.com/storage/docs/composite-objects>`_,' # pylint: disable=line-too-long
'which means that any user who downloads such objects will need to '
'have a compiled crcmod installed (see "gsutil help crcmod"). This '
'is because without a compiled crcmod, computing checksums on '
@@ -1579,11 +1580,23 @@ def _UploadFileToObject(src_url, src_obj_filestream, src_obj_size,
upload_size = src_obj_size
zipped_file = False
if (gzip_exts == GZIP_ALL_FILES or
(gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts)):
(gzip_exts and len(fname_parts) > 1 and fname_parts[-1] in gzip_exts)):
upload_url, upload_size = _CompressFileForUpload(
src_url, src_obj_filestream, src_obj_size, logger)
upload_stream = open(upload_url.object_name, 'rb')
dst_obj_metadata.contentEncoding = 'gzip'
# If we're sending an object with gzip encoding, it's possible it also
# has an incompressible content type. Google Cloud Storage will remove
# the top layer of compression when serving the object, which would cause
# the served content not to match the CRC32C/MD5 hashes stored and make
# integrity checking impossible. Therefore we set cache control to
# no-transform to ensure it is served in its original form. The caveat is
# that to read this object, other clients must then support
# accept-encoding:gzip.
if not dst_obj_metadata.cacheControl:
dst_obj_metadata.cacheControl = 'no-transform'
elif 'no-transform' not in dst_obj_metadata.cacheControl.lower():
dst_obj_metadata.cacheControl += ',no-transform'
zipped_file = True

elapsed_time = None
@@ -1708,8 +1721,7 @@ def _GetDownloadFile(dst_url, src_obj_metadata, logger):
# server sends decompressed bytes for a file that is stored compressed
# (double compressed case), there is no way we can validate the hash and
# we will fail our hash check for the object.
if (src_obj_metadata.contentEncoding and
src_obj_metadata.contentEncoding.lower().endswith('gzip')):
if ObjectIsGzipEncoded(src_obj_metadata):
need_to_unzip = True
download_file_name = _GetDownloadTempZipFileName(dst_url)
logger.info(
@@ -2059,8 +2071,7 @@ def _DoSlicedDownload(src_url, src_obj_metadata, dst_url, download_file_name,
component_lengths[i])

bytes_transferred = 0
expect_gzip = (src_obj_metadata.contentEncoding and
src_obj_metadata.contentEncoding.lower().endswith('gzip'))
expect_gzip = ObjectIsGzipEncoded(src_obj_metadata)
for cp_result in cp_results:
bytes_transferred += cp_result.bytes_transferred
server_gzip = (cp_result.server_encoding and
@@ -2183,6 +2194,8 @@ def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
fp = SlicedDownloadFileWrapper(fp, tracker_file_name, src_obj_metadata,
start_byte, end_byte)

compressed_encoding = ObjectIsGzipEncoded(src_obj_metadata)

# TODO: With gzip encoding (which may occur on-the-fly and not be part of
# the object's metadata), when we request a range to resume, it's possible
# that the server will just resend the entire object, which means our
@@ -2194,6 +2207,7 @@ def _DownloadObjectToFileResumable(src_url, src_obj_metadata, dst_url,
server_encoding = gsutil_api.GetObjectMedia(
src_url.bucket_name, src_url.object_name, fp,
start_byte=download_start_byte, end_byte=end_byte,
compressed_encoding=compressed_encoding,
generation=src_url.generation, object_size=src_obj_metadata.size,
download_strategy=CloudApi.DownloadStrategy.RESUMABLE,
provider=src_url.scheme, serialization_data=serialization_data,
@@ -2572,9 +2586,13 @@ def _CopyObjToObjDaisyChainMode(src_url, src_obj_metadata, dst_url,
with open(global_copy_helper_opts.test_callback_file, 'rb') as test_fp:
progress_callback = pickle.loads(test_fp.read()).call

compressed_encoding = ObjectIsGzipEncoded(src_obj_metadata)

start_time = time.time()
upload_fp = DaisyChainWrapper(src_url, src_obj_metadata.size, gsutil_api,
progress_callback=progress_callback)
upload_fp = DaisyChainWrapper(
src_url, src_obj_metadata.size, gsutil_api,
compressed_encoding=compressed_encoding,
progress_callback=progress_callback)
uploaded_object = None
if src_obj_metadata.size == 0:
# Resumable uploads of size 0 are not supported.
30 changes: 25 additions & 5 deletions gslib/daisy_chain_wrapper.py
@@ -15,6 +15,7 @@
"""Wrapper for use in daisy-chained copies."""

from collections import deque
from contextlib import contextmanager
import os
import threading
import time
@@ -60,6 +61,14 @@ def write(self, data): # pylint: disable=invalid-name
self.daisy_chain_wrapper.bytes_buffered += data_len


@contextmanager
def AcquireLockWithTimeout(lock, timeout):
result = lock.acquire(timeout=timeout)
yield result
if result:
lock.release()


class DaisyChainWrapper(object):
"""Wrapper class for daisy-chaining a cloud download to an upload.
@@ -73,14 +82,16 @@ class DaisyChainWrapper(object):
used.
"""

def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
def __init__(self, src_url, src_obj_size, gsutil_api,
compressed_encoding=False, progress_callback=None,
download_chunk_size=_DEFAULT_DOWNLOAD_CHUNK_SIZE):
"""Initializes the daisy chain wrapper.
Args:
src_url: Source CloudUrl to copy from.
src_obj_size: Size of source object.
gsutil_api: gsutil Cloud API to use for the copy.
compressed_encoding: If true, source object has content-encoding: gzip.
progress_callback: Optional callback function for progress notifications
for the download thread. Receives calls with arguments
(bytes_transferred, total_size).
@@ -114,6 +125,7 @@ def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,

self.src_obj_size = src_obj_size
self.src_url = src_url
self.compressed_encoding = compressed_encoding

# This is safe to use the upload and download thread because the download
# thread calls only GetObjectMedia, which creates a new HTTP connection
Expand All @@ -126,6 +138,7 @@ def __init__(self, src_url, src_obj_size, gsutil_api, progress_callback=None,
self.download_exception = None
self.download_thread = None
self.progress_callback = progress_callback
self.thread_started = threading.Event()
self.stop_download = threading.Event()
self.StartDownloadThread(progress_callback=self.progress_callback)

@@ -150,10 +163,12 @@ def PerformDownload(start_byte, progress_callback):
# object to support seek() and tell() which requires coordination with
# the upload.
try:
self.thread_started.set()
while start_byte + self._download_chunk_size < self.src_obj_size:
self.gsutil_api.GetObjectMedia(
self.src_url.bucket_name, self.src_url.object_name,
BufferWrapper(self), start_byte=start_byte,
BufferWrapper(self), compressed_encoding=self.compressed_encoding,
start_byte=start_byte,
end_byte=start_byte + self._download_chunk_size - 1,
generation=self.src_url.generation, object_size=self.src_obj_size,
download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
@@ -165,8 +180,9 @@ def PerformDownload(start_byte, progress_callback):
start_byte += self._download_chunk_size
self.gsutil_api.GetObjectMedia(
self.src_url.bucket_name, self.src_url.object_name,
BufferWrapper(self), start_byte=start_byte,
generation=self.src_url.generation, object_size=self.src_obj_size,
BufferWrapper(self), compressed_encoding=self.compressed_encoding,
start_byte=start_byte, generation=self.src_url.generation,
object_size=self.src_obj_size,
download_strategy=CloudApi.DownloadStrategy.ONE_SHOT,
provider=self.src_url.scheme, progress_callback=progress_callback)
# We catch all exceptions here because we want to store them.
@@ -181,6 +197,7 @@ def PerformDownload(start_byte, progress_callback):
target=PerformDownload,
args=(start_byte, progress_callback))
self.download_thread.start()
self.thread_started.wait()

def read(self, amt=None): # pylint: disable=invalid-name
"""Exposes a stream from the in-memory buffer to the upload."""
@@ -197,11 +214,14 @@ def read(self, amt=None): # pylint: disable=invalid-name
with self.lock:
if self.buffer:
break
with self.download_exception_lock:
if AcquireLockWithTimeout(self.download_exception_lock, 30):
if self.download_exception:
# Download thread died, so we will never recover. Raise the
# exception that killed it.
raise self.download_exception # pylint: disable=raising-bad-type
else:
if not self.download_thread.is_alive():
raise Exception('Download thread died suddenly.')
# Buffer was empty, yield thread priority so the download thread can fill.
time.sleep(0)
with self.lock:
32 changes: 21 additions & 11 deletions gslib/gcs_json_api.py
@@ -35,10 +35,6 @@
import boto
from boto import config
from gcs_oauth2_boto_plugin import oauth2_helper
import httplib2
import oauth2client
from oauth2client import devshell
from oauth2client import multistore_file

from gslib.cloud_api import AccessDeniedException
from gslib.cloud_api import ArgumentException
@@ -79,6 +75,7 @@
from gslib.translation_helper import DEFAULT_CONTENT_TYPE
from gslib.translation_helper import PRIVATE_DEFAULT_OBJ_ACL
from gslib.translation_helper import REMOVE_CORS_CONFIG
from gslib.util import AddAcceptEncodingGzipIfNeeded
from gslib.util import GetBotoConfigFileList
from gslib.util import GetCertsFile
from gslib.util import GetCredentialStoreFilename
@@ -90,6 +87,11 @@
from gslib.util import GetPrintableExceptionString
from gslib.util import JsonResumableChunkSizeDefined

import httplib2
import oauth2client
from oauth2client import devshell
from oauth2client import multistore_file


# Implementation supports only 'gs' URLs, so provider is unused.
# pylint: disable=unused-argument
@@ -679,6 +681,7 @@ def GetObjectMetadata(self, bucket_name, object_name, generation=None,
def GetObjectMedia(
self, bucket_name, object_name, download_stream,
provider=None, generation=None, object_size=None,
compressed_encoding=False,
download_strategy=CloudApi.DownloadStrategy.ONE_SHOT, start_byte=0,
end_byte=None, progress_callback=None, serialization_data=None,
digesters=None):
@@ -741,13 +744,16 @@ def GetObjectMedia(
return self._PerformResumableDownload(
bucket_name, object_name, download_stream, apitools_request,
apitools_download, bytes_downloaded_container,
compressed_encoding=compressed_encoding,
generation=generation, start_byte=start_byte, end_byte=end_byte,
serialization_data=serialization_data)
else:
return self._PerformDownload(
bucket_name, object_name, download_stream, apitools_request,
apitools_download, generation=generation, start_byte=start_byte,
end_byte=end_byte, serialization_data=serialization_data)
apitools_download, generation=generation,
compressed_encoding=compressed_encoding,
start_byte=start_byte, end_byte=end_byte,
serialization_data=serialization_data)
except TRANSLATABLE_APITOOLS_EXCEPTIONS, e:
self._TranslateExceptionAndRaise(e, bucket_name=bucket_name,
object_name=object_name,
@@ -756,14 +762,16 @@ def _PerformResumableDownload(
def _PerformResumableDownload(
self, bucket_name, object_name, download_stream, apitools_request,
apitools_download, bytes_downloaded_container, generation=None,
start_byte=0, end_byte=None, serialization_data=None):
compressed_encoding=False, start_byte=0, end_byte=None,
serialization_data=None):
retries = 0
last_progress_byte = start_byte
while retries <= self.num_retries:
try:
return self._PerformDownload(
bucket_name, object_name, download_stream, apitools_request,
apitools_download, generation=generation, start_byte=start_byte,
apitools_download, generation=generation,
compressed_encoding=compressed_encoding, start_byte=start_byte,
end_byte=end_byte, serialization_data=serialization_data)
except HTTP_TRANSFER_EXCEPTIONS, e:
self._ValidateHttpAccessTokenRefreshError(e)
@@ -789,8 +797,8 @@ def _PerformResumableDownload(

def _PerformDownload(
self, bucket_name, object_name, download_stream, apitools_request,
apitools_download, generation=None, start_byte=0, end_byte=None,
serialization_data=None):
apitools_download, generation=None, compressed_encoding=False,
start_byte=0, end_byte=None, serialization_data=None):
if not serialization_data:
try:
self.api_client.objects.Get(apitools_request,
@@ -816,9 +824,11 @@ def _NoOpCallback(unused_response, unused_download_object):
# Since bytes_http is created in this function, we don't get the
# user-agent header from api_client's http automatically.
additional_headers = {
'accept-encoding': 'gzip',
'user-agent': self.api_client.user_agent
}
AddAcceptEncodingGzipIfNeeded(additional_headers,
compressed_encoding=compressed_encoding)

self._AddPerfTraceTokenToHeaders(additional_headers)

if start_byte or end_byte is not None: