fix: fix Storage#readAllBytes to allow reading compressed bytes (#2304)
Storage#readAllBytes currently ignores BlobSourceOption.shouldReturnRawInputStream(true); fix this so that the option is respected.

Update gRPC transport default compression handling for each of the following methods so that it matches the HTTP behavior:
* GrpcStorage.Impl#readAllBytes
* GrpcStorage.Impl#downloadTo(BlobInfo, Path)
* GrpcStorage.Impl#downloadTo(BlobInfo, OutputStream)

Move the two tests from ITDownloadToTest into the new ITAutomaticGzipDecompressionTest, which tests compression handling for all read paths.
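For context, a minimal sketch of the behavior this change targets. It is not part of the commit; the class, bucket, and object names are hypothetical, and the object is assumed to have been uploaded with Content-Encoding: gzip. With the fix, readAllBytes decompresses by default on both transports, while BlobSourceOption.shouldReturnRawInputStream(true), which readAllBytes previously ignored, returns the stored bytes as-is:

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class ReadAllBytesGzipSketch {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Hypothetical object, assumed to be stored with Content-Encoding: gzip.
    BlobId id = BlobId.of("my-bucket", "notes.txt");

    // Default: automatic gzip decompression; the original, uncompressed content is returned.
    byte[] decompressed = storage.readAllBytes(id);

    // Raw mode: the stored (compressed) bytes are returned untouched. Before this fix the
    // option was ignored by readAllBytes; it is now respected on the HTTP and gRPC transports.
    byte[] raw =
        storage.readAllBytes(id, Storage.BlobSourceOption.shouldReturnRawInputStream(true));

    System.out.printf("decompressed=%d bytes, raw=%d bytes%n", decompressed.length, raw.length);
  }
}

downloadTo(BlobInfo, Path) and downloadTo(BlobInfo, OutputStream) follow the same decompress-by-default behavior under gRPC after this change.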
BenWhitehead authored Nov 15, 2023
1 parent d4bfcf0 commit 68b96a9
Showing 11 changed files with 323 additions and 128 deletions.
@@ -101,7 +101,12 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
     long totalRead = 0;
     do {
       if (sbc == null) {
-        sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
+        try {
+          sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
+        } catch (StorageException e) {
+          result.setException(e);
+          throw e;
+        }
       }

       long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
@@ -124,13 +129,17 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
           sbc = null;
         } else if (t instanceof IOException) {
           IOException ioE = (IOException) t;
-          if (resultRetryAlgorithm.shouldRetry(StorageException.translate(ioE), null)) {
+          StorageException translate = StorageException.translate(ioE);
+          if (resultRetryAlgorithm.shouldRetry(translate, null)) {
             sbc = null;
           } else {
+            result.setException(translate);
             throw ioE;
           }
         } else {
-          throw new IOException(StorageException.coalesce(t));
+          BaseServiceException coalesce = StorageException.coalesce(t);
+          result.setException(coalesce);
+          throw new IOException(coalesce);
         }
       } finally {
         long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
@@ -207,20 +216,17 @@ private ScatteringByteChannel open() {
       if (xGoogGeneration != null) {
         int statusCode = e.getStatusCode();
         if (statusCode == 404) {
-          throw new StorageException(404, "Failure while trying to resume download", e);
+          StorageException storageException =
+              new StorageException(404, "Failure while trying to resume download", e);
+          result.setException(storageException);
+          throw storageException;
         }
       }
-      StorageException translate = StorageException.translate(e);
-      result.setException(translate);
-      throw translate;
+      throw StorageException.translate(e);
     } catch (IOException e) {
-      StorageException translate = StorageException.translate(e);
-      result.setException(translate);
-      throw translate;
+      throw StorageException.translate(e);
     } catch (Throwable t) {
-      BaseServiceException coalesce = StorageException.coalesce(t);
-      result.setException(coalesce);
-      throw coalesce;
+      throw StorageException.coalesce(t);
     }
   }

@@ -44,10 +44,7 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
     this.opts = opts;
     this.blobReadChannelContext = blobReadChannelContext;
     this.autoGzipDecompression =
-        // RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
-        // RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding
-        // gzip.
-        Boolean.FALSE.equals(opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM));
+        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
   }

   @Override

@@ -653,7 +653,8 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio

   @Override
   public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
-    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session =
+        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (UnbufferedReadableByteChannel r = session.open();
@@ -681,16 +682,19 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
     ReadObjectRequest request = getReadObjectRequest(blob, opts);
     Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
     GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
+    boolean autoGzipDecompression =
+        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
     return new GrpcBlobReadChannel(
         storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
         request,
-        !opts.autoGzipDecompression());
+        autoGzipDecompression);
   }

   @Override
   public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {

-    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session =
+        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

     try (UnbufferedReadableByteChannel r = session.open();
         WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
@@ -703,7 +707,8 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
   @Override
   public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {

-    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session =
+        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

     try (UnbufferedReadableByteChannel r = session.open();
         WritableByteChannel w = Channels.newChannel(outputStream)) {
@@ -1801,18 +1806,20 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> op
     return opts.writeObjectRequest().apply(requestBuilder).build();
   }

-  private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
-      BlobId blob, BlobSourceOption[] options) {
+  private UnbufferedReadableByteChannelSession<Object>
+      unbufferedDefaultAutoGzipDecompressingReadSession(BlobId blob, BlobSourceOption[] options) {
     Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
     ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
     Set<StatusCode.Code> codes =
         resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
     GrpcCallContext grpcCallContext =
         opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
+    boolean autoGzipDecompression =
+        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ true);
     return ResumableMedia.gapic()
         .read()
         .byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
-        .setAutoGzipDecompression(!opts.autoGzipDecompression())
+        .setAutoGzipDecompression(autoGzipDecompression)
         .unbuffered()
         .setReadObjectRequest(readObjectRequest)
         .build();

@@ -17,6 +17,7 @@
 package com.google.cloud.storage;

 import com.google.api.core.ApiFuture;
+import com.google.api.gax.rpc.ApiExceptions;
 import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
 import java.io.ByteArrayInputStream;
 import java.io.FilterInputStream;
@@ -27,7 +28,6 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
-import java.util.concurrent.ExecutionException;
 import java.util.zip.GZIPInputStream;

 final class GzipReadableByteChannel implements UnbufferedReadableByteChannel {
@@ -60,7 +60,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
       source.read(wrap);
       try {
         // Step 2: wait for the object metadata, this is populated in the first message from GCS
-        String contentEncoding = this.contentEncoding.get();
+        String contentEncoding = ApiExceptions.callAndTranslateApiException(this.contentEncoding);
         // if the Content-Encoding is gzip, Step 3: wire gzip decompression into the byte path
         // this will have a copy impact as we are no longer controlling all the buffers
         if ("gzip".equals(contentEncoding) || "x-gzip".equals(contentEncoding)) {
@@ -86,7 +86,9 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
           bytesRead += Buffers.copy(wrap, dsts, offset, length);
           delegate = source;
         }
-      } catch (InterruptedException | ExecutionException e) {
+      } catch (StorageException se) {
+        throw se;
+      } catch (Exception e) {
         throw new IOException(e);
       }
     }

@@ -2301,18 +2301,6 @@ Mapper<BlobInfo.Builder> blobInfoMapper() {
       return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::blobInfo);
     }

-    /**
-     * Here for compatibility. This should NOT be an "Opt" instead an attribute of the channel
-     * builder. When {@link ReturnRawInputStream} is removed, this method should be removed as well.
-     *
-     * @see
-     *     GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder#setAutoGzipDecompression(boolean)
-     */
-    @Deprecated
-    boolean autoGzipDecompression() {
-      return filterTo(ReturnRawInputStream.class).findFirst().map(r -> r.val).orElse(true);
-    }
-
     Decoder<BlobInfo, BlobInfo> clearBlobFields() {
       return filterTo(Fields.class)
           .findFirst()

@@ -24,6 +24,8 @@
 import com.google.api.gax.rpc.ApiCallContext;
 import com.google.cloud.storage.Conversions.Codec;
 import com.google.cloud.storage.UnifiedOpts.NamedField;
+import com.google.cloud.storage.UnifiedOpts.Opts;
+import com.google.cloud.storage.spi.v1.StorageRpc;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.MapDifference;
 import com.google.common.collect.Maps;
@@ -310,4 +312,28 @@ private static String crc32cEncode(int from) {
   static GrpcCallContext merge(@NonNull GrpcCallContext l, @NonNull GrpcCallContext r) {
     return (GrpcCallContext) l.merge(r);
   }
+
+  /**
+   * RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
+   * RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
+   */
+  static boolean isAutoGzipDecompression(Opts<?> opts, boolean defaultWhenUndefined) {
+    return isAutoGzipDecompression(opts.getRpcOptions(), defaultWhenUndefined);
+  }
+
+  /**
+   * RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
+   * RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
+   */
+  static boolean isAutoGzipDecompression(
+      Map<StorageRpc.Option, ?> opts, boolean defaultWhenUndefined) {
+    // Option.getBoolean is package private, and we don't want to open it.
+    // if non-null explicitly compare to a boolean value to coerce it to a boolean result
+    Object returnRawInputStream = opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM);
+    if (returnRawInputStream == null) {
+      return defaultWhenUndefined;
+    } else {
+      return Boolean.FALSE.equals(returnRawInputStream);
+    }
+  }
 }

@@ -752,6 +752,12 @@ public byte[] load(StorageObject from, Map<Option, ?> options) {
             .setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
             .setUserProject(Option.USER_PROJECT.getString(options));
     setEncryptionHeaders(getRequest.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
+    Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
+    if (shouldReturnRawInputStream != null) {
+      getRequest.setReturnRawInputStream(shouldReturnRawInputStream);
+    } else {
+      getRequest.setReturnRawInputStream(false);
+    }
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     getRequest.executeMedia().download(out);
     return out.toByteArray();

@@ -245,10 +245,10 @@ public void autoGzipDecompress_default_disabled() throws IOException {
   }

   @Test
-  public void storage_readAllBytes_defaultCompressed() {
+  public void storage_readAllBytes_defaultUncompressed() {
     Storage s = storageFixture.getInstance();
     byte[] actual = s.readAllBytes(BlobId.of("buck", "obj-compressed"));
-    assertThat(actual).isEqualTo(dataCompressed);
+    assertThat(actual).isEqualTo(dataUncompressed);
   }

   @Test

@@ -0,0 +1,67 @@
+/*
+ * Copyright 2023 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.storage;
+
+import static com.google.cloud.storage.TestUtils.assertAll;
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.storage.UnifiedOpts.Opt;
+import com.google.cloud.storage.UnifiedOpts.Opts;
+import org.junit.Test;
+
+public final class UtilsTest {
+  private static final Opts<Opt> autoGzipDecompress_undefined = Opts.empty();
+  private static final Opts<Opt> autoGzipDecompress_no =
+      Opts.from(UnifiedOpts.returnRawInputStream(true));
+  private static final Opts<Opt> autoGzipDecompress_yes =
+      Opts.from(UnifiedOpts.returnRawInputStream(false));
+
+  @Test
+  public void isAutoGzipDecompression() throws Exception {
+    assertAll(
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ false))
+                .isFalse(),
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ true))
+                .isTrue(),
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_no, /*defaultWhenUndefined=*/ false))
+                .isFalse(),
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_no, /*defaultWhenUndefined=*/ true))
+                .isFalse(),
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_yes, /*defaultWhenUndefined=*/ false))
+                .isTrue(),
+        () ->
+            assertThat(
+                    Utils.isAutoGzipDecompression(
+                        autoGzipDecompress_yes, /*defaultWhenUndefined=*/ true))
+                .isTrue());
+  }
+}