Skip to content

Commit

Permalink
for manage resumeable signedURL uploads googleapis#2462
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-qlogic committed Apr 2, 2019
1 parent acf17b3 commit d814d11
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public String open(StorageObject object, Map<Option, ?> options) throws StorageE
}

@Override
public String getUploadId(String signURL) {
public String open(String signURL) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
}

BlobWriteChannel(StorageOptions options, URL signURL) {
this(options, getUploadId(signURL.toString(), options));
this(options, open(signURL.toString(), options));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
Expand Down Expand Up @@ -92,13 +92,13 @@ public String call() {
}
}

private static String getUploadId(final String signURL, final StorageOptions options) {
private static String open(final String signURL, final StorageOptions options) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
return options.getStorageRpcV1().getUploadId(signURL);
return options.getStorageRpcV1().open(signURL);
}
},
options.getRetrySettings(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@
import com.google.api.gax.paging.Page;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.*;
import com.google.cloud.FieldSelector;
import com.google.cloud.FieldSelector.Helper;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.Service;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
Expand All @@ -34,7 +39,14 @@
import java.io.Serializable;
import java.net.URL;
import java.security.Key;
import java.util.*;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ public String open(StorageObject object, Map<Option, ?> options) {
}

@Override
public String getUploadId(String signURL) {
public String open(String signURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Scope scope = tracer.withSpan(span);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ StorageObject compose(
*
* @throws StorageException upon failure
*/
String getUploadId(String signURL);
String open(String signURL);

/**
* Writes the provided bytes to a storage object at the provided location.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,15 @@ public void testStateEquals() {

@Test
public void testWriteWithSignURLAndWithoutFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithSignURLAndWithFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false));
Expand All @@ -292,7 +292,7 @@ public void testWriteWithSignURLAndWithFlush() throws IOException {

@Test
public void testWriteWithSignURLAndFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false));
Expand All @@ -313,7 +313,7 @@ public void testWriteWithSignURLAndFlush() throws IOException {

@Test
public void testCloseWithSignURLWithoutFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
Expand All @@ -326,7 +326,7 @@ public void testCloseWithSignURLWithoutFlush() throws IOException {

@Test
public void testCloseWithSignURLWithFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
storageRpcMock.write(
Expand All @@ -343,7 +343,7 @@ public void testCloseWithSignURLWithFlush() throws IOException {

@Test
public void testWriteWithSignURLClosed() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
Expand All @@ -359,7 +359,7 @@ public void testWriteWithSignURLClosed() throws IOException {

@Test
public void testSaveAndRestoreWithSignURL() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
expect(storageRpcMock.open(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL);
Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL);
storageRpcMock.write(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2842,7 +2842,7 @@ public void testRuntimeException() {

@Test
public void testWriterWithSignURL() throws MalformedURLException {
EasyMock.expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn("upload-id");
EasyMock.expect(storageRpcMock.open(SIGN_URL)).andReturn("upload-id");
EasyMock.replay(storageRpcMock);
initializeService();
WriteChannel writer = new BlobWriteChannel(options, new URL(SIGN_URL));
Expand Down

0 comments on commit d814d11

Please sign in to comment.