From b41b88109e13b5ebbd0393d1f264225c12876be6 Mon Sep 17 00:00:00 2001 From: Frank Natividad Date: Thu, 25 Feb 2021 18:34:07 -0800 Subject: [PATCH] fix: retrying get remote offset and recover from last chunk failures. (#726) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) I added three things in this PR: 1. Retries around getting remote offset from GCS 2. Retrying last chunk failures correctly. 3. Updated documentation for the retry cases Fixes #709 #687 ☕️ --- .../cloud/storage/BlobWriteChannel.java | 235 ++++++++------- .../cloud/storage/spi/v1/HttpStorageRpc.java | 20 +- .../cloud/storage/BlobWriteChannelTest.java | 285 ++++++++++++++++-- 3 files changed, 396 insertions(+), 144 deletions(-) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java index ca87933c0f..aa5c3a8118 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannel.java @@ -55,6 +55,7 @@ class BlobWriteChannel extends BaseWriteChannel { // TODO: I don't think this is thread safe, and there's probably a better way to detect a retry // occuring. private boolean retrying = false; + private boolean checkingForLastChunk = false; boolean isRetrying() { return retrying; @@ -64,129 +65,141 @@ StorageObject getStorageObject() { return storageObject; } + private StorageObject transmitChunk( + int chunkOffset, int chunkLength, long position, boolean last) { + return getOptions() + .getStorageRpcV1() + .writeWithResponse(getUploadId(), getBuffer(), chunkOffset, position, chunkLength, last); + } + + private long getRemotePosition() { + return getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); + } + + private StorageObject getRemoteStorageObject() { + return getOptions().getStorageRpcV1().get(getEntity().toPb(), null); + } + + private StorageException unrecoverableState( + int chunkOffset, int chunkLength, long localPosition, long remotePosition, boolean last) { + StringBuilder sb = new StringBuilder(); + sb.append("Unable to recover in upload.\n"); + sb.append( + "This may be a symptom of multiple clients uploading to the same upload session.\n\n"); + sb.append("For debugging purposes:\n"); + sb.append("uploadId: ").append(getUploadId()).append('\n'); + sb.append("chunkOffset: ").append(chunkOffset).append('\n'); + sb.append("chunkLength: ").append(chunkLength).append('\n'); + sb.append("localOffset: ").append(localPosition).append('\n'); + sb.append("remoteOffset: ").append(remotePosition).append('\n'); + sb.append("lastChunk: ").append(last).append("\n\n"); + return new StorageException(0, sb.toString()); + } + + // Retriable interruption occurred. + // Variables: + // chunk = getBuffer() + // localNextByteOffset == getPosition() + // chunkSize = getChunkSize() + // + // Case 1: localNextByteOffset == remoteNextByteOffset: + // Retrying the entire chunk + // + // Case 2: localNextByteOffset < remoteNextByteOffset + // && driftOffset < chunkSize: + // Upload progressed and localNextByteOffset is not in-sync with + // remoteNextByteOffset and driftOffset is less than chunkSize. + // driftOffset must be less than chunkSize for it to retry using + // chunk maintained in memory. + // Find the driftOffset by subtracting localNextByteOffset from + // remoteNextByteOffset. + // Use driftOffset to determine where to restart from using the chunk in + // memory. + // + // Case 3: localNextByteOffset < remoteNextByteOffset + // && driftOffset == chunkSize: + // Special case of Case 2. + // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on + // to the next chunk. + // + // Case 4: localNextByteOffset < remoteNextByteOffset + // && driftOffset > chunkSize: + // Throw exception as remoteNextByteOffset has drifted beyond the retriable + // chunk maintained in memory. This is not possible unless there's multiple + // clients uploading to the same resumable upload session. + // + // Case 5: localNextByteOffset > remoteNextByteOffset: + // For completeness, this case is not possible because it would require retrying + // a 400 status code which is not allowed. + // + // Case 6: remoteNextByteOffset==-1 && last == true + // Upload is complete and retry occurred in the "last" chunk. Data sent was + // received by the service. + // + // Case 7: remoteNextByteOffset==-1 && last == false && !checkingForLastChunk + // Not last chunk and are not checkingForLastChunk, allow for the client to + // catch up to final chunk which meets + // Case 6. + // + // Case 8: remoteNextByteOffset==-1 && last == false && checkingForLastChunk + // Not last chunk and checkingForLastChunk means this is the second time we + // hit this case, meaning the upload was completed by a different client. + // + // Case 9: Only possible if the client local offset continues beyond the remote + // offset which is not possible. + // @Override - protected void flushBuffer(final int length, final boolean last) { + protected void flushBuffer(final int length, final boolean lastChunk) { try { runWithRetries( callable( new Runnable() { @Override public void run() { + // Get remote offset from API + final long localPosition = getPosition(); + // For each request it should be possible to retry from its location in this code + final long remotePosition = isRetrying() ? getRemotePosition() : getPosition(); + final int chunkOffset = (int) (remotePosition - localPosition); + final int chunkLength = length - chunkOffset; + final boolean uploadAlreadyComplete = remotePosition == -1; + // Enable isRetrying state to reduce number of calls to getRemotePosition() if (!isRetrying()) { - // Enable isRetrying state to reduce number of calls to getCurrentUploadOffset() retrying = true; + } + if (uploadAlreadyComplete && lastChunk) { + // Case 6 + // Request object metadata if not available + if (storageObject == null) { + storageObject = getRemoteStorageObject(); + } + // Verify that with the final chunk we match the blob length + if (storageObject.getSize().longValue() != getPosition() + length) { + throw unrecoverableState( + chunkOffset, chunkLength, localPosition, remotePosition, lastChunk); + } + retrying = false; + } else if (uploadAlreadyComplete && !lastChunk && !checkingForLastChunk) { + // Case 7 + // Make sure this is the second to last chunk. + checkingForLastChunk = true; + // Continue onto next chunk in case this is the last chunk + } else if (localPosition <= remotePosition && chunkOffset < getChunkSize()) { + // Case 1 && Case 2 + // We are in a position to send a chunk storageObject = - getOptions() - .getStorageRpcV1() - .writeWithResponse( - getUploadId(), getBuffer(), 0, getPosition(), length, last); + transmitChunk(chunkOffset, chunkLength, remotePosition, lastChunk); + retrying = false; + } else if (localPosition < remotePosition && chunkOffset == getChunkSize()) { + // Case 3 + // Continue to next chunk to catch up with remotePosition we are one chunk + // behind + retrying = false; } else { - // Retriable interruption occurred. - // Variables: - // chunk = getBuffer() - // localNextByteOffset == getPosition() - // chunkSize = getChunkSize() - // - // Case 1: localNextByteOffset == 0 && remoteNextByteOffset == 0: - // we are retrying from first chunk start from 0 offset. - // - // Case 2: localNextByteOffset == remoteNextByteOffset: - // Special case of Case 1 when a chunk is retried. - // - // Case 3: localNextByteOffset < remoteNextByteOffset - // && driftOffset < chunkSize: - // Upload progressed and localNextByteOffset is not in-sync with - // remoteNextByteOffset and driftOffset is less than chunkSize. - // driftOffset must be less than chunkSize for it to retry using - // chunk maintained in memory. - // Find the driftOffset by subtracting localNextByteOffset from - // remoteNextByteOffset. - // Use driftOffset to determine where to restart from using the chunk in - // memory. - // - // Case 4: localNextByteOffset < remoteNextByteOffset - // && driftOffset == chunkSize: - // Special case of Case 3. - // If chunkSize is equal to driftOffset then remoteNextByteOffset has moved on - // to the next chunk. - // - // Case 5: localNextByteOffset < remoteNextByteOffset - // && driftOffset > chunkSize: - // Throw exception as remoteNextByteOffset has drifted beyond the retriable - // chunk maintained in memory. This is not possible unless there's multiple - // clients uploading to the same resumable upload session. - // - // Case 6: localNextByteOffset > remoteNextByteOffset: - // For completeness, this case is not possible because it would require retrying - // a 400 status code which is not allowed. - // - // Case 7: remoteNextByteOffset==-1 && last == true - // Upload is complete and retry occurred in the "last" chunk. Data sent was - // received by the service. - // - // Case 8: remoteNextByteOffset==-1 && last == false - // Upload was completed by another client because this retry did not occur - // during the last chunk. - // - // Get remote offset from API - long remoteNextByteOffset = - getOptions().getStorageRpcV1().getCurrentUploadOffset(getUploadId()); - long localNextByteOffset = getPosition(); - int driftOffset = (int) (remoteNextByteOffset - localNextByteOffset); - int retryChunkLength = length - driftOffset; - - if (localNextByteOffset == 0 && remoteNextByteOffset == 0 - || localNextByteOffset == remoteNextByteOffset) { - // Case 1 and 2 - storageObject = - getOptions() - .getStorageRpcV1() - .writeWithResponse( - getUploadId(), getBuffer(), 0, getPosition(), length, last); - } else if (localNextByteOffset < remoteNextByteOffset - && driftOffset < getChunkSize()) { - // Case 3 - storageObject = - getOptions() - .getStorageRpcV1() - .writeWithResponse( - getUploadId(), - getBuffer(), - driftOffset, - remoteNextByteOffset, - retryChunkLength, - last); - } else if (localNextByteOffset < remoteNextByteOffset - && driftOffset == getChunkSize()) { - // Case 4 - // Continue to next chunk - retrying = false; - return; - } else if (localNextByteOffset < remoteNextByteOffset - && driftOffset > getChunkSize()) { - // Case 5 - StringBuilder sb = new StringBuilder(); - sb.append( - "Remote offset has progressed beyond starting byte offset of next chunk."); - sb.append( - "This may be a symptom of multiple clients uploading to the same upload session.\n\n"); - sb.append("For debugging purposes:\n"); - sb.append("uploadId: ").append(getUploadId()).append('\n'); - sb.append("localNextByteOffset: ").append(localNextByteOffset).append('\n'); - sb.append("remoteNextByteOffset: ").append(remoteNextByteOffset).append('\n'); - sb.append("driftOffset: ").append(driftOffset).append("\n\n"); - throw new StorageException(0, sb.toString()); - } else if (remoteNextByteOffset == -1 && last) { - // Case 7 - retrying = false; - return; - } else if (remoteNextByteOffset == -1 && !last) { - // Case 8 - throw new StorageException(0, "Resumable upload is already complete."); - } + // Case 4 && Case 8 && Case 9 + throw unrecoverableState( + chunkOffset, chunkLength, localPosition, remotePosition, lastChunk); } - // Request was successful and retrying state is now disabled. - retrying = false; } }), getOptions().getRetrySettings(), diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java index 74977dc6ec..6df86cb6ad 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java @@ -32,6 +32,7 @@ import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.HttpStatusCodes; import com.google.api.client.http.HttpTransport; import com.google.api.client.http.InputStreamContent; import com.google.api.client.http.json.JsonHttpContent; @@ -765,7 +766,8 @@ public long getCurrentUploadOffset(String uploadId) { try { response = httpRequest.execute(); int code = response.getStatusCode(); - if (code == 201 || code == 200) { + if (HttpStatusCodes.isSuccess(code)) { + // Upload completed successfully return -1; } StringBuilder sb = new StringBuilder(); @@ -774,20 +776,18 @@ public long getCurrentUploadOffset(String uploadId) { throw new StorageException(0, sb.toString()); } catch (HttpResponseException ex) { int code = ex.getStatusCode(); - if (code == 308 && ex.getHeaders().getRange() == null) { - // No progress has been made. - return 0; - } else if (code == 308 && ex.getHeaders().getRange() != null) { + if (code == 308) { + if (ex.getHeaders().getRange() == null) { + // No progress has been made. + return 0; + } // API returns last byte received offset String range = ex.getHeaders().getRange(); // Return next byte offset by adding 1 to last byte received offset return Long.parseLong(range.substring(range.indexOf("-") + 1)) + 1; } else { - // Not certain what went wrong - StringBuilder sb = new StringBuilder(); - sb.append("Not sure what occurred. Here's debugging information:\n"); - sb.append("Response:\n").append(ex.toString()).append("\n\n"); - throw new StorageException(0, sb.toString()); + // Something else occurred like a 5xx so translate and throw. + throw translate(ex); } } finally { if (response != null) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java index eefee77290..ba60a3c4d6 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/BlobWriteChannelTest.java @@ -41,6 +41,7 @@ import com.google.cloud.storage.spi.v1.StorageRpc; import com.google.common.collect.ImmutableMap; import java.io.IOException; +import java.math.BigInteger; import java.net.MalformedURLException; import java.net.SocketException; import java.net.URL; @@ -69,7 +70,8 @@ public class BlobWriteChannelTest { private static final Random RANDOM = new Random(); private static final String SIGNED_URL = "http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7"; - + private static final StorageException socketClosedException = + new StorageException(new SocketException("Socket closed")); private StorageOptions options; private StorageRpcFactory rpcFactoryMock; private StorageRpc storageRpcMock; @@ -104,8 +106,8 @@ public void testCreate() { @Test public void testCreateRetryableError() { - StorageException exception = new StorageException(new SocketException("Socket closed")); - expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andThrow(exception); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)) + .andThrow(socketClosedException); expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); @@ -136,7 +138,6 @@ public void testWriteWithoutFlush() throws IOException { @Test public void testWriteWithFlushRetryChunk() throws IOException { - StorageException exception = new StorageException(new SocketException("Socket closed")); ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); @@ -148,7 +149,7 @@ public void testWriteWithFlushRetryChunk() throws IOException { eq(0L), eq(MIN_CHUNK_SIZE), eq(false))) - .andThrow(exception); + .andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); expect( storageRpcMock.writeWithResponse( @@ -169,8 +170,45 @@ public void testWriteWithFlushRetryChunk() throws IOException { } @Test - public void testWriteWithFlushRetryChunkWithDrift() throws IOException { - StorageException exception = new StorageException(new SocketException("Socket closed")); + public void testWriteWithRetryFullChunk() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), (byte[]) anyObject(), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andReturn(null); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + (byte[]) anyObject(), + eq(0), + eq((long) MIN_CHUNK_SIZE), + eq(0), + eq(true))) + .andReturn(BLOB_INFO.toPb()); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + writer.close(); + assertFalse(writer.isOpen()); + assertNotNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + + @Test + public void testWriteWithRemoteProgressMade() throws IOException { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); @@ -182,7 +220,8 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException { eq(0L), eq(MIN_CHUNK_SIZE), eq(false))) - .andThrow(exception); + .andThrow(socketClosedException); + // Simulate GCS received 10 bytes but not the rest of the chunk expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L); expect( storageRpcMock.writeWithResponse( @@ -203,8 +242,7 @@ public void testWriteWithFlushRetryChunkWithDrift() throws IOException { } @Test - public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { - StorageException exception = new StorageException(new SocketException("Socket closed")); + public void testWriteWithDriftRetryCase4() throws IOException { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); @@ -215,25 +253,102 @@ public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { eq(0), eq(0L), eq(MIN_CHUNK_SIZE), - eq(true))) - .andThrow(exception); + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn((long) MIN_CHUNK_SIZE); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq((long) MIN_CHUNK_SIZE), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andReturn(null); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + capturedBuffer.reset(); + buffer.rewind(); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); + } + + @Test + public void testWriteWithUnreachableRemoteOffset() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(MIN_CHUNK_SIZE + 10L); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + try { + writer.write(buffer); + fail("Expected StorageException"); + } catch (StorageException storageException) { + // expected storageException + } + assertTrue(writer.isOpen()); + assertNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + + @Test + public void testWriteWithRetryAndObjectMetadata() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(10L); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(10), + eq(10L), + eq(MIN_CHUNK_SIZE - 10), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + expect(storageRpcMock.get(BLOB_INFO.toPb(), null)).andThrow(socketClosedException); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + expect(storageRpcMock.get(BLOB_INFO.toPb(), null)) + .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); writer.close(); - assertFalse(writer.isRetrying()); assertFalse(writer.isOpen()); - // Capture captures entire buffer of a chunk even when not completely used. - // Making assert selective up to the size of MIN_CHUNK_SIZE - assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array()); + assertNotNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); } @Test - public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOException { - StorageException exception = new StorageException(new SocketException("Socket closed")); - StorageException completedException = - new StorageException(0, "Resumable upload is already complete."); + public void testWriteWithUploadCompletedByAnotherClient() throws IOException { ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); Capture capturedBuffer = Capture.newInstance(); expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); @@ -245,21 +360,145 @@ public void testWriteWithFlushRetryChunkButCompletedByAnotherClient() throws IOE eq(0L), eq(MIN_CHUNK_SIZE), eq(false))) - .andThrow(exception); + .andReturn(null); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq((long) MIN_CHUNK_SIZE), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); replay(storageRpcMock); writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); writer.setChunkSize(MIN_CHUNK_SIZE); try { writer.write(buffer); + buffer.rewind(); + writer.write(buffer); + buffer.rewind(); + writer.write(buffer); + fail("Expected completed exception."); + } catch (StorageException ex) { + + } + assertTrue(writer.isOpen()); + } + + @Test + public void testWriteWithLocalOffsetGoingBeyondRemoteOffset() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andReturn(null); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq((long) MIN_CHUNK_SIZE), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + try { + writer.write(buffer); + buffer.rewind(); + writer.write(buffer); + writer.close(); fail("Expected completed exception."); } catch (StorageException ex) { - assertEquals(ex, completedException); } - assertTrue(writer.isRetrying()); assertTrue(writer.isOpen()); } + @Test + public void testGetCurrentUploadOffset() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(0L); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(false))) + .andReturn(null); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + (byte[]) anyObject(), + eq(0), + eq((long) MIN_CHUNK_SIZE), + eq(0), + eq(true))) + .andReturn(BLOB_INFO.toPb()); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.setChunkSize(MIN_CHUNK_SIZE); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + writer.close(); + assertFalse(writer.isOpen()); + assertNotNull(writer.getStorageObject()); + assertArrayEquals(buffer.array(), capturedBuffer.getValue()); + } + + @Test + public void testWriteWithLastFlushRetryChunkButCompleted() throws IOException { + ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE); + Capture capturedBuffer = Capture.newInstance(); + expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + expect( + storageRpcMock.writeWithResponse( + eq(UPLOAD_ID), + capture(capturedBuffer), + eq(0), + eq(0L), + eq(MIN_CHUNK_SIZE), + eq(true))) + .andThrow(socketClosedException); + expect(storageRpcMock.getCurrentUploadOffset(eq(UPLOAD_ID))).andReturn(-1L); + expect(storageRpcMock.get(BLOB_INFO.toPb(), null)) + .andReturn(BLOB_INFO.toPb().setSize(BigInteger.valueOf(MIN_CHUNK_SIZE))); + replay(storageRpcMock); + writer = new BlobWriteChannel(options, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertEquals(MIN_CHUNK_SIZE, writer.write(buffer)); + writer.close(); + assertFalse(writer.isRetrying()); + assertFalse(writer.isOpen()); + assertNotNull(writer.getStorageObject()); + // Capture captures entire buffer of a chunk even when not completely used. + // Making assert selective up to the size of MIN_CHUNK_SIZE + assertArrayEquals(Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE), buffer.array()); + } + @Test public void testWriteWithFlush() throws IOException { expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID);