From 84fd41c09d51c43270e3a421e695cb00efccb4a4 Mon Sep 17 00:00:00 2001 From: Benjamin Peterson Date: Wed, 15 Jun 2022 03:25:41 -0700 Subject: [PATCH 1/5] Chunker: Always seek on the uncompressed stream. The `WriteRequest.write_offset` field has bizarre semantics during compressed uploads as documented in the remote API protos: https://github.com/bazelbuild/remote-apis/blob/3b4b6402103539d66fcdd1a4d945f660742665ca/build/bazel/remote/execution/v2/remote_execution.proto#L241-L248 In particular, the write offset of the first `WriteRequest` refers to the offset in the uncompressed source. This change ensures we always seek the uncompressed stream to the correct offset when starting an upload call. The old code could fail to resume compressed uploads under some conditions. The `progressiveCompressedUploadShouldWork` test purported to exercise this situation. The test, however, contained the same logic error as the code under test. Closes #15669. PiperOrigin-RevId: 455083727 Change-Id: Ie22dacf31f15644c7a83f49776e7a633d8bb4bca --- .../devtools/build/lib/remote/Chunker.java | 30 ++++++++--- .../lib/remote/ByteStreamUploaderTest.java | 53 +++++++++---------- 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index d1024a3d3143cf..cdbac4abf53cfa 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -142,17 +142,23 @@ public void reset() throws IOException { } /** - * Seek to an offset, if necessary resetting or initializing + * Seek to an offset in the source stream. * - *

May close open resources in order to seek to an earlier offset. + *

May close and reopen resources in order to seek to an earlier offset. + * + * @param toOffset the offset from beginning of the source stream. If the source stream is + * compressed, it refers to the offset in the uncompressed form to align with `write_offset` + * in REAPI. */ public void seek(long toOffset) throws IOException { - if (toOffset < offset) { + // For compressed stream, we need to reinitialize the stream since the offset refers to the + // uncompressed form. + if (initialized && toOffset >= offset && !compressed) { + ByteStreams.skipFully(data, toOffset - offset); + } else { reset(); + initialize(toOffset); } - maybeInitialize(); - ByteStreams.skipFully(data, toOffset - offset); - offset = toOffset; if (data.finished()) { close(); } @@ -245,18 +251,26 @@ private void maybeInitialize() throws IOException { if (initialized) { return; } + initialize(0); + } + + private void initialize(long srcPos) throws IOException { + checkState(!initialized); checkState(data == null); checkState(offset == 0); checkState(chunkCache == null); try { + var src = dataSupplier.get(); + ByteStreams.skipFully(src, srcPos); data = compressed - ? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get())) - : new ChunkerInputStream(dataSupplier.get()); + ? new ChunkerInputStream(new ZstdCompressingInputStream(src)) + : new ChunkerInputStream(src); } catch (RuntimeException e) { Throwables.propagateIfPossible(e.getCause(), IOException.class); throw e; } + offset = srcPos; initialized = true; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index c4165c2de1cf97..18c168084c0e35 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -357,23 +358,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception { 300, retrier); - byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + int chunkSize = 1024; + int skipSize = chunkSize + 1; + byte[] blob = new byte[chunkSize * 2 + 1]; new Random().nextBytes(blob); Chunker chunker = - Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); - HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - - while (chunker.hasNext()) { - chunker.next(); - } - long expectedSize = chunker.getOffset(); - chunker.reset(); + Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build(); + Digest digest = DIGEST_UTIL.compute(blob); + ByteArrayOutputStream output = new ByteArrayOutputStream(); serviceRegistry.addService( new ByteStreamImplBase() { - - byte[] receivedData = new byte[(int) expectedSize]; String receivedResourceName = null; boolean receivedComplete = false; long nextOffset = 0; @@ -398,21 +394,21 @@ public void onNext(WriteRequest writeRequest) { assertThat(resourceName).isEmpty(); } - assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset); - - ByteString data = writeRequest.getData(); - - System.arraycopy( - data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); - - nextOffset += data.size(); - receivedComplete = expectedSize == nextOffset; - assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete); - if (initialOffset == 0) { streamObserver.onError(Status.DEADLINE_EXCEEDED.asException()); mustQueryWriteStatus = true; - initialOffset = nextOffset; + initialOffset = skipSize; + nextOffset = initialOffset; + } else { + ByteString data = writeRequest.getData(); + try { + data.writeTo(output); + } catch (IOException e) { + streamObserver.onError(e); + return; + } + nextOffset += data.size(); + receivedComplete = writeRequest.getFinishWrite(); } } @@ -423,10 +419,6 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - assertThat(nextOffset).isEqualTo(expectedSize); - byte[] decompressed = Zstd.decompress(receivedData, blob.length); - assertThat(decompressed).isEqualTo(blob); - WriteResponse response = WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); streamObserver.onNext(response); @@ -444,7 +436,7 @@ public void queryWriteStatus( if (receivedResourceName != null && receivedResourceName.equals(resourceName)) { assertThat(mustQueryWriteStatus).isTrue(); mustQueryWriteStatus = false; - committedSize = nextOffset; + committedSize = receivedComplete ? blob.length : skipSize; complete = receivedComplete; } else { committedSize = 0; @@ -459,7 +451,10 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, hash, chunker, true); + uploader.uploadBlob(context, digest, chunker); + byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); + assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) + .isTrue(); // This test should not have triggered any retries. Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class)); From 6e544b4bced5345e1cedc10ec9133f4d2e17ff8a Mon Sep 17 00:00:00 2001 From: Chenchu Kolli Date: Fri, 1 Jul 2022 10:25:11 -0500 Subject: [PATCH 2/5] Updated chunker.java file. --- src/main/java/com/google/devtools/build/lib/remote/Chunker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index cdbac4abf53cfa..64399b9b91f02f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -260,7 +260,7 @@ private void initialize(long srcPos) throws IOException { checkState(offset == 0); checkState(chunkCache == null); try { - var src = dataSupplier.get(); + InputStream src = dataSupplier.get(); ByteStreams.skipFully(src, srcPos); data = compressed From 926fcc286142cdddd07bbcf51237b7254c7a5947 Mon Sep 17 00:00:00 2001 From: Chenchu K Date: Wed, 6 Jul 2022 15:58:51 -0500 Subject: [PATCH 3/5] Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson --- .../devtools/build/lib/remote/ByteStreamUploaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 18c168084c0e35..ad9f0827e87121 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -451,7 +451,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker); + uploader.uploadBlob(context, digest, chunker, true); byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) .isTrue(); From 950d403455c928c7b5009ef9463d896177b1c1f6 Mon Sep 17 00:00:00 2001 From: Chenchu K Date: Wed, 6 Jul 2022 19:22:12 -0500 Subject: [PATCH 4/5] Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson --- .../devtools/build/lib/remote/ByteStreamUploaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index ad9f0827e87121..38721724c3ee84 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -451,7 +451,7 @@ public void queryWriteStatus( } }); - uploader.uploadBlob(context, digest, chunker, true); + uploader.uploadBlob(context, hash, chunker, true); byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) .isTrue(); From 36a65fe8a66fc20d4f9dcbd5b33f8b82b383b465 Mon Sep 17 00:00:00 2001 From: Chenchu K Date: Wed, 6 Jul 2022 19:24:48 -0500 Subject: [PATCH 5/5] Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson --- .../devtools/build/lib/remote/ByteStreamUploaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 38721724c3ee84..1b88a3267d442e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -365,7 +365,7 @@ public void progressiveCompressedUploadShouldWork() throws Exception { Chunker chunker = Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build(); - Digest digest = DIGEST_UTIL.compute(blob); + HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); ByteArrayOutputStream output = new ByteArrayOutputStream(); serviceRegistry.addService(