fix: last chunk is retriable (#677)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Ensure the tests and linter pass
- [x] Appropriate docs were updated (if necessary)

Fixes #666 ☕️
frankyn authored Jan 12, 2021
1 parent ebb5fb2 commit 44f49e0
Showing 3 changed files with 76 additions and 2 deletions.
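
For context, the code paths touched here all sit behind the resumable-upload write channel. The sketch below is only an illustration of how such an upload is typically driven from the public API (the bucket and object names are placeholders); before this fix, a transient failure while flushing the final chunk could surface as "Resumable upload is already complete." even though the object had been written (#666).

import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class ResumableUploadSketch {
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of("my-bucket", "my-object")).build();
    // writer() opens a resumable session; full chunks are flushed as the internal
    // buffer fills, and the final ("last") chunk is flushed by close().
    try (WritableByteChannel writer = storage.writer(blobInfo)) {
      writer.write(ByteBuffer.wrap(new byte[1024 * 1024]));
    } // A transient failure while sending that last chunk is the scenario fixed here.
  }
}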
@@ -120,6 +120,14 @@ public void run() {
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 7: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 8: remoteNextByteOffset==-1 && last == false
// Upload was completed by another client because this retry did not occur
// during the last chunk.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
@@ -154,7 +162,8 @@ && driftOffset < getChunkSize()) {
// Continue to next chunk
retrying = false;
return;
- } else {
+ } else if (localNextByteOffset < remoteNextByteOffset
+     && driftOffset > getChunkSize()) {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
@@ -167,6 +176,13 @@ && driftOffset < getChunkSize()) {
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
} else if (remoteNextByteOffset == -1 && last) {
// Case 7
retrying = false;
return;
} else if (remoteNextByteOffset == -1 && !last) {
// Case 8
throw new StorageException(0, "Resumable upload is already complete.");
}
}
// Request was successful and retrying state is now disabled.
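Taken together with Cases 1 through 6 already documented in this method, the new branches boil down to the decision sketched below. This is a standalone, illustrative sketch, not the library's code: only the handling of the -1 sentinel (Cases 7 and 8) mirrors this diff, the remaining branches are paraphrased for context, and the class and method names are hypothetical.

final class RetryReconciliationSketch {

  // Minimal stand-in so the sketch compiles on its own; the real class lives in
  // com.google.cloud.storage.
  static final class StorageException extends RuntimeException {
    StorageException(int code, String message) {
      super(message);
    }
  }

  /**
   * Decides whether a retried chunk can be treated as committed. Only the -1
   * sentinel handling (Cases 7 and 8) mirrors this commit; the other branches
   * paraphrase the pre-existing cases for context.
   */
  static boolean chunkCommitted(
      long localNextByteOffset, long remoteNextByteOffset, int chunkSize, boolean last) {
    if (remoteNextByteOffset == -1 && last) {
      // Case 7: the final chunk was retried, but the service already has all the data.
      return true;
    }
    if (remoteNextByteOffset == -1 && !last) {
      // Case 8: the session was finalized by another client while this upload
      // was not yet on its last chunk.
      throw new StorageException(0, "Resumable upload is already complete.");
    }
    long driftOffset = remoteNextByteOffset - localNextByteOffset;
    if (localNextByteOffset <= remoteNextByteOffset && driftOffset < chunkSize) {
      // Cases 3/4 (paraphrased): the retried bytes landed; continue with the next chunk.
      return true;
    }
    // Case 5 (paraphrased): unexplained drift; fail loudly rather than corrupt the object.
    throw new StorageException(0, "Unexpected drift of " + driftOffset + " bytes.");
  }

  private RetryReconciliationSketch() {}
}

The key point is that a -1 offset on its own is ambiguous: only the last flag separates "our final chunk was received, finish normally" (Case 7) from "another client finalized the session, surface an error" (Case 8).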
@@ -766,7 +766,7 @@ public long getCurrentUploadOffset(String uploadId) {
response = httpRequest.execute();
int code = response.getStatusCode();
if (code == 201 || code == 200) {
- throw new StorageException(0, "Resumable upload is already complete.");
+ return -1;
}
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
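The one-line change above turns -1 into a sentinel meaning "the resumable session is already finalized", instead of throwing from inside the offset check; that is what lets BlobWriteChannel distinguish Case 7 from Case 8. Below is a hedged sketch of the status handling around that return: only the 200/201 → -1 mapping comes from this hunk, while treating 308 plus a Range header as the in-progress case is an assumption about the surrounding method, not shown in this diff.

final class OffsetCheckSketch {

  /** Maps the status of a "query upload status" request to a next-byte offset. */
  static long uploadOffsetFromStatus(int statusCode, String rangeHeader) {
    if (statusCode == 201 || statusCode == 200) {
      // Session already finalized: return a sentinel instead of throwing, so the
      // caller can decide whether that is expected (Case 7) or an error (Case 8).
      return -1;
    }
    if (statusCode == 308) {
      if (rangeHeader == null) {
        return 0; // 308 with no Range header: nothing has been committed yet.
      }
      // A Range header of the form "bytes=0-N" means the next byte to send is N + 1.
      return Long.parseLong(rangeHeader.substring(rangeHeader.lastIndexOf('-') + 1)) + 1;
    }
    throw new IllegalStateException("Unexpected status code " + statusCode);
  }
}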
@@ -202,6 +202,64 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(true)))
.andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
writer.close();
assertFalse(writer.isRetrying());
assertFalse(writer.isOpen());
// Capture captures the entire chunk buffer even when it is not completely used,
// so the assertion is limited to the first MIN_CHUNK_SIZE bytes.
assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array());
}

@Test
public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
StorageException completedException =
new StorageException(0, "Resumable upload is already complete.");
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(
storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false)))
.andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
try {
writer.write(buffer);
fail("Expected completed exception.");
} catch (StorageException ex) {
assertEquals(ex, completedException);
}
assertTrue(writer.isRetrying());
assertTrue(writer.isOpen());
}

@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
