Chunker: Always seek on the uncompressed stream.
The `WriteRequest.write_offset` field has bizarre semantics during compressed uploads, as documented in the remote API protos (https://github.com/bazelbuild/remote-apis/blob/3b4b6402103539d66fcdd1a4d945f660742665ca/build/bazel/remote/execution/v2/remote_execution.proto#L241-L248). In particular, the write offset of the first `WriteRequest` refers to the offset in the uncompressed source.
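As a hedged illustration of those semantics (hypothetical names, not code from this change): when resuming a compressed write, the first request's `write_offset` is an offset into the uncompressed source, and every later `write_offset` is that initial offset plus the compressed bytes sent so far.

import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.protobuf.ByteString;

final class CompressedWriteOffsets {
  // First WriteRequest of a resumed compressed upload. committedUncompressed
  // is the offset in the *uncompressed* source, e.g. learned via QueryWriteStatus.
  static WriteRequest firstRequest(
      String resourceName, long committedUncompressed, ByteString firstCompressedChunk) {
    return WriteRequest.newBuilder()
        .setResourceName(resourceName) // .../compressed-blobs/zstd/{hash}/{size}
        .setWriteOffset(committedUncompressed) // uncompressed offset, per the proto docs
        .setData(firstCompressedChunk)
        .build();
  }

  // Subsequent write_offsets advance by compressed bytes already sent.
  static long nextWriteOffset(long committedUncompressed, long compressedBytesSent) {
    return committedUncompressed + compressedBytesSent;
  }
}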

This change ensures we always seek the uncompressed stream to the correct offset when starting an upload call. The old code could fail to resume compressed uploads under some conditions. The `progressiveCompressedUploadShouldWork` test purported to exercise this situation but contained the same logic error as the code under test.
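From a caller's perspective, the fixed behavior looks roughly like this; a minimal sketch reusing the builder calls from the test below, with a hypothetical committedUncompressedOffset obtained via QueryWriteStatus:

// seek() takes an offset in the *uncompressed* source; for a compressed
// chunker it reopens the source and recompresses from that offset.
Chunker chunker =
    Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(1024).build();
chunker.seek(committedUncompressedOffset);
while (chunker.hasNext()) {
  Chunker.Chunk chunk = chunker.next(); // next compressed payload
  // ... send chunk.getData() in a WriteRequest ...
}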

Closes #15669.

PiperOrigin-RevId: 455083727
Change-Id: Ie22dacf31f15644c7a83f49776e7a633d8bb4bca
benjaminp authored and copybara-github committed Jun 15, 2022
1 parent 8b57c58 commit dd57d41
Showing 2 changed files with 44 additions and 35 deletions.
src/main/java/com/google/devtools/build/lib/remote/Chunker.java: 22 additions & 8 deletions

@@ -144,17 +144,23 @@ public void reset() throws IOException {
   }
 
   /**
-   * Seek to an offset, if necessary resetting or initializing
+   * Seek to an offset in the source stream.
    *
-   * <p>May close open resources in order to seek to an earlier offset.
+   * <p>May close and reopen resources in order to seek to an earlier offset.
+   *
+   * @param toOffset the offset from beginning of the source stream. If the source stream is
+   *     compressed, it refers to the offset in the uncompressed form to align with `write_offset`
+   *     in REAPI.
    */
   public void seek(long toOffset) throws IOException {
-    if (toOffset < offset) {
+    // For compressed stream, we need to reinitialize the stream since the offset refers to the
+    // uncompressed form.
+    if (initialized && toOffset >= offset && !compressed) {
+      ByteStreams.skipFully(data, toOffset - offset);
+    } else {
       reset();
+      initialize(toOffset);
     }
-    maybeInitialize();
-    ByteStreams.skipFully(data, toOffset - offset);
     offset = toOffset;
     if (data.finished()) {
       close();
     }
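The case split above is the heart of the fix: a forward seek on an already-open uncompressed stream can simply skip bytes, but a compressed stream can never be skipped to an uncompressed offset, because skipping it discards compressed bytes. A standalone sketch of the safe path (assuming the imports already present in Chunker.java; not part of this commit):

// Reopen the raw source, skip in uncompressed form, then recompress the
// tail. This mirrors what reset() + initialize(toOffset) do above.
static InputStream reopenAtUncompressedOffset(
    Supplier<InputStream> dataSupplier, long toOffset) throws IOException {
  InputStream src = dataSupplier.get(); // fresh, uncompressed source
  ByteStreams.skipFully(src, toOffset); // seek in the uncompressed form
  return new ZstdCompressingInputStream(src); // compressed stream now starts at toOffset
}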
@@ -247,18 +253,26 @@ private void maybeInitialize() throws IOException {
     if (initialized) {
       return;
     }
+    initialize(0);
+  }
+
+  private void initialize(long srcPos) throws IOException {
+    checkState(!initialized);
     checkState(data == null);
     checkState(offset == 0);
     checkState(chunkCache == null);
     try {
+      var src = dataSupplier.get();
+      ByteStreams.skipFully(src, srcPos);
       data =
           compressed
-              ? new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get()))
-              : new ChunkerInputStream(dataSupplier.get());
+              ? new ChunkerInputStream(new ZstdCompressingInputStream(src))
+              : new ChunkerInputStream(src);
     } catch (RuntimeException e) {
       Throwables.propagateIfPossible(e.getCause(), IOException.class);
       throw e;
     }
+    offset = srcPos;
     initialized = true;
   }
src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java: 22 additions & 27 deletions
@@ -69,6 +69,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -373,23 +374,18 @@ public void progressiveCompressedUploadShouldWork() throws Exception {
             retrier,
             /*maximumOpenFiles=*/ -1);
 
-    byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
+    int chunkSize = 1024;
+    int skipSize = chunkSize + 1;
+    byte[] blob = new byte[chunkSize * 2 + 1];
     new Random().nextBytes(blob);
 
     Chunker chunker =
-        Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(CHUNK_SIZE).build();
+        Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
     Digest digest = DIGEST_UTIL.compute(blob);
 
-    while (chunker.hasNext()) {
-      chunker.next();
-    }
-    long expectedSize = chunker.getOffset();
-    chunker.reset();
-
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
     serviceRegistry.addService(
         new ByteStreamImplBase() {
 
-          byte[] receivedData = new byte[(int) expectedSize];
           String receivedResourceName = null;
           boolean receivedComplete = false;
           long nextOffset = 0;
@@ -414,21 +410,21 @@ public void onNext(WriteRequest writeRequest) {
               assertThat(resourceName).isEmpty();
             }
 
-            assertThat(writeRequest.getWriteOffset()).isEqualTo(nextOffset);
-
-            ByteString data = writeRequest.getData();
-
-            System.arraycopy(
-                data.toByteArray(), 0, receivedData, (int) nextOffset, data.size());
-
-            nextOffset += data.size();
-            receivedComplete = expectedSize == nextOffset;
-            assertThat(writeRequest.getFinishWrite()).isEqualTo(receivedComplete);
-
             if (initialOffset == 0) {
               streamObserver.onError(Status.DEADLINE_EXCEEDED.asException());
               mustQueryWriteStatus = true;
-              initialOffset = nextOffset;
+              initialOffset = skipSize;
+              nextOffset = initialOffset;
+            } else {
+              ByteString data = writeRequest.getData();
+              try {
+                data.writeTo(output);
+              } catch (IOException e) {
+                streamObserver.onError(e);
+                return;
+              }
+              nextOffset += data.size();
+              receivedComplete = writeRequest.getFinishWrite();
             }
           }

@@ -439,10 +435,6 @@ public void onError(Throwable throwable) {
 
             @Override
             public void onCompleted() {
-              assertThat(nextOffset).isEqualTo(expectedSize);
-              byte[] decompressed = Zstd.decompress(receivedData, blob.length);
-              assertThat(decompressed).isEqualTo(blob);
-
               WriteResponse response =
                   WriteResponse.newBuilder().setCommittedSize(nextOffset).build();
               streamObserver.onNext(response);
@@ -460,7 +452,7 @@ public void queryWriteStatus(
             if (receivedResourceName != null && receivedResourceName.equals(resourceName)) {
               assertThat(mustQueryWriteStatus).isTrue();
               mustQueryWriteStatus = false;
-              committedSize = nextOffset;
+              committedSize = receivedComplete ? blob.length : skipSize;
               complete = receivedComplete;
             } else {
               committedSize = 0;
@@ -476,6 +468,9 @@
         });
 
     uploader.uploadBlob(context, digest, chunker);
+    byte[] decompressed = Zstd.decompress(output.toByteArray(), blob.length - skipSize);
+    assertThat(Arrays.equals(decompressed, 0, decompressed.length, blob, skipSize, blob.length))
+        .isTrue();
 
     // This test triggers one retry.
    Mockito.verify(mockBackoff, Mockito.times(1)).nextDelayMillis(any(Exception.class));
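Working through the test's numbers: blob is chunkSize * 2 + 1 = 2049 bytes, the fake server fails the first attempt and reports skipSize = 1025 bytes as committed, so the retry must re-seek the uncompressed source to offset 1025 and deliver the remaining 2049 - 1025 = 1024 bytes. The final assertion is equivalent to this more explicit form (same Truth and zstd-jni APIs the test already uses; Arrays.copyOfRange is the only addition):

// Decompress what the server captured after the retry and compare it with
// the tail of the original blob from the committed (uncompressed) offset.
byte[] expectedTail = Arrays.copyOfRange(blob, skipSize, blob.length);
byte[] decompressedTail = Zstd.decompress(output.toByteArray(), expectedTail.length);
assertThat(decompressedTail).isEqualTo(expectedTail);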
