Skip to content

Commit

Permalink
Merge 5446314 into c98413d
Browse files Browse the repository at this point in the history
  • Loading branch information
frankyn authored Feb 26, 2021
2 parents c98413d + 5446314 commit fbea777
Show file tree
Hide file tree
Showing 3 changed files with 396 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
// TODO: I don't think this is thread safe, and there's probably a better way to detect a retry
// occuring.
private boolean retrying = false;
private boolean checkingForLastChunk = false;

boolean isRetrying() {
return retrying;
Expand All @@ -64,129 +65,141 @@ StorageObject getStorageObject() {
return storageObject;
}

private StorageObject transmitChunk(
int chunkOffset, int chunkLength, long position, boolean last) {
return getOptions()
.getStorageRpcV1()
.writeWithResponse(getUploadId(), getBuffer(), chunkOffset, position, chunkLength, last);
}

private long getRemotePosition() {
return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
}

private StorageObject getRemoteStorageObject() {
return getOptions().getStorageRpcV1().get(getEntity().toPb(), null);
}

private StorageException unrecoverableState(
int chunkOffset, int chunkLength, long localPosition, long remotePosition, boolean last) {
StringBuilder sb = new StringBuilder();
sb.append("Unable to recover in upload.\n");
sb.append(
"This may be a symptom of multiple clients uploading to the same upload session.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("chunkOffset: ").append(chunkOffset).append('\n');
sb.append("chunkLength: ").append(chunkLength).append('\n');
sb.append("localOffset: ").append(localPosition).append('\n');
sb.append("remoteOffset: ").append(remotePosition).append('\n');
sb.append("lastChunk: ").append(last).append("\n\n");
return new StorageException(0, sb.toString());
}

// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == remoteNextByteOffset:
// Retrying the entire chunk
//
// Case 2: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 2.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 5: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 6: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk
// Not last chunk and are not checkingForLastChunk, allow for the client to
// catch up to final chunk which meets
// Case 6.
//
// Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk
// Not last chunk and checkingForLastChunk means this is the second time we
// hit this case, meaning the upload was completed by a different client.
//
// Case 9: Only possible if the client local offset continues beyond the remote
// offset which is not possible.
//
@Override
protected void flushBuffer(final int length, final boolean last) {
protected void flushBuffer(final int length, final boolean lastChunk) {
try {
runWithRetries(
callable(
new Runnable() {
@Override
public void run() {
// Get remote offset from API
final long localPosition = getPosition();
// For each request it should be possible to retry from its location in this code
final long remotePosition = isRetrying() ? getRemotePosition() : getPosition();
final int chunkOffset = (int) (remotePosition - localPosition);
final int chunkLength = length - chunkOffset;
final boolean uploadAlreadyComplete = remotePosition == -1;
// Enable isRetrying state to reduce number of calls to getRemotePosition()
if (!isRetrying()) {
// Enable isRetrying state to reduce number of calls to getCurrentUploadOffset()
retrying = true;
}
if (uploadAlreadyComplete && lastChunk) {
// Case 6
// Request object metadata if not available
if (storageObject == null) {
storageObject = getRemoteStorageObject();
}
// Verify that with the final chunk we match the blob length
if (storageObject.getSize().longValue() != getPosition() + length) {
throw unrecoverableState(
chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
}
retrying = false;
} else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk) {
// Case 7
// Make sure this is the second to last chunk.
checkingForLastChunk = true;
// Continue onto next chunk in case this is the last chunk
} else if (localPosition <= remotePosition && chunkOffset < getChunkSize()) {
// Case 1 && Case 2
// We are in a position to send a chunk
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
transmitChunk(chunkOffset, chunkLength, remotePosition, lastChunk);
retrying = false;
} else if (localPosition < remotePosition && chunkOffset == getChunkSize()) {
// Case 3
// Continue to next chunk to catch up with remotePosition we are one chunk
// behind
retrying = false;
} else {
// Retriable interruption occurred.
// Variables:
// chunk = getBuffer()
// localNextByteOffset == getPosition()
// chunkSize = getChunkSize()
//
// Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0:
// we are retrying from first chunk start from 0 offset.
//
// Case 2: localNextByteOffset == remoteNextByteOffset:
// Special case of Case 1 when a chunk is retried.
//
// Case 3: localNextByteOffset < remoteNextByteOffset
// && driftOffset < chunkSize:
// Upload progressed and localNextByteOffset is not in-sync with
// remoteNextByteOffset and driftOffset is less than chunkSize.
// driftOffset must be less than chunkSize for it to retry using
// chunk maintained in memory.
// Find the driftOffset by subtracting localNextByteOffset from
// remoteNextByteOffset.
// Use driftOffset to determine where to restart from using the chunk in
// memory.
//
// Case 4: localNextByteOffset < remoteNextByteOffset
// && driftOffset == chunkSize:
// Special case of Case 3.
// If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on
// to the next chunk.
//
// Case 5: localNextByteOffset < remoteNextByteOffset
// && driftOffset > chunkSize:
// Throw exception as remoteNextByteOffset has drifted beyond the retriable
// chunk maintained in memory. This is not possible unless there's multiple
// clients uploading to the same resumable upload session.
//
// Case 6: localNextByteOffset > remoteNextByteOffset:
// For completeness, this case is not possible because it would require retrying
// a 400 status code which is not allowed.
//
// Case 7: remoteNextByteOffset==-1 && last == true
// Upload is complete and retry occurred in the "last" chunk. Data sent was
// received by the service.
//
// Case 8: remoteNextByteOffset==-1 && last == false
// Upload was completed by another client because this retry did not occur
// during the last chunk.
//
// Get remote offset from API
long remoteNextByteOffset =
getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId());
long localNextByteOffset = getPosition();
int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset);
int retryChunkLength = length - driftOffset;

if (localNextByteOffset == 0 && remoteNextByteOffset == 0
|| localNextByteOffset == remoteNextByteOffset) {
// Case 1 and 2
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(), getBuffer(), 0, getPosition(), length, last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset < getChunkSize()) {
// Case 3
storageObject =
getOptions()
.getStorageRpcV1()
.writeWithResponse(
getUploadId(),
getBuffer(),
driftOffset,
remoteNextByteOffset,
retryChunkLength,
last);
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset == getChunkSize()) {
// Case 4
// Continue to next chunk
retrying = false;
return;
} else if (localNextByteOffset < remoteNextByteOffset
&& driftOffset > getChunkSize()) {
// Case 5
StringBuilder sb = new StringBuilder();
sb.append(
"Remote offset has progressed beyond starting byte offset of next chunk.");
sb.append(
"This may be a symptom of multiple clients uploading to the same upload session.\n\n");
sb.append("For debugging purposes:\n");
sb.append("uploadId: ").append(getUploadId()).append('\n');
sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n');
sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n');
sb.append("driftOffset: ").append(driftOffset).append("\n\n");
throw new StorageException(0, sb.toString());
} else if (remoteNextByteOffset == -1 && last) {
// Case 7
retrying = false;
return;
} else if (remoteNextByteOffset == -1 && !last) {
// Case 8
throw new StorageException(0, "Resumable upload is already complete.");
}
// Case 4 && Case 8 && Case 9
throw unrecoverableState(
chunkOffset, chunkLength, localPosition, remotePosition, lastChunk);
}
// Request was successful and retrying state is now disabled.
retrying = false;
}
}),
getOptions().getRetrySettings(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.InputStreamContent;
import com.google.api.client.http.json.JsonHttpContent;
Expand Down Expand Up @@ -765,7 +766,8 @@ public long getCurrentUploadOffset(String uploadId) {
try {
response = httpRequest.execute();
int code = response.getStatusCode();
if (code == 201 || code == 200) {
if (HttpStatusCodes.isSuccess(code)) {
// Upload completed successfully
return -1;
}
StringBuilder sb = new StringBuilder();
Expand All @@ -774,20 +776,18 @@ public long getCurrentUploadOffset(String uploadId) {
throw new StorageException(0, sb.toString());
} catch (HttpResponseException ex) {
int code = ex.getStatusCode();
if (code == 308 && ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
} else if (code == 308 && ex.getHeaders().getRange() != null) {
if (code == 308) {
if (ex.getHeaders().getRange() == null) {
// No progress has been made.
return 0;
}
// API returns last byte received offset
String range = ex.getHeaders().getRange();
// Return next byte offset by adding 1 to last byte received offset
return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1;
} else {
// Not certain what went wrong
StringBuilder sb = new StringBuilder();
sb.append("Not sure what occurred. Here's debugging information:\n");
sb.append("Response:\n").append(ex.toString()).append("\n\n");
throw new StorageException(0, sb.toString());
// Something else occurred like a 5xx so translate and throw.
throw translate(ex);
}
} finally {
if (response != null) {
Expand Down
Loading

0 comments on commit fbea777

Please sign in to comment.