Skip to content

Commit

Permalink
Storage : Fix manage resumeable signedURL uploads. (#4874)
Browse files Browse the repository at this point in the history
* commit for manage resumeable signedURL uploads #2462

* for manage resumeable signedURL uploads #2462

* fix comment

* fix ITStorageTest case written for upload using signURL

* fix format

* fix BlobWriteChannel constructor changes.

* fix signURL validation.

* fix format

* signurl rename to signedURL , firstnonnull check removed,signedURL validation with googleacessid and expires field also.

* signedURL validation with googleacessid and expires field also.

* fix forsignedURL validation with V4 Signing support.

* fix forproviding example of writing content using signedURL through Writer.

* fix forStorageRpc open method argument change.

* fix forStorageRpc open method doc comment changes.
  • Loading branch information
abhinav-qlogic authored and JesseLovelace committed Apr 12, 2019
1 parent 47884a3 commit 03c1786
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,11 @@ public String open(StorageObject object, Map<Option, ?> options) throws StorageE
return fullname(object);
}

@Override
public String open(String signedURL) {
return null;
}

@Override
public void write(
String uploadId, byte[] toWrite, int toWriteOffset, long destOffset, int length, boolean last)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;

Expand All @@ -34,10 +35,18 @@ class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {
this(options, blob, open(options, blob, optionsMap));
}

BlobWriteChannel(StorageOptions options, URL signedURL) {
this(options, open(signedURL, options));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

BlobWriteChannel(StorageOptions options, String uploadId) {
super(options, null, uploadId);
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand Down Expand Up @@ -83,6 +92,46 @@ public String call() {
}
}

private static String open(final URL signedURL, final StorageOptions options) {
try {
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
if (!isValidSignedURL(signedURL.getQuery())) {
throw new StorageException(2, "invalid signedURL");
}
return options.getStorageRpcV1().open(signedURL.toString());
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

private static boolean isValidSignedURL(String signedURLQuery) {
boolean isValid = true;
if (signedURLQuery.startsWith("X-Goog-Algorithm=")) {
if (!signedURLQuery.contains("&X-Goog-Credential=")
|| !signedURLQuery.contains("&X-Goog-Date=")
|| !signedURLQuery.contains("&X-Goog-Expires=")
|| !signedURLQuery.contains("&X-Goog-SignedHeaders=")
|| !signedURLQuery.contains("&X-Goog-Signature=")) {
isValid = false;
}
} else if (signedURLQuery.startsWith("GoogleAccessId=")) {
if (!signedURLQuery.contains("&Expires=") || !signedURLQuery.contains("&Signature=")) {
isValid = false;
}
} else {
isValid = false;
}
return isValid;
}

static class StateImpl extends BaseWriteChannel.BaseState<StorageOptions, BlobInfo> {

private static final long serialVersionUID = -9028324143780151286L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2091,6 +2091,27 @@ Blob create(
*/
WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);

/**
* Accepts signed URL and return a channel for writing content.
*
* <p>Example of writing content through a writer using signed URL.
*
* <pre>{@code
* String bucketName = "my_unique_bucket";
* String blobName = "my_blob_name";
* BlobId blobId = BlobId.of(bucketName, blobName);
* byte[] content = "Hello, World!".getBytes(UTF_8);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
* URL signedURL = storage.signUrl(blobInfo, 1, TimeUnit.HOURS, Storage.SignUrlOption.httpMethod(HttpMethod.POST));
* try (WriteChannel writer = storage.writer(signedURL)) {
* writer.write(ByteBuffer.wrap(content, 0, content.length));
* }
* }</pre>
*
* @throws StorageException upon failure
*/
WriteChannel writer(URL signedURL);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
return writer(targetOptions.x(), targetOptions.y());
}

@Override
public BlobWriteChannel writer(URL signedURL) {
return new BlobWriteChannel(getOptions(), signedURL);
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,39 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public String open(String signedURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(signedURL);
url.set("uploadType", "resumable");
String bytesArrayParameters = "";
byte[] bytesArray = new byte[bytesArrayParameters.length()];
HttpRequestFactory requestFactory = storage.getRequestFactory();
HttpRequest httpRequest =
requestFactory.buildPostRequest(
url, new ByteArrayContent("", bytesArray, 0, bytesArray.length));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set("X-Upload-Content-Type", "");
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end();
}
}

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN_REWRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ StorageObject compose(
*/
String open(StorageObject object, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given signedURL.
*
* @throws StorageException upon failure
*/
String open(String signedURL);

/**
* Writes the provided bytes to a storage object at the provided location.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
Expand All @@ -60,6 +62,8 @@ public class BlobWriteChannelTest {
private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE;
private static final int CUSTOM_CHUNK_SIZE = 4 * MIN_CHUNK_SIZE;
private static final Random RANDOM = new Random();
private static final String SIGNED_URL =
"http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7";

@Rule public ExpectedException thrown = ExpectedException.none();

Expand Down Expand Up @@ -265,6 +269,133 @@ public void testStateEquals() {
assertEquals(state.toString(), state2.toString());
}

@Test
public void testWriteWithSignedURLAndWithoutFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertEquals(MIN_CHUNK_SIZE, writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE)));
}

@Test
public void testWriteWithSignedURLAndWithFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(CUSTOM_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
writer.setChunkSize(CUSTOM_CHUNK_SIZE);
ByteBuffer buffer = randomBuffer(CUSTOM_CHUNK_SIZE);
assertEquals(CUSTOM_CHUNK_SIZE, writer.write(buffer));
assertArrayEquals(buffer.array(), capturedBuffer.getValue());
}

@Test
public void testWriteWithSignedURLAndFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(DEFAULT_CHUNK_SIZE), eq(false));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
ByteBuffer[] buffers = new ByteBuffer[DEFAULT_CHUNK_SIZE / MIN_CHUNK_SIZE];
for (int i = 0; i < buffers.length; i++) {
buffers[i] = randomBuffer(MIN_CHUNK_SIZE);
assertEquals(MIN_CHUNK_SIZE, writer.write(buffers[i]));
}
for (int i = 0; i < buffers.length; i++) {
assertArrayEquals(
buffers[i].array(),
Arrays.copyOfRange(
capturedBuffer.getValue(), MIN_CHUNK_SIZE * i, MIN_CHUNK_SIZE * (i + 1)));
}
}

@Test
public void testCloseWithSignedURLWithoutFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertTrue(writer.isOpen());
writer.close();
assertArrayEquals(new byte[0], capturedBuffer.getValue());
assertTrue(!writer.isOpen());
}

@Test
public void testCloseWithSignedURLWithFlush() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
ByteBuffer buffer = randomBuffer(MIN_CHUNK_SIZE);
storageRpcMock.write(
eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(MIN_CHUNK_SIZE), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertTrue(writer.isOpen());
writer.write(buffer);
writer.close();
assertEquals(DEFAULT_CHUNK_SIZE, capturedBuffer.getValue().length);
assertArrayEquals(buffer.array(), Arrays.copyOf(capturedBuffer.getValue(), MIN_CHUNK_SIZE));
assertTrue(!writer.isOpen());
}

@Test
public void testWriteWithSignedURLClosed() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance();
storageRpcMock.write(eq(UPLOAD_ID), capture(capturedBuffer), eq(0), eq(0L), eq(0), eq(true));
replay(storageRpcMock);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
writer.close();
try {
writer.write(ByteBuffer.allocate(MIN_CHUNK_SIZE));
fail("Expected BlobWriteChannel write to throw IOException");
} catch (IOException ex) {
// expected
}
}

@Test
public void testSaveAndRestoreWithSignedURL() throws IOException {
expect(storageRpcMock.open(SIGNED_URL)).andReturn(UPLOAD_ID);
Capture<byte[]> capturedBuffer = Capture.newInstance(CaptureType.ALL);
Capture<Long> capturedPosition = Capture.newInstance(CaptureType.ALL);
storageRpcMock.write(
eq(UPLOAD_ID),
capture(capturedBuffer),
eq(0),
captureLong(capturedPosition),
eq(DEFAULT_CHUNK_SIZE),
eq(false));
expectLastCall().times(2);
replay(storageRpcMock);
ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE);
ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1));
assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0));
assertEquals(new Long(0L), capturedPosition.getValues().get(0));
RestorableState<WriteChannel> writerState = writer.capture();
WriteChannel restoredWriter = writerState.restore();
assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2));
assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1));
assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1));
}

@Test
public void testRuntimeExceptionWithSignedURL() throws MalformedURLException {
String exceptionMessage = "invalid signedURL";
expect(new BlobWriteChannel(options, new URL(SIGNED_URL)))
.andThrow(new RuntimeException(exceptionMessage));
replay(storageRpcMock);
thrown.expect(StorageException.class);
thrown.expectMessage(exceptionMessage);
writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
}

private static ByteBuffer randomBuffer(int size) {
byte[] byteArray = new byte[size];
RANDOM.nextBytes(byteArray);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -319,6 +320,9 @@ public class StorageImplTest {
+ "EkPPhszldvQTY486uPxyD/D7HdfnGW/Nbw5JUhfvecAdudDEhNAQ3PNabyDMI+TpiHy4NTWOrgdcWrzj6VXcdc"
+ "+uuABnPwRCdcyJ1xl2kOrPksRnp1auNGMLOe4IpEBjGY7baX9UG8+A45MbG0aHmkR59Op/aR9XowIDAQAB";

private static final String SIGNED_URL =
"http://www.test.com/test-bucket/test1.txt?GoogleAccessId=testClient-test@test.com&Expires=1553839761&Signature=MJUBXAZ7";

private static final ApiClock TIME_SOURCE =
new ApiClock() {
@Override
Expand Down Expand Up @@ -2835,4 +2839,14 @@ public void testRuntimeException() {
thrown.expectMessage(exceptionMessage);
storage.get(blob);
}

@Test
public void testWriterWithSignedURL() throws MalformedURLException {
EasyMock.expect(storageRpcMock.open(SIGNED_URL)).andReturn("upload-id");
EasyMock.replay(storageRpcMock);
initializeService();
WriteChannel writer = new BlobWriteChannel(options, new URL(SIGNED_URL));
assertNotNull(writer);
assertTrue(writer.isOpen());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2560,4 +2560,28 @@ public void testEnableAndDisableBucketPolicyOnlyOnExistingBucket() throws Except
RemoteStorageHelper.forceDelete(storage, bpoBucket, 1, TimeUnit.MINUTES);
}
}

@Test
public void testUploadUsingSignedURL() throws Exception {
String blobName = "test-signed-url-upload";
BlobInfo blob = BlobInfo.newBuilder(BUCKET, blobName).build();
assertNotNull(storage.create(blob));
URL signUrl =
storage.signUrl(blob, 1, TimeUnit.HOURS, Storage.SignUrlOption.httpMethod(HttpMethod.POST));
byte[] bytesArrayToUpload = BLOB_STRING_CONTENT.getBytes();
try (WriteChannel writer = storage.writer(signUrl)) {
writer.write(ByteBuffer.wrap(bytesArrayToUpload, 0, bytesArrayToUpload.length));
}

int lengthOfDownLoadBytes = -1;
BlobId blobId = BlobId.of(BUCKET, blobName);
Blob blobToRead = storage.get(blobId);
try (ReadChannel reader = blobToRead.reader()) {
ByteBuffer bytes = ByteBuffer.allocate(64 * 1024);
lengthOfDownLoadBytes = reader.read(bytes);
}

assertEquals(bytesArrayToUpload.length, lengthOfDownLoadBytes);
assertTrue(storage.delete(BUCKET, blobName));
}
}

0 comments on commit 03c1786

Please sign in to comment.