Commit f42283d

fix: Revert "fix: fix Storage#readAllBytes to allow reading compressed bytes (#2304)" (#2330)

This reverts commit 68b96a9.
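For orientation, the public surface most affected is Storage#readAllBytes and its raw-input-stream option. A hedged usage sketch (bucket and object names are placeholders; which default this revert restores is determined by the hunks below):

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobSourceOption;
import com.google.cloud.storage.StorageOptions;

public class ReadAllBytesSketch {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobId id = BlobId.of("my-bucket", "my-object"); // placeholders

    // Default path: gzip handling is governed by the code this commit restores.
    byte[] bytes = storage.readAllBytes(id);

    // Explicitly ask for the raw (possibly still gzip-compressed) bytes.
    byte[] raw = storage.readAllBytes(id, BlobSourceOption.shouldReturnRawInputStream(true));

    System.out.printf("default=%d bytes, raw=%d bytes%n", bytes.length, raw.length);
  }
}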

sydney-munro committed Dec 6, 2023
1 parent 57f1819 commit f42283d
Showing 19 changed files with 347 additions and 435 deletions.
----------------------------------------------------------------------
@@ -101,12 +101,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
     long totalRead = 0;
     do {
       if (sbc == null) {
-        try {
-          sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
-        } catch (StorageException e) {
-          result.setException(e);
-          throw e;
-        }
+        sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
       }
 
       long totalRemaining = Buffers.totalRemaining(dsts, offset, length);
@@ -129,17 +124,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
             sbc = null;
           } else if (t instanceof IOException) {
             IOException ioE = (IOException) t;
-            StorageException translate = StorageException.translate(ioE);
-            if (resultRetryAlgorithm.shouldRetry(translate, null)) {
+            if (resultRetryAlgorithm.shouldRetry(StorageException.translate(ioE), null)) {
               sbc = null;
             } else {
-              result.setException(translate);
               throw ioE;
             }
           } else {
-            BaseServiceException coalesce = StorageException.coalesce(t);
-            result.setException(coalesce);
-            throw new IOException(coalesce);
+            throw new IOException(StorageException.coalesce(t));
           }
         } finally {
           long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length);
@@ -216,17 +207,20 @@ private ScatteringByteChannel open() {
       if (xGoogGeneration != null) {
         int statusCode = e.getStatusCode();
         if (statusCode == 404) {
-          StorageException storageException =
-              new StorageException(404, "Failure while trying to resume download", e);
-          result.setException(storageException);
-          throw storageException;
+          throw new StorageException(404, "Failure while trying to resume download", e);
        }
       }
-      throw StorageException.translate(e);
+      StorageException translate = StorageException.translate(e);
+      result.setException(translate);
+      throw translate;
     } catch (IOException e) {
-      throw StorageException.translate(e);
+      StorageException translate = StorageException.translate(e);
+      result.setException(translate);
+      throw translate;
     } catch (Throwable t) {
-      throw StorageException.coalesce(t);
+      BaseServiceException coalesce = StorageException.coalesce(t);
+      result.setException(coalesce);
+      throw coalesce;
     }
   }
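The three hunks above relocate where a failure is recorded on the channel's result future: instead of setting it at read()'s retry call site and inside the 404 resume branch, the restored code records the translated exception once in open()'s catch blocks and then rethrows. A standalone sketch of that pattern, assuming a SettableApiFuture-backed result (other names here are illustrative):

import com.google.api.core.SettableApiFuture;
import java.io.IOException;

final class ResultFutureSketch {
  // The channel's result future: observers see the same failure the caller of open() sees.
  private final SettableApiFuture<String> result = SettableApiFuture.create();

  String open() {
    try {
      return doOpen(); // stand-in for the real channel-opening RPC
    } catch (IOException e) {
      // Stand-in for StorageException.translate(e).
      RuntimeException translated = new RuntimeException("translated", e);
      result.setException(translated); // record once, at the translation point
      throw translated; // then rethrow to the caller
    }
  }

  private String doOpen() throws IOException {
    throw new IOException("simulated failure");
  }

  public static void main(String[] args) {
    ResultFutureSketch s = new ResultFutureSketch();
    try {
      s.open();
    } catch (RuntimeException e) {
      System.out.println("caller saw: " + e.getCause().getMessage());
    }
    System.out.println("result future completed: " + s.result.isDone()); // true (exceptionally)
  }
}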
----------------------------------------------------------------------
@@ -101,10 +101,12 @@ public final synchronized int write(ByteBuffer src) throws IOException {
       }
       int write = tmp.write(src);
       return write;
-    } catch (StorageException e) {
-      throw new IOException(e);
+    } catch (IOException e) {
+      throw e;
     } catch (Exception e) {
-      throw StorageException.coalesce(e);
+      throw new IOException(StorageException.coalesce(e));
     }
   }

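The rewritten catch clauses keep write(ByteBuffer) on the WritableByteChannel contract: an IOException propagates unchanged, while any other failure is coalesced into a StorageException and wrapped in an IOException. A caller-side sketch written under that assumption:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class WriteContractSketch {
  // Writes a buffer fully, relying on the channel surfacing every failure as IOException.
  static void writeFully(WritableByteChannel channel, ByteBuffer src) throws IOException {
    while (src.hasRemaining()) {
      channel.write(src);
    }
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer data = ByteBuffer.wrap(new byte[] {1, 2, 3});
    WritableByteChannel sink = Channels.newChannel(OutputStream.nullOutputStream());
    writeFully(sink, data);
    System.out.println("remaining=" + data.remaining()); // 0
  }
}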
----------------------------------------------------------------------
@@ -44,7 +44,10 @@ final class BlobReadChannelV2 extends BaseStorageReadChannel<StorageObject> {
     this.opts = opts;
     this.blobReadChannelContext = blobReadChannelContext;
     this.autoGzipDecompression =
-        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
+        // RETURN_RAW_INPUT_STREAM means do not add GZIPInputStream to the pipeline. Meaning, if
+        // RETURN_RAW_INPUT_STREAM is false, automatically attempt to decompress if Content-Encoding
+        // gzip.
+        Boolean.FALSE.equals(opts.get(StorageRpc.Option.RETURN_RAW_INPUT_STREAM));
   }
 
   @Override
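The Boolean.FALSE.equals(...) expression encodes a tri-state default: decompression is enabled only when RETURN_RAW_INPUT_STREAM is explicitly false; true and unset both leave the raw stream in place. A minimal illustration of the idiom (the string key stands in for StorageRpc.Option.RETURN_RAW_INPUT_STREAM):

import java.util.HashMap;
import java.util.Map;

public class RawStreamFlagSketch {
  public static void main(String[] args) {
    Map<String, Object> opts = new HashMap<>();
    // Unset: Boolean.FALSE.equals(null) is false, so no decompression.
    System.out.println(Boolean.FALSE.equals(opts.get("RETURN_RAW_INPUT_STREAM"))); // false
    // Explicitly true: raw stream requested, still no decompression.
    opts.put("RETURN_RAW_INPUT_STREAM", true);
    System.out.println(Boolean.FALSE.equals(opts.get("RETURN_RAW_INPUT_STREAM"))); // false
    // Explicitly false: the only case that enables decompression.
    opts.put("RETURN_RAW_INPUT_STREAM", false);
    System.out.println(Boolean.FALSE.equals(opts.get("RETURN_RAW_INPUT_STREAM"))); // true
  }
}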
----------------------------------------------------------------------
@@ -256,34 +256,11 @@ public Blob create(
 
   @Override
   public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options) {
-    requireNonNull(blobInfo, "blobInfo must be non null");
-
-    Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
-    GrpcCallContext grpcCallContext =
-        opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
-    WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
-
-    UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
-        ResumableMedia.gapic()
-            .write()
-            .byteChannel(
-                storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
-            .setHasher(Hasher.enabled())
-            .setByteStringStrategy(ByteStringStrategy.noCopy())
-            .direct()
-            .unbuffered()
-            .setRequest(req)
-            .build();
-
-    // Specifically not in the try-with, so we don't close the provided stream
-    ReadableByteChannel src =
-        Channels.newChannel(firstNonNull(content, new ByteArrayInputStream(ZERO_BYTES)));
-    try (UnbufferedWritableByteChannel dst = session.open()) {
-      ByteStreams.copy(src, dst);
-    } catch (Exception e) {
-      throw StorageException.coalesce(e);
-    }
-    return getBlob(session.getResult());
+    try {
+      return createFrom(blobInfo, content, options);
+    } catch (IOException e) {
+      throw StorageException.coalesce(e);
+    }
   }
 
   @Override
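After this hunk, create(BlobInfo, InputStream, ...) funnels into createFrom, so both entry points share one upload path and any IOException surfaces as a StorageException. A hedged usage sketch (bucket and object names are placeholders):

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class StreamUploadSketch {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobInfo info = BlobInfo.newBuilder(BlobId.of("my-bucket", "my-object")).build();
    InputStream content = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
    // Under this revert, this call delegates to createFrom(blobInfo, content, options).
    storage.create(info, content);
  }
}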
@@ -338,7 +315,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
       }
       return codecs.blobInfo().decode(object).asBlob(this);
     } catch (InterruptedException | ExecutionException e) {
-      throw StorageException.coalesce(e.getCause());
+      throw StorageException.coalesce(e);
     }
   }

@@ -388,14 +365,7 @@ public Blob createFrom(
   @Override
   public Bucket get(String bucket, BucketGetOption... options) {
     Opts<BucketSourceOpt> unwrap = Opts.unwrap(options);
-    try {
-      return internalBucketGet(bucket, unwrap);
-    } catch (StorageException e) {
-      if (e.getCode() == 404) {
-        return null;
-      }
-      throw e;
-    }
+    return internalBucketGet(bucket, unwrap);
   }
 
   @Override
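The deleted try/catch had mapped a 404 from internalBucketGet to a null return at this call site; after the revert that mapping no longer happens here. A defensive caller sketch that tolerates either behavior:

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;

public class BucketGetSketch {
  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Bucket bucket;
    try {
      bucket = storage.get("my-bucket"); // placeholder name
    } catch (StorageException e) {
      if (e.getCode() == 404) {
        bucket = null; // the mapping the deleted catch block used to perform
      } else {
        throw e;
      }
    }
    System.out.println(bucket == null ? "bucket absent" : bucket.getName());
  }
}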
@@ -689,8 +659,7 @@ public byte[] readAllBytes(String bucket, String blob, BlobSourceOption... optio
 
   @Override
   public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
-    UnbufferedReadableByteChannelSession<Object> session =
-        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (UnbufferedReadableByteChannel r = session.open();
@@ -718,19 +687,16 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
     ReadObjectRequest request = getReadObjectRequest(blob, opts);
     Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
     GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
-    boolean autoGzipDecompression =
-        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ false);
     return new GrpcBlobReadChannel(
         storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
         request,
-        autoGzipDecompression);
+        !opts.autoGzipDecompression());
   }
 
   @Override
   public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
 
-    UnbufferedReadableByteChannelSession<Object> session =
-        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
 
     try (UnbufferedReadableByteChannel r = session.open();
         WritableByteChannel w = Files.newByteChannel(path, WRITE_OPS)) {
@@ -743,8 +709,7 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
   @Override
   public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
 
-    UnbufferedReadableByteChannelSession<Object> session =
-        unbufferedDefaultAutoGzipDecompressingReadSession(blob, options);
+    UnbufferedReadableByteChannelSession<Object> session = unbufferedReadSession(blob, options);
 
     try (UnbufferedReadableByteChannel r = session.open();
         WritableByteChannel w = Channels.newChannel(outputStream)) {
@@ -1842,20 +1807,18 @@ WriteObjectRequest getWriteObjectRequest(BlobInfo info, Opts<ObjectTargetOpt> op
     return opts.writeObjectRequest().apply(requestBuilder).build();
   }
 
-  private UnbufferedReadableByteChannelSession<Object>
-      unbufferedDefaultAutoGzipDecompressingReadSession(BlobId blob, BlobSourceOption[] options) {
+  private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
+      BlobId blob, BlobSourceOption[] options) {
     Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
     ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
     Set<StatusCode.Code> codes =
        resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
     GrpcCallContext grpcCallContext =
        opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
-    boolean autoGzipDecompression =
-        Utils.isAutoGzipDecompression(opts, /*defaultWhenUndefined=*/ true);
     return ResumableMedia.gapic()
         .read()
         .byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
-        .setAutoGzipDecompression(autoGzipDecompression)
+        .setAutoGzipDecompression(!opts.autoGzipDecompression())
         .unbuffered()
         .setReadObjectRequest(readObjectRequest)
         .build();
----------------------------------------------------------------------
@@ -17,7 +17,6 @@
 package com.google.cloud.storage;
 
 import com.google.api.core.ApiFuture;
-import com.google.api.gax.rpc.ApiExceptions;
 import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
 import java.io.ByteArrayInputStream;
 import java.io.FilterInputStream;
@@ -28,6 +27,7 @@
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.ExecutionException;
 import java.util.zip.GZIPInputStream;
 
 final class GzipReadableByteChannel implements UnbufferedReadableByteChannel {
@@ -60,7 +60,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
     source.read(wrap);
     try {
       // Step 2: wait for the object metadata, this is populated in the first message from GCS
-      String contentEncoding = ApiExceptions.callAndTranslateApiException(this.contentEncoding);
+      String contentEncoding = this.contentEncoding.get();
       // if the Content-Encoding is gzip, Step 3: wire gzip decompression into the byte path
       // this will have a copy impact as we are no longer controlling all the buffers
       if ("gzip".equals(contentEncoding) || "x-gzip".equals(contentEncoding)) {
@@ -86,9 +86,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
         bytesRead += Buffers.copy(wrap, dsts, offset, length);
         delegate = source;
       }
-    } catch (StorageException se) {
-      throw se;
-    } catch (Exception e) {
+    } catch (InterruptedException | ExecutionException e) {
       throw new IOException(e);
     }
   }
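Two details worth noting above: the metadata future is now awaited with plain Future.get(), which is why the catch handles InterruptedException and ExecutionException, and the decompression step itself is ordinary GZIPInputStream layering keyed off Content-Encoding. A self-contained sketch of that layering (contentEncoding would come from the object metadata future in the real class):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipPathSketch {
  public static void main(String[] args) throws IOException {
    // Prepare some gzip-compressed content to stand in for an object body.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
      gz.write("hello gzip".getBytes(StandardCharsets.UTF_8));
    }

    String contentEncoding = "gzip"; // would come from the object metadata future
    InputStream in = new ByteArrayInputStream(compressed.toByteArray());
    if ("gzip".equals(contentEncoding) || "x-gzip".equals(contentEncoding)) {
      in = new GZIPInputStream(in); // wire decompression into the byte path
    }

    byte[] buf = new byte[64];
    int n = in.read(buf);
    System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8)); // hello gzip
  }
}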
----------------------------------------------------------------------
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.Executors.callable;
 
 import com.google.api.core.ApiFuture;
 import com.google.api.gax.paging.Page;
@@ -36,14 +37,12 @@
 import com.google.cloud.Policy;
 import com.google.cloud.WriteChannel;
 import com.google.cloud.storage.Acl.Entity;
-import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
 import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
 import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
 import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
 import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
 import com.google.cloud.storage.PostPolicyV4.PostFieldsV4;
 import com.google.cloud.storage.PostPolicyV4.PostPolicyV4Document;
-import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
 import com.google.cloud.storage.UnifiedOpts.ObjectSourceOpt;
 import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
 import com.google.cloud.storage.UnifiedOpts.Opts;
@@ -60,10 +59,9 @@
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
-import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
 import com.google.common.primitives.Ints;
 import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -75,7 +73,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.text.SimpleDateFormat;
@@ -617,25 +614,9 @@ public byte[] readAllBytes(BlobId blob, BlobSourceOption... options) {
     Opts<ObjectSourceOpt> unwrap = Opts.unwrap(options);
     Opts<ObjectSourceOpt> resolve = unwrap.resolveFrom(blob);
     ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
-    boolean autoGzipDecompression =
-        Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
-    UnbufferedReadableByteChannelSession<StorageObject> session =
-        ResumableMedia.http()
-            .read()
-            .byteChannel(BlobReadChannelContext.from(this))
-            .setAutoGzipDecompression(autoGzipDecompression)
-            .unbuffered()
-            .setApiaryReadRequest(
-                new ApiaryReadRequest(storageObject, optionsMap, ByteRangeSpec.nullRange()))
-            .build();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try (UnbufferedReadableByteChannel r = session.open();
-        WritableByteChannel w = Channels.newChannel(baos)) {
-      ByteStreams.copy(r, w);
-    } catch (IOException e) {
-      throw StorageException.translate(e);
-    }
-    return baos.toByteArray();
+    ResultRetryAlgorithm<?> algorithm =
+        retryAlgorithmManager.getForObjectsGet(storageObject, optionsMap);
+    return run(algorithm, () -> storageRpc.load(storageObject, optionsMap), Function.identity());
   }
 
   @Override
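On the HTTP transport, readAllBytes again delegates to storageRpc.load under the retry algorithm for objects.get rather than streaming through a channel session. A generic sketch of the run(algorithm, callable, decoder) shape (single attempt only; the real helper consults the ResultRetryAlgorithm between attempts and translates failures into StorageException):

import java.util.concurrent.Callable;
import java.util.function.Function;

public class RunShapeSketch {
  // Invoke the RPC, then decode its result. Retry handling is elided: the real
  // helper consults a ResultRetryAlgorithm between attempts.
  static <T, U> U run(Callable<T> callable, Function<T, U> decoder) {
    try {
      return decoder.apply(callable.call());
    } catch (Exception e) {
      throw new RuntimeException(e); // real code translates to StorageException
    }
  }

  public static void main(String[] args) {
    byte[] bytes = run(() -> new byte[] {1, 2, 3}, Function.identity());
    System.out.println(bytes.length); // 3
  }
}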
@@ -667,26 +648,19 @@ public void downloadTo(BlobId blob, Path path, BlobSourceOption... options) {
 
   @Override
   public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options) {
+    final CountingOutputStream countingOutputStream = new CountingOutputStream(outputStream);
     final StorageObject pb = codecs.blobId().encode(blob);
-    Opts<ObjectSourceOpt> resolve = Opts.unwrap(options).resolveFrom(blob);
-    ImmutableMap<StorageRpc.Option, ?> optionsMap = resolve.getRpcOptions();
-    boolean autoGzipDecompression =
-        Utils.isAutoGzipDecompression(resolve, /*defaultWhenUndefined=*/ true);
-    UnbufferedReadableByteChannelSession<StorageObject> session =
-        ResumableMedia.http()
-            .read()
-            .byteChannel(BlobReadChannelContext.from(this))
-            .setAutoGzipDecompression(autoGzipDecompression)
-            .unbuffered()
-            .setApiaryReadRequest(new ApiaryReadRequest(pb, optionsMap, ByteRangeSpec.nullRange()))
-            .build();
-    // don't close the provided stream
-    WritableByteChannel w = Channels.newChannel(outputStream);
-    try (UnbufferedReadableByteChannel r = session.open()) {
-      ByteStreams.copy(r, w);
-    } catch (IOException e) {
-      throw StorageException.translate(e);
-    }
+    ImmutableMap<StorageRpc.Option, ?> optionsMap =
+        Opts.unwrap(options).resolveFrom(blob).getRpcOptions();
+    ResultRetryAlgorithm<?> algorithm = retryAlgorithmManager.getForObjectsGet(pb, optionsMap);
+    run(
+        algorithm,
+        callable(
+            () -> {
+              storageRpc.read(
+                  pb, optionsMap, countingOutputStream.getCount(), countingOutputStream);
+            }),
+        Function.identity());
   }
 
   @Override
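The CountingOutputStream is what makes the retried download resumable: every attempt passes countingOutputStream.getCount(), the number of bytes already delivered, as the read offset, so a retry continues where the last attempt stopped instead of rewriting bytes into the caller's stream. A standalone sketch (simulateRead stands in for storageRpc.read):

import com.google.common.io.CountingOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ResumeOffsetSketch {
  // Stand-in for storageRpc.read: writes everything from the given offset onward.
  static void simulateRead(long offset, CountingOutputStream out) throws IOException {
    byte[] all = "0123456789".getBytes(StandardCharsets.UTF_8);
    out.write(all, (int) offset, all.length - (int) offset);
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CountingOutputStream counting = new CountingOutputStream(sink);
    simulateRead(counting.getCount(), counting); // first attempt starts at offset 0
    // A retry would pass counting.getCount() again, resuming after the bytes
    // already delivered rather than restarting from zero.
    System.out.println(sink.toString(StandardCharsets.UTF_8.name()));
  }
}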
----------------------------------------------------------------------
@@ -2339,6 +2339,18 @@ Mapper<BlobInfo.Builder> blobInfoMapper() {
     return fuseMappers(ObjectTargetOpt.class, ObjectTargetOpt::blobInfo);
   }
 
+  /**
+   * Here for compatibility. This should NOT be an "Opt" instead an attribute of the channel
+   * builder. When {@link ReturnRawInputStream} is removed, this method should be removed as well.
+   *
+   * @see
+   *     GapicDownloadSessionBuilder.ReadableByteChannelSessionBuilder#setAutoGzipDecompression(boolean)
+   */
+  @Deprecated
+  boolean autoGzipDecompression() {
+    return filterTo(ReturnRawInputStream.class).findFirst().map(r -> r.val).orElse(true);
+  }
+
   Decoder<BlobInfo, BlobInfo> clearBlobFields() {
     return filterTo(Fields.class)
         .findFirst()
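The orElse(true) is the compatibility default: when no ReturnRawInputStream opt was supplied, autoGzipDecompression() reports true, and callers negate it (!opts.autoGzipDecompression()) when configuring the channel. A minimal illustration of the Optional idiom:

import java.util.Optional;
import java.util.stream.Stream;

public class OptDefaultSketch {
  public static void main(String[] args) {
    // No ReturnRawInputStream opt supplied: findFirst() is empty, the default applies.
    Optional<Boolean> absent = Stream.<Boolean>empty().findFirst();
    System.out.println(absent.orElse(true)); // true

    // Opt supplied with an explicit value: the value wins over the default.
    Optional<Boolean> explicit = Stream.of(Boolean.FALSE).findFirst();
    System.out.println(explicit.orElse(true)); // false
  }
}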