Skip to content

Commit

Permalink
fix: retry using remote offset
Browse files Browse the repository at this point in the history
  • Loading branch information
frankyn committed Nov 9, 2020
1 parent ea25492 commit 0fdb9d7
Show file tree
Hide file tree
Showing 6 changed files with 232 additions and 5 deletions.
5 changes: 5 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@
<method>com.google.cloud.storage.BucketInfo$Builder setUpdateTime(java.lang.Long)</method>
<differenceType>7013</differenceType>
</difference>
<difference>
<className>com/google/cloud/storage/spi/v1/StorageRpc</className>
<method>long getCurrentUploadOffset(java.lang.String)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
// Contains metadata of the updated object or null if upload is not completed.
private StorageObject storageObject;

// Detect if flushBuffer() is being retried or not.
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occuring.
private boolean retrying = false;

boolean isRetrying() {
return retrying;
}

StorageObject getStorageObject() {
return storageObject;
}
Expand All @@ -63,11 +72,105 @@ protected void flushBuffer(final int length, final boolean last) {
new Runnable() {
@Override
public void run() {
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
if (!isRetrying()) {
// Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
retrying = true;
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else {
// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
// we are retrying from first chunk start from 0 offset.
//
// Case 2: localNextByteOffset == remoteNextByteOffset:
// Special case of Case 1 when a chunk is retried.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 3.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 5: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 6: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
long localNextByteOffset = getPosition();
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
int retryChunkLength = length - driftOffset;

if (localNextByteOffset == 0 && remoteNextByteOffset == 0
|| localNextByteOffset == remoteNextByteOffset) {
// Case 1 and 2
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset < getChunkSize()) {
// Case 3
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(),
getBuffer(),
driftOffset,
remoteNextByteOffset,
retryChunkLength,
last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset == getChunkSize()) {
// Case 4
// Continue to next chunk
retrying = false;
return;
} else {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
"Remote offset has progressed beyond starting byte offset of next chunk");
sb.append(
"and may a symptom of multiple clients uploading to the same upload session.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
}
}
// Request was successful and retrying state is now disabled.
retrying = false;
}
}),
getOptions().getRetrySettings(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,51 @@ public void write(
writeWithResponse(uploadId, toWrite, toWriteOffset, destOffset, length, last);
}

@Override
public long getCurrentUploadOffset(String uploadId) {
try {
GenericUrl url = new GenericUrl(uploadId);
HttpRequest httpRequest = storage.getRequestFactory().buildPutRequest(url, null);

httpRequest.getHeaders().setContentRange("bytes */*");
// Turn off automatic redirects.
// HTTP 308 are returned if upload is incomplete.
// See: https://cloud.google.com/storage/docs/performing-resumable-uploads
httpRequest.setFollowRedirects(false);

HttpResponse response = null;
try {
response = httpRequest.execute();
int code = response.getStatusCode();
String message = response.getStatusMessage();
if (code == 201 || code == 200) {
throw new StorageException(0, "Resumable upload is already complete.");
}
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
sb.append("Response:\n").append(response.toString()).append("\n\n");
throw new StorageException(0, sb.toString());
} catch (HttpResponseException ex) {
int code = ex.getStatusCode();
if (code == 308 && ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
} else {
// API returns last byte received offset
String range = ex.getHeaders().getRange();
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
}
} finally {
if (response != null) {
response.disconnect();
}
}
} catch (IOException ex) {
throw translate(ex);
}
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,15 @@ void write(
int length,
boolean last);

/**
* Requests current byte offset from Cloud Storage API. Used to recover from a failure in some
* bytes were committed successfully to the open resumable session.
*
* @param uploadId resumable upload ID URL
* @throws StorageException upon failure
*/
long getCurrentUploadOffset(String uploadId);

/**
* Writes the provided bytes to a storage object at the provided location. If {@code last=true}
* returns metadata of the updated object, otherwise returns null.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public void write(
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public long getCurrentUploadOffset(String uploadId) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public StorageObject writeWithResponse(
String uploadId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,66 @@ public void testWriteWithoutFlush() throws IOException {
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithFlushRetryChunk() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false))).andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L);
expect(storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false))).andReturn(null);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithFlushRetryChunkWithDrift() throws IOException {
StorageException exception = new StorageException(new SocketException("Socket closed"));
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
Capture<byte[]> capturedBuffer = Capture.newInstance();
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
expect(storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
eq(0L),
eq(MIN_CHUNK_SIZE),
eq(false))).andThrow(exception);
expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L);
expect(storageRpcMock.writeWithResponse(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(10),
eq(10L),
eq(MIN_CHUNK_SIZE - 10),
eq(false))).andReturn(null);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS);
writer.setChunkSize(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffer));
assertTrue(writer.isOpen());
assertNull(writer.getStorageObject());
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithFlush() throws IOException {
expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);
Expand Down

0 comments on commit 0fdb9d7

Please sign in to comment.