diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java index d5f9cd3538..a5dd1375e0 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java @@ -47,6 +47,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.ClosedChannelException; import java.nio.channels.ReadableByteChannel; import java.nio.channels.ScatteringByteChannel; import java.util.List; @@ -68,6 +69,7 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann private long position; private ScatteringByteChannel sbc; private boolean open; + private boolean returnEOF; // returned X-Goog-Generation header value private Long xGoogGeneration; @@ -84,16 +86,25 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann this.options = options; this.resultRetryAlgorithm = resultRetryAlgorithm; this.open = true; + this.returnEOF = false; this.position = apiaryReadRequest.getByteRangeSpec().beginOffset(); } @Override public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + if (returnEOF) { + close(); + return -1; + } else if (!open) { + throw new ClosedChannelException(); + } + long totalRead = 0; do { if (sbc == null) { sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity()); } + long totalRemaining = Buffers.totalRemaining(dsts, offset, length); try { // According to the contract of Retrying#run it's possible for sbc to be null even after // invocation. However, the function we provide is guaranteed to return non-null or throw @@ -101,11 +112,11 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { //noinspection ConstantConditions long read = sbc.read(dsts, offset, length); if (read == -1) { - open = false; + returnEOF = true; } else { - position += read; + totalRead += read; } - return read; + return totalRead; } catch (Exception t) { if (resultRetryAlgorithm.shouldRetry(t, null)) { // if our retry algorithm COULD allow a retry, continue the loop and allow trying to @@ -121,6 +132,13 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } else { throw new IOException(StorageException.coalesce(t)); } + } finally { + long totalRemainingAfter = Buffers.totalRemaining(dsts, offset, length); + long delta = totalRemaining - totalRemainingAfter; + if (delta > 0) { + position += delta; + totalRead += delta; + } } } while (true); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java index e72613feac..1ea80287dd 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java @@ -20,6 +20,7 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.Arrays; import java.util.function.Consumer; /** @@ -159,4 +160,9 @@ static int fillFrom(ByteBuffer buf, ReadableByteChannel c) throws IOException { } return total; } + + static long totalRemaining(ByteBuffer[] buffers, int offset, int length) { + ByteBuffer[] sub = Arrays.copyOfRange(buffers, offset, length); + return Arrays.stream(sub).mapToLong(Buffer::remaining).sum(); + } } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java index dbd0e9256c..9dc246373c 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java @@ -16,6 +16,7 @@ package com.google.cloud.storage.conformance.retry; +import static com.google.cloud.storage.TestUtils.xxd; import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.defaultSetup; import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.notificationSetup; import static com.google.cloud.storage.conformance.retry.CtxFunctions.ResourceSetup.pubsubTopicSetup; @@ -1277,9 +1278,12 @@ private static void get(ArrayList a) { try { ReadChannel reader = ctx.getStorage().reader(ctx.getState().getBlob().getBlobId()); - WritableByteChannel write = - Channels.newChannel(ByteStreams.nullOutputStream()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); + + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); @@ -1299,9 +1303,12 @@ private static void get(ArrayList a) { .reader( ctx.getState().getBlob().getBlobId().getBucket(), ctx.getState().getBlob().getBlobId().getName()); - WritableByteChannel write = - Channels.newChannel(ByteStreams.nullOutputStream()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel write = Channels.newChannel(baos); ByteStreams.copy(reader, write); + + assertThat(xxd(baos.toByteArray())) + .isEqualTo(xxd(c.getHelloWorldUtf8Bytes())); } catch (IOException e) { if (e.getCause() instanceof BaseServiceException) { throw e.getCause(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelV2RetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelV2RetryTest.java index 4186b73aca..07ff4b0fd8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelV2RetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobReadChannelV2RetryTest.java @@ -160,8 +160,6 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti .setContentEncoding("gzip") .build(); Blob gen1 = storage.create(info, gzipped.getBytes(), BlobTargetOption.doesNotExist()); - String uri = gen1.getBlobId().toGsUtilUri(); - System.out.println("uri = " + uri); JsonObject instructions = new JsonObject(); JsonArray value = new JsonArray(); @@ -200,4 +198,57 @@ public void restartingAStreamForGzipContentIsAtTheCorrectOffset() throws Excepti .isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024)))); } } + + @Test + public void resumeFromCorrectOffsetWhenPartialReadSuccess() throws Exception { + StorageOptions baseOptions = storage.getOptions(); + Random rand = new Random(918273645); + + ChecksummedTestContent uncompressed; + { + // must use random strategy, base64 characters compress too well. 512KiB uncompressed becomes + // ~1600 bytes which is smaller than our 'return-broken-stream-after-256K' rule + byte[] bytes = DataGenerator.rand(rand).genBytes(_512KiB); + // byte[] bytes = DataGenerator.base64Characters().genBytes(_512KiB); + uncompressed = ChecksummedTestContent.of(bytes); + } + BlobId id = BlobId.of(bucket.getName(), generator.randomObjectName()); + BlobInfo info = BlobInfo.newBuilder(id).build(); + Blob gen1 = storage.create(info, uncompressed.getBytes(), BlobTargetOption.doesNotExist()); + + JsonObject instructions = new JsonObject(); + JsonArray value = new JsonArray(); + value.add("return-broken-stream-after-256K"); + instructions.add("storage.objects.get", value); + RetryTestResource retryTestResource = new RetryTestResource(instructions); + RetryTestResource retryTest = testBench.createRetryTest(retryTestResource); + + ImmutableMap headers = ImmutableMap.of("x-retry-test-id", retryTest.id); + + RequestAuditing requestAuditing = new RequestAuditing(); + StorageOptions testStorageOptions = + baseOptions + .toBuilder() + .setTransportOptions(requestAuditing) + .setHeaderProvider(FixedHeaderProvider.create(headers)) + .build(); + + String expected = xxd(uncompressed.getBytes()); + + try (Storage testStorage = testStorageOptions.getService(); + ReadChannel r = testStorage.reader(gen1.getBlobId()); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + WritableByteChannel w = Channels.newChannel(baos)) { + long copy = ByteStreams.copy(r, w); + String actual = xxd(baos.toByteArray()); + ImmutableList requests = requestAuditing.getRequests(); + assertAll( + () -> assertThat(copy).isEqualTo(uncompressed.getBytes().length), + () -> assertThat(actual).isEqualTo(expected), + () -> assertThat(requests.get(0).getHeaders().get("range")).isNull(), + () -> + assertThat(requests.get(1).getHeaders().get("range")) + .isEqualTo(ImmutableList.of(String.format("bytes=%d-", 256 * 1024)))); + } + } }