Skip to content

Commit

Permalink
Merge pull request #390 from mziccard/rewriter-check-blob-update
Browse files Browse the repository at this point in the history
Make BlobReadChannel should fail if content is updated
  • Loading branch information
aozarov committed Nov 19, 2015
2 parents a4c4273 + bfe8924 commit 39829a0
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) {
}

@Override
public byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
throws StorageException {
public Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position,
int bytes) throws StorageException {
try {
Get req = storage.objects()
.get(from.getBucket(), from.getName())
Expand All @@ -439,12 +439,13 @@ public byte[] read(StorageObject from, Map<Option, ?> options, long position, in
.setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(options))
.setIfGenerationMatch(IF_GENERATION_MATCH.getLong(options))
.setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(options));
MediaHttpDownloader downloader = req.getMediaHttpDownloader();
downloader.setContentRange(position, Ints.checkedCast(position + bytes - 1));
downloader.setDirectDownloadEnabled(true);
StringBuilder range = new StringBuilder();
range.append("bytes=").append(position).append("-").append(position + bytes - 1);
req.getRequestHeaders().setRange(range.toString());
ByteArrayOutputStream output = new ByteArrayOutputStream();
req.executeMediaAndDownloadTo(output);
return output.toByteArray();
req.executeMedia().download(output);
String etag = req.getLastResponseHeaders().getETag();
return Tuple.of(etag, output.toByteArray());
} catch (IOException ex) {
throw translate(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
byte[] load(StorageObject storageObject, Map<Option, ?> options)
throws StorageException;

byte[] read(StorageObject from, Map<Option, ?> options, long position, int bytes)
Tuple<String, byte[]> read(StorageObject from, Map<Option, ?> options, long position, int bytes)
throws StorageException;

String open(StorageObject object, Map<Option, ?> options) throws StorageException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@
/**
* A channel for reading data from a Google Cloud Storage object.
*
* Implementations of this class may buffer data internally to reduce remote calls.
*
* This class is @{link Serializable}, which allows incremental reads.
* Implementations of this class may buffer data internally to reduce remote calls. This interface
* implements {@link Restorable} to allow saving the reader's state to continue reading afterwards.
*/
public interface BlobReadChannel extends ReadableByteChannel, Closeable,
Restorable<BlobReadChannel> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.gcloud.RestorableState;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;
import com.google.gcloud.spi.StorageRpc.Tuple;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -41,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private String lastEtag;
private int position;
private boolean isOpen;
private boolean endOfStream;
Expand Down Expand Up @@ -117,12 +119,19 @@ public int read(ByteBuffer byteBuffer) throws IOException {
}
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
try {
buffer = runWithRetries(new Callable<byte[]>() {
Tuple<String, byte[]> result = runWithRetries(new Callable<Tuple<String, byte[]>>() {
@Override
public byte[] call() {
public Tuple<String, byte[]> call() {
return storageRpc.read(storageObject, requestOptions, position, toRead);
}
}, serviceOptions.retryParams(), StorageImpl.EXCEPTION_HANDLER);
if (lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
throw new StorageException(0, messageBuilder.toString(), false);
}
lastEtag = result.x();
buffer = result.y();
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
Expand Down Expand Up @@ -152,6 +161,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final String lastEtag;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
Expand All @@ -161,6 +171,7 @@ static class StateImpl implements RestorableState<BlobReadChannel>, Serializable
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.lastEtag = builder.lastEtag;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
Expand All @@ -171,6 +182,7 @@ static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private String lastEtag;
private int position;
private boolean isOpen;
private boolean endOfStream;
Expand All @@ -182,6 +194,11 @@ private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> r
this.requestOptions = reqOptions;
}

Builder lastEtag(String lastEtag) {
this.lastEtag = lastEtag;
return this;
}

Builder position(int position) {
this.position = position;
return this;
Expand Down Expand Up @@ -215,6 +232,7 @@ static Builder builder(
@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.lastEtag = lastEtag;
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
Expand All @@ -224,8 +242,8 @@ public BlobReadChannel restore() {

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
return Objects.hash(serviceOptions, blob, requestOptions, lastEtag, position, isOpen,
endOfStream, chunkSize);
}

@Override
Expand All @@ -240,6 +258,7 @@ public boolean equals(Object obj) {
return Objects.equals(this.serviceOptions, other.serviceOptions)
&& Objects.equals(this.blob, other.blob)
&& Objects.equals(this.requestOptions, other.requestOptions)
&& Objects.equals(this.lastEtag, other.lastEtag)
&& this.position == other.position
&& this.isOpen == other.isOpen
&& this.endOfStream == other.endOfStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
* A channel for writing data to a Google Cloud Storage object.
*
* Implementations of this class may further buffer data internally to reduce remote calls. Written
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
* data will only be visible after calling {@link #close()}. This interface implements
* {@link Restorable} to allow saving the writer's state to continue writing afterwards.
*/
public interface BlobWriteChannel extends WritableByteChannel, Closeable,
Restorable<BlobWriteChannel> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -1405,14 +1406,29 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
BatchResponse apply(BatchRequest batchRequest);

/**
* Return a channel for reading the blob's content.
* Return a channel for reading the blob's content. The blob's latest generation is read. If the
* blob changes while reading (i.e. {@link BlobInfo#etag()} changes), subsequent calls to
* {@code blobReadChannel.read(ByteBuffer)} may throw {@link StorageException}.
*
* <p>The {@link BlobSourceOption#generationMatch(long)} option can be provided to ensure that
* {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if blob`s
* generation differs from the expected one.
*
* @throws StorageException upon failure
*/
BlobReadChannel reader(String bucket, String blob, BlobSourceOption... options);

/**
* Return a channel for reading the blob's content.
* Return a channel for reading the blob's content. If {@code blob.generation()} is set
* data corresponding to that generation is read. If {@code blob.generation()} is {@code null}
* the blob's latest generation is read. If the blob changes while reading (i.e.
* {@link BlobInfo#etag()} changes), subsequent calls to {@code blobReadChannel.read(ByteBuffer)}
* may throw {@link StorageException}.
*
* <p>The {@link BlobSourceOption#generationMatch()} and
* {@link BlobSourceOption#generationMatch(long)} options can be used to ensure that
* {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if the
* blob`s generation differs from the expected one.
*
* @throws StorageException upon failure
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertArrayEquals;
Expand All @@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {

private static final String BUCKET_NAME = "b";
private static final String BLOB_NAME = "n";
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME);
private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME, -1L);
private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024;
Expand Down Expand Up @@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
reader.read(firstReadBuffer);
reader.read(secondReadBuffer);
Expand All @@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE);
expectLastCall().andReturn(firstResult);
storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE);
expectLastCall().andReturn(secondResult);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
expect(storageRpcMock.read(
BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
replay(storageRpcMock);
reader.read(firstReadBuffer);
reader.read(secondReadBuffer);
Expand All @@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
byte[] result = {};
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
assertEquals(-1, reader.read(readBuffer));
}
Expand All @@ -137,7 +137,7 @@ public void testSeek() throws IOException {
byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
replay(storageRpcMock);
reader.read(readBuffer);
assertArrayEquals(result, readBuffer.array());
Expand Down Expand Up @@ -166,16 +166,41 @@ public void testReadClosed() {
}
}

@Test
public void testReadGenerationChanged() throws IOException {
BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME);
reader = new BlobReadChannelImpl(options, blobId, EMPTY_RPC_OPTIONS);
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(blobId.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag1", firstResult));
expect(storageRpcMock.read(
blobId.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE))
.andReturn(StorageRpc.Tuple.of("etag2", firstResult));
replay(storageRpcMock);
reader.read(firstReadBuffer);
try {
reader.read(secondReadBuffer);
fail("Expected BlobReadChannel read to throw StorageException");
} catch (StorageException ex) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blobId).append(" was updated while reading");
assertEquals(messageBuilder.toString(), ex.getMessage());
}
}

@Test
public void testSaveAndRestore() throws IOException, ClassNotFoundException {
byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(firstResult);
.andReturn(StorageRpc.Tuple.of("etag", firstResult));
expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
.andReturn(secondResult);
.andReturn(StorageRpc.Tuple.of("etag", secondResult));
replay(storageRpcMock);
reader = new BlobReadChannelImpl(options, BLOB_ID, EMPTY_RPC_OPTIONS);
reader.read(firstReadBuffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -731,6 +732,41 @@ public void testReadChannelFail() throws IOException {
assertTrue(storage.delete(BUCKET, blobName));
}

@Test
public void testReadChannelFailUpdatedGeneration() throws IOException {
String blobName = "test-read-blob-fail-updated-generation";
BlobInfo blob = BlobInfo.builder(BUCKET, blobName).build();
Random random = new Random();
int chunkSize = 1024;
int blobSize = 2 * chunkSize;
byte[] content = new byte[blobSize];
random.nextBytes(content);
BlobInfo remoteBlob = storage.create(blob, content);
assertNotNull(remoteBlob);
assertEquals(blobSize, (long) remoteBlob.size());
try (BlobReadChannel reader = storage.reader(blob.blobId())) {
reader.chunkSize(chunkSize);
ByteBuffer readBytes = ByteBuffer.allocate(chunkSize);
int numReadBytes = reader.read(readBytes);
assertEquals(chunkSize, numReadBytes);
assertArrayEquals(Arrays.copyOf(content, chunkSize), readBytes.array());
try (BlobWriteChannel writer = storage.writer(blob)) {
byte[] newContent = new byte[blobSize];
random.nextBytes(newContent);
int numWrittenBytes = writer.write(ByteBuffer.wrap(newContent));
assertEquals(blobSize, numWrittenBytes);
}
readBytes = ByteBuffer.allocate(chunkSize);
reader.read(readBytes);
fail("StorageException was expected");
} catch(StorageException ex) {
StringBuilder messageBuilder = new StringBuilder();
messageBuilder.append("Blob ").append(blob.blobId()).append(" was updated while reading");
assertEquals(messageBuilder.toString(), ex.getMessage());
}
assertTrue(storage.delete(BUCKET, blobName));
}

@Test
public void testWriteChannelFail() throws IOException {
String blobName = "test-write-channel-blob-fail";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ public void testReaderWithOptions() throws IOException {
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
EasyMock.expect(
storageRpcMock.read(BLOB_INFO2.toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
EasyMock.replay(storageRpcMock);
storage = options.service();
BlobReadChannel channel = storage.reader(BUCKET_NAME1, BLOB_NAME2, BLOB_SOURCE_GENERATION,
Expand All @@ -1045,7 +1045,7 @@ public void testReaderWithOptionsFromBlobId() throws IOException {
byte[] result = new byte[DEFAULT_CHUNK_SIZE];
EasyMock.expect(
storageRpcMock.read(BLOB_INFO1.blobId().toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
.andReturn(result);
.andReturn(StorageRpc.Tuple.of("etag", result));
EasyMock.replay(storageRpcMock);
storage = options.service();
BlobReadChannel channel = storage.reader(BLOB_INFO1.blobId(),
Expand Down

0 comments on commit 39829a0

Please sign in to comment.