Skip to content

Commit

Permalink
fix: fix Storage#readAllBytes to allow reading compressed bytes
Browse files Browse the repository at this point in the history
Storage#readAllBytes currently ignores BlobSourceOption.shouldReturnRawInputStream(true), fix this so that the option is respected.

Update grpc transport default compression handling for each of the following methods, so it is the same behavior as HTTP:
* GrpcStorage.Impl#readAllBytes
* GrpcStorage.Impl#downloadTo(BlobInfo, Path)
* GrpcStorage.Impl#downloadTo(BlobInfo, OutputStream)

Move the tow tests from ITDownloadToTest into new ITAutomaticGzipDecompressionTest which tests compression handling for all read paths.
  • Loading branch information
BenWhitehead committed Nov 15, 2023
1 parent d4bfcf0 commit 47d2a76
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
this.opts = opts;
this.blobReadChannelContext = blobReadChannelContext;
this.autoGzipDecompression =
// RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
// RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding
// gzip.
Boolean.FALSE.equals(opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM));
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,8 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio

@Override
public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
UnbufferedReadableByteChannelSession<Object> session =
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (UnbufferedReadableByteChannel r = session.open();
Expand Down Expand Up @@ -681,16 +682,19 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
ReadObjectRequest request = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
return new GrpcBlobReadChannel(
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
request,
!opts.autoGzipDecompression());
autoGzipDecompression);
}

@Override
public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {

UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
UnbufferedReadableByteChannelSession<Object> session =
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

try (UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
Expand All @@ -703,7 +707,8 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
@Override
public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {

UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
UnbufferedReadableByteChannelSession<Object> session =
unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);

try (UnbufferedReadableByteChannel r = session.open();
WritableByteChannel w = Channels.newChannel(outputStream)) {
Expand Down Expand Up @@ -1801,18 +1806,20 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> op
return opts.writeObjectRequest().apply(requestBuilder).build();
}

private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
BlobId blob, BlobSourceOption[] options) {
private UnbufferedReadableByteChannelSession<Object>
unbufferedDefaultAutoGzipDecompressingReadSession(BlobId blob, BlobSourceOption[] options) {
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes =
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
boolean autoGzipDecompression =
Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ true);
return ResumableMedia.gapic()
.read()
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
.setAutoGzipDecompression(!opts.autoGzipDecompression())
.setAutoGzipDecompression(autoGzipDecompression)
.unbuffered()
.setReadObjectRequest(readObjectRequest)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2301,18 +2301,6 @@ Mapper<BlobInfo.Builder> blobInfoMapper() {
return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::blobInfo);
}

/**
* Here for compatibility. This should NOT be an "Opt" instead an attribute of the channel
* builder. When {@link ReturnRawInputStream} is removed, this method should be removed as well.
*
* @see
* GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder#setAutoGzipDecompression(boolean)
*/
@Deprecated
boolean autoGzipDecompression() {
return filterTo(ReturnRawInputStream.class).findFirst().map(r -> r.val).orElse(true);
}

Decoder<BlobInfo, BlobInfo> clearBlobFields() {
return filterTo(Fields.class)
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.storage.Conversions.Codec;
import com.google.cloud.storage.UnifiedOpts.NamedField;
import com.google.cloud.storage.UnifiedOpts.Opts;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -310,4 +312,28 @@ private static String crc32cEncode(int from) {
static GrpcCallContext merge(@NonNull GrpcCallContext l, @NonNull GrpcCallContext r) {
return (GrpcCallContext) l.merge(r);
}

/**
* RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
* RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
*/
static boolean isAutoGzipDecompression(Opts<?> opts, boolean defaultWhenUndefined) {
return isAutoGzipDecompression(opts.getRpcOptions(), defaultWhenUndefined);
}

/**
* RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
* RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding gzip.
*/
static boolean isAutoGzipDecompression(
Map<StorageRpc.Option, ?> opts, boolean defaultWhenUndefined) {
// Option.getBoolean is package private, and we don't want to open it.
// if non-null explicitly compare to a boolean value to coerce it to a boolean result
Object returnRawInputStream = opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM);
if (returnRawInputStream == null) {
return defaultWhenUndefined;
} else {
return Boolean.FALSE.equals(returnRawInputStream);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,12 @@ public byte[] load(StorageObject from, Map<Option, ?> options) {
.setIfGenerationNotMatch(Option.IF_GENERATION_NOT_MATCH.getLong(options))
.setUserProject(Option.USER_PROJECT.getString(options));
setEncryptionHeaders(getRequest.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, options);
Boolean shouldReturnRawInputStream = Option.RETURN_RAW_INPUT_STREAM.getBoolean(options);
if (shouldReturnRawInputStream != null) {
getRequest.setReturnRawInputStream(shouldReturnRawInputStream);
} else {
getRequest.setReturnRawInputStream(false);
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
getRequest.executeMedia().download(out);
return out.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.storage.UnifiedOpts.Opt;
import com.google.cloud.storage.UnifiedOpts.Opts;
import org.junit.Test;

public final class UtilsTest {
private static final Opts<Opt> autoGzipDecompress_undefined = Opts.empty();
private static final Opts<Opt> autoGzipDecompress_no =
Opts.from(UnifiedOpts.returnRawInputStream(true));
private static final Opts<Opt> autoGzipDecompress_yes =
Opts.from(UnifiedOpts.returnRawInputStream(false));

@Test
public void isAutoGzipDecompression() throws Exception {
assertAll(
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ false))
.isFalse(),
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_undefined, /*defaultWhenUndefined=*/ true))
.isTrue(),
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_no, /*defaultWhenUndefined=*/ false))
.isFalse(),
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_no, /*defaultWhenUndefined=*/ true))
.isFalse(),
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_yes, /*defaultWhenUndefined=*/ false))
.isTrue(),
() ->
assertThat(
Utils.isAutoGzipDecompression(
autoGzipDecompress_yes, /*defaultWhenUndefined=*/ true))
.isTrue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.it;

import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.TestUtils;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.it.runner.StorageITRunner;
import com.google.cloud.storage.it.runner.annotations.Backend;
import com.google.cloud.storage.it.runner.annotations.CrossRun;
import com.google.cloud.storage.it.runner.annotations.Inject;
import com.google.cloud.storage.it.runner.registry.Generator;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(StorageITRunner.class)
@CrossRun(
backends = {Backend.PROD},
transports = {Transport.HTTP, Transport.GRPC})
public final class ITAutomaticGzipDecompressionTest {

private static final byte[] helloWorldTextBytes = "hello world".getBytes();
private static final byte[] helloWorldGzipBytes = TestUtils.gzipBytes(helloWorldTextBytes);

@Inject public Storage storage;
@Inject public BucketInfo bucket;
@Inject public Generator generator;

private BlobInfo info;
private BlobId blobId;

@Before
public void setUp() throws Exception {
BlobInfo tmp =
BlobInfo.newBuilder(bucket, generator.randomObjectName())
// define an object with explicit content type and encoding.
// JSON and gRPC have differing default behavior returning these values if they are
// either undefined, or match HTTP defaults.
.setContentType("text/plain")
.setContentEncoding("gzip")
.build();

Blob gen1 = storage.create(tmp, helloWorldGzipBytes, BlobTargetOption.doesNotExist());
info = gen1.asBlobInfo();
blobId = info.getBlobId();
}

@Test
public void readAllBytes_default_uncompressed() {
byte[] bytes = storage.readAllBytes(blobId);
assertThat(xxd(bytes)).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void readAllBytes_returnRawInputStream_yes() {
byte[] bytes = storage.readAllBytes(blobId, BlobSourceOption.shouldReturnRawInputStream(true));
assertThat(xxd(bytes)).isEqualTo(xxd(helloWorldGzipBytes));
}

@Test
public void readAllBytes_returnRawInputStream_no() {
byte[] bytes = storage.readAllBytes(blobId, BlobSourceOption.shouldReturnRawInputStream(false));
assertThat(xxd(bytes)).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void reader_default_compressed() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel r = storage.reader(blobId)) {
WritableByteChannel w = Channels.newChannel(baos);
ByteStreams.copy(r, w);
}

assertThat(xxd(baos.toByteArray())).isEqualTo(xxd(helloWorldGzipBytes));
}

@Test
public void reader_returnRawInputStream_yes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel r =
storage.reader(blobId, BlobSourceOption.shouldReturnRawInputStream(true))) {
WritableByteChannel w = Channels.newChannel(baos);
ByteStreams.copy(r, w);
}

assertThat(xxd(baos.toByteArray())).isEqualTo(xxd(helloWorldGzipBytes));
}

@Test
public void reader_returnRawInputStream_no() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ReadChannel r =
storage.reader(blobId, BlobSourceOption.shouldReturnRawInputStream(false))) {
WritableByteChannel w = Channels.newChannel(baos);
ByteStreams.copy(r, w);
}

assertThat(xxd(baos.toByteArray())).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void downloadTo_path_default_uncompressed() throws IOException {
Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.downloadTo(blobId, helloWorldTxtGz);

byte[] actualTxtGzBytes = Files.readAllBytes(helloWorldTxtGz);
assertThat(xxd(actualTxtGzBytes)).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void downloadTo_path_returnRawInputStream_yes() throws IOException {
Path helloWorldTxtGz = File.createTempFile(blobId.getName(), ".txt.gz").toPath();
storage.downloadTo(blobId, helloWorldTxtGz, BlobSourceOption.shouldReturnRawInputStream(true));

byte[] actualTxtGzBytes = Files.readAllBytes(helloWorldTxtGz);
assertThat(xxd(actualTxtGzBytes)).isEqualTo(xxd(helloWorldGzipBytes));
}

@Test
public void downloadTo_path_returnRawInputStream_no() throws IOException {
Path helloWorldTxt = File.createTempFile(blobId.getName(), ".txt").toPath();
storage.downloadTo(blobId, helloWorldTxt, BlobSourceOption.shouldReturnRawInputStream(false));
byte[] actualTxtBytes = Files.readAllBytes(helloWorldTxt);
assertThat(xxd(actualTxtBytes)).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void downloadTo_outputStream_default_uncompressed() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
storage.downloadTo(blobId, baos);
byte[] actual = baos.toByteArray();
assertThat(xxd(actual)).isEqualTo(xxd(helloWorldTextBytes));
}

@Test
public void downloadTo_outputStream_returnRawInputStream_yes() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
storage.downloadTo(blobId, baos, BlobSourceOption.shouldReturnRawInputStream(true));
byte[] actual = baos.toByteArray();
assertThat(xxd(actual)).isEqualTo(xxd(helloWorldGzipBytes));
}

@Test
public void downloadTo_outputStream_returnRawInputStream_no() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
storage.downloadTo(blobId, baos, BlobSourceOption.shouldReturnRawInputStream(false));
byte[] actual = baos.toByteArray();
assertThat(xxd(actual)).isEqualTo(xxd(helloWorldTextBytes));
}
}
Loading

0 comments on commit 47d2a76

Please sign in to comment.