From 64571a428ffe2bf09f1a5eea13e770a7d0381620 Mon Sep 17 00:00:00 2001 From: Chenchu K Date: Thu, 7 Jul 2022 08:15:52 -0500 Subject: [PATCH] Ck/cherrypick 15669 (#15788) * Chunker: Always seek on the uncompressed stream. The `WriteRequest.write_offset` field has bizarre semantics during compressed uploads as documented in the remote API protos: https://github.com/bazelbuild/remote-apis/blob/3b4b6402103539d66fcdd1a4d945f660742665ca/build/bazel/remote/execution/v2/remote_execution.proto#L241-L248 In particular, the write offset of the first `WriteRequest` refers to the offset in the uncompressed source. This change ensures we always seek the uncompressed stream to the correct offset when starting an upload call. The old code could fail to resume compressed uploads under some conditions. The `progressiveCompressedUploadShouldWork` test purported to exercise this situation. The test, however, contained the same logic error as the code under test. Closes #15669. PiperOrigin-RevId: 455083727 Change-Id: Ie22dacf31f15644c7a83f49776e7a633d8bb4bca * Updated chunker.java file. 
* Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson * Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson * Update src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java Co-authored-by: Benjamin Peterson Co-authored-by: Benjamin Peterson Co-authored-by: Benjamin Peterson --- .../devtools/build/lib/remote/Chunker.java | 30 +++++++++--- .../lib/remote/ByteStreamUploaderTest.java | 49 +++++++++---------- 2 files changed, 44 insertions(+), 35 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index d1024a3d3143cf..64399b9b91f02f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -142,17 +142,23 @@ public void reset() throws IOException { } /** - * Seek to an offset, if necessary resetting or initializing + * Seek to an offset in the source stream. * - *

<p>May close open resources in order to seek to an earlier offset. + *

<p>May close and reopen resources in order to seek to an earlier offset. + * + * @param toOffset the offset from beginning of the source stream. If the source stream is + * compressed, it refers to the offset in the uncompressed form to align with `write_offset` + * in REAPI. */ public void seek(long toOffset) throws IOException { - if (toOffset < offset) { + // For compressed stream, we need to reinitialize the stream since the offset refers to the + // uncompressed form. + if (initialized && toOffset >= offset && !compressed) { + ByteStreams.skipFully(data, toOffset - offset); + } else { reset(); + initialize(toOffset); } - maybeInitialize(); - ByteStreams.skipFully(data, toOffset - offset); - offset = toOffset; if (data.finished()) { close(); } @@ -245,18 +251,26 @@ private void maybeInitialize() throws IOException { if (initialized) { return; } + initialize(0); + } + + private void initialize(long srcPos) throws IOException { + checkState(!initialized); checkState(data == null); checkState(offset == 0); checkState(chunkCache == null); try { + InputStream src = dataSupplier.get(); + ByteStreams.skipFully(src, srcPos); data = compressed - ? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get())) - : new ChunkerInputStream(dataSupplier.get()); + ? 
new ChunkerInputStream(new ZstdCompressingInputStream(src)) + : new ChunkerInputStream(src); } catch (RuntimeException e) { Throwables.propagateIfPossible(e.getCause(), IOException.class); throw e; } + offset = srcPos; initialized = true; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index c4165c2de1cf97..1b88a3267d442e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -69,6 +69,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -357,23 +358,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception { 300, retrier); - byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; + int chunkSize = 1024; + int skipSize = chunkSize + 1; + byte[] blob = new byte[chunkSize * 2 + 1]; new Random().nextBytes(blob); Chunker chunker = - Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build(); + Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build(); HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash()); - while (chunker.hasNext()) { - chunker.next(); - } - long expectedSize = chunker.getOffset(); - chunker.reset(); - + ByteArrayOutputStream output = new ByteArrayOutputStream(); serviceRegistry.addService( new ByteStreamImplBase() { - - byte[] receivedData = new byte[(int) expectedSize]; String receivedResourceName = null; boolean receivedComplete = false; long nextOffset = 0; @@ -398,21 +394,21 @@ public void onNext(WriteRequest writeRequest) { assertThat(resourceName).isEmpty(); } - assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset); - - ByteString data = writeRequest.getData(); - - 
System.arraycopy( - data.toByteArray(), 0, receivedData, (int) nextOffset, data.size()); - - nextOffset += data.size(); - receivedComplete = expectedSize == nextOffset; - assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete); - if (initialOffset == 0) { streamObserver.onError(Status.DEADLINE_EXCEEDED.asException()); mustQueryWriteStatus = true; - initialOffset = nextOffset; + initialOffset = skipSize; + nextOffset = initialOffset; + } else { + ByteString data = writeRequest.getData(); + try { + data.writeTo(output); + } catch (IOException e) { + streamObserver.onError(e); + return; + } + nextOffset += data.size(); + receivedComplete = writeRequest.getFinishWrite(); } } @@ -423,10 +419,6 @@ public void onError(Throwable throwable) { @Override public void onCompleted() { - assertThat(nextOffset).isEqualTo(expectedSize); - byte[] decompressed = Zstd.decompress(receivedData, blob.length); - assertThat(decompressed).isEqualTo(blob); - WriteResponse response = WriteResponse.newBuilder().setCommittedSize(nextOffset).build(); streamObserver.onNext(response); @@ -444,7 +436,7 @@ public void queryWriteStatus( if (receivedResourceName != null && receivedResourceName.equals(resourceName)) { assertThat(mustQueryWriteStatus).isTrue(); mustQueryWriteStatus = false; - committedSize = nextOffset; + committedSize = receivedComplete ? blob.length : skipSize; complete = receivedComplete; } else { committedSize = 0; @@ -460,6 +452,9 @@ public void queryWriteStatus( }); uploader.uploadBlob(context, hash, chunker, true); + byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize); + assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length)) + .isTrue(); // This test should not have triggered any retries. Mockito.verify(mockBackoff, Mockito.never()).nextDelayMillis(any(Exception.class));