Skip to content

Commit

Permalink
commit for manage resumeable signedURL uploads googleapis#2462
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-qlogic committed Apr 1, 2019
1 parent 81f9c46 commit acf17b3
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ public String open(StorageObject object, Map<Option, ?> options) throws StorageE
return fullname(object);
}

@Override
public String getUploadId(String signURL) {
return null;
}

@Override
public void write(
String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length, boolean last)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;

Expand All @@ -34,10 +35,18 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
this(options, blob, open(options, blob, optionsMap));
}

BlobWriteChannel(StorageOptions options, URL signURL) {
this(options, getUploadId(signURL.toString(), options));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

BlobWriteChannel(StorageOptions options, String uploadId) {
super(options, null, uploadId);
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand Down Expand Up @@ -83,6 +92,23 @@ public String call() {
}
}

private static String getUploadId(final String signURL, final StorageOptions options) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
return options.getStorageRpcV1().getUploadId(signURL);
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

static class StateImpl extends BaseWriteChannel.BaseState<StorageOptions, BlobInfo> {

private static final long serialVersionUID = -9028324143780151286L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import com.google.api.gax.paging.Page;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.FieldSelector;
import com.google.cloud.*;
import com.google.cloud.FieldSelector.Helper;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.Service;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
Expand All @@ -39,14 +34,7 @@
import java.io.Serializable;
import java.net.URL;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -2068,6 +2056,13 @@ Blob create(
*/
WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);

/**
* accept signURL and return a channel for writing content.
*
* @throws StorageException upon failure
*/
WriteChannel writer(URL signURL);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,12 @@ public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
return writer(targetOptions.x(), targetOptions.y());
}

@Override
public BlobWriteChannel writer(URL signURL) {
final StorageOptions options = StorageOptions.newBuilder().build();
return new BlobWriteChannel(options, signURL);
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,39 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public String getUploadId(String signURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(signURL);
url.set("uploadType", "resumable");
String bytesArrayParameters = "";
byte[] bytesArray = new byte[bytesArrayParameters.length()];
HttpRequestFactory requestFactory = storage.getRequestFactory();
HttpRequest httpRequest =
requestFactory.buildPostRequest(
url, new ByteArrayContent("", bytesArray, 0, bytesArray.length));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set("X-Upload-Content-Type", firstNonNull("", "application/octet-stream"));
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end();
}
}

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN_REWRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ StorageObject compose(
*/
String open(StorageObject object, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given storage object.
*
* @throws StorageException upon failure
*/
String getUploadId(String signURL);

/**
* Writes the provided bytes to a storage object at the provided location.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
Expand All @@ -60,6 +61,8 @@ public class BlobWriteChannelTest {
private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE;
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
private static final Random RANDOM = new Random();
private static final String SIGN_URL =
"http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7";

@Rule public ExpectedException thrown = ExpectedException.none();

Expand Down Expand Up @@ -265,6 +268,122 @@ public void testStateEquals() {
assertEquals(state.toString(), state2.toString());
}

@Test
public void testWriteWithSignURLAndWithoutFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithSignURLAndWithFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
writer.setChunkSize(CUSTOM_CHUNK_SIZE);
ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE);
assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer));
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithSignURLAndFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = randomBuffer(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i]));
}
for (int i = 0; i < buffers.length; i++) {
assertArrayEquals(
buffers[i].array(),
Arrays.copyOfRange(
capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1)));
}
}

@Test
public void testCloseWithSignURLWithoutFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertTrue(writer.isOpen());
writer.close();
assertArrayEquals(new byte[0], capturedBuffer.getValue());
assertTrue(!writer.isOpen());
}

@Test
public void testCloseWithSignURLWithFlush() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertTrue(writer.isOpen());
writer.write(buffer);
writer.close();
assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length);
assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE));
assertTrue(!writer.isOpen());
}

@Test
public void testWriteWithSignURLClosed() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
writer.close();
try {
writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE));
fail("Expected BlobWriteChannel write to throw IOException");
} catch (IOException ex) {
// expected
}
}

@Test
public void testSaveAndRestoreWithSignURL() throws IOException {
expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL);
Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL);
storageRpcMock.write(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
captureLong(capturedPosition),
eq(DEFAULT_CHUNK_SIZE),
eq(false));
expectLastCall().times(2);
replay(storageRpcMock);
ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE);
ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE);
writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1));
assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0));
assertEquals(new Long(0L), capturedPosition.getValues().get(0));
RestorableState<WriteChannel> writerState = writer.capture();
WriteChannel restoredWriter = writerState.restore();
assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2));
assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1));
assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1));
}

private static ByteBuffer randomBuffer(int size) {
byte[] byteArray = new byte[size];
RANDOM.nextBytes(byteArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -319,6 +320,9 @@ public class StorageImplTest {
+ "EkPPhszldvQTY486uPxyD/D7HdfnGW/Nbw5JUhfvecAdudDEhNAQ3PNabyDMI+TpiHy4NTWOrgdcWrzj6VXcdc"
+ "+uuABnPwRCdcyJ1xl2kOrPksRnp1auNGMLOe4IpEBjGY7baX9UG8+A45MbG0aHmkR59Op/aR9XowIDAQAB";

private static final String SIGN_URL =
"http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7";

private static final ApiClock TIME_SOURCE =
new ApiClock() {
@Override
Expand Down Expand Up @@ -2835,4 +2839,14 @@ public void testRuntimeException() {
thrown.expectMessage(exceptionMessage);
storage.get(blob);
}

@Test
public void testWriterWithSignURL() throws MalformedURLException {
EasyMock.expect(storageRpcMock.getUploadId(SIGN_URL)).andReturn("upload-id");
EasyMock.replay(storageRpcMock);
initializeService();
WriteChannel writer = new BlobWriteChannel(options, new URL(SIGN_URL));
assertNotNull(writer);
assertTrue(writer.isOpen());
}
}

0 comments on commit acf17b3

Please sign in to comment.