diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java b/gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java
index 0bd80a9c11ce..4787b0888d7f 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/spi/DefaultStorageRpc.java
@@ -429,8 +429,8 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) {
   }
 
   @Override
-  public byte[] read(StorageObject from, Map options, long position, int bytes)
-      throws StorageException {
+  public Tuple<String, byte[]> read(StorageObject from, Map options, long position,
+      int bytes) throws StorageException {
     try {
       Get req = storage.objects()
           .get(from.getBucket(), from.getName())
@@ -439,12 +439,13 @@ public byte[] read(StorageObject from, Map options, long position, in
           .setIfMetagenerationNotMatch(IF_METAGENERATION_NOT_MATCH.getLong(options))
           .setIfGenerationMatch(IF_GENERATION_MATCH.getLong(options))
           .setIfGenerationNotMatch(IF_GENERATION_NOT_MATCH.getLong(options));
-      MediaHttpDownloader downloader = req.getMediaHttpDownloader();
-      downloader.setContentRange(position, Ints.checkedCast(position + bytes - 1));
-      downloader.setDirectDownloadEnabled(true);
+      StringBuilder range = new StringBuilder();
+      range.append("bytes=").append(position).append("-").append(position + bytes - 1);
+      req.getRequestHeaders().setRange(range.toString());
       ByteArrayOutputStream output = new ByteArrayOutputStream();
-      req.executeMediaAndDownloadTo(output);
-      return output.toByteArray();
+      req.executeMedia().download(output);
+      String etag = req.getLastResponseHeaders().getETag();
+      return Tuple.of(etag, output.toByteArray());
     } catch (IOException ex) {
       throw translate(ex);
     }
diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java b/gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java
index 1f708c269ad0..c5fd1b3e2250 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/spi/StorageRpc.java
@@ -259,7 +259,7 @@ StorageObject compose(Iterable sources, StorageObject target,
 
   byte[] load(StorageObject storageObject, Map options) throws StorageException;
 
-  byte[] read(StorageObject from, Map options, long position, int bytes)
+  Tuple<String, byte[]> read(StorageObject from, Map options, long position, int bytes)
       throws StorageException;
 
   String open(StorageObject object, Map options) throws StorageException;
diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java
index 205dc4b97309..54d39649cb70 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java
@@ -26,9 +26,8 @@
 /**
  * A channel for reading data from a Google Cloud Storage object.
  *
- * Implementations of this class may buffer data internally to reduce remote calls.
- *
- * This class is @{link Serializable}, which allows incremental reads.
+ * Implementations of this class may buffer data internally to reduce remote calls. This interface
+ * implements {@link Restorable} to allow saving the reader's state and resuming reading later.
  */
 public interface BlobReadChannel extends ReadableByteChannel, Closeable, Restorable {
 
diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java
index 09047a642218..8fe6eae66d8f 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java
@@ -23,6 +23,7 @@
 import com.google.gcloud.RestorableState;
 import com.google.gcloud.RetryHelper;
 import com.google.gcloud.spi.StorageRpc;
+import com.google.gcloud.spi.StorageRpc.Tuple;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -41,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
   private final StorageOptions serviceOptions;
   private final BlobId blob;
   private final Map requestOptions;
+  private String lastEtag;
   private int position;
   private boolean isOpen;
   private boolean endOfStream;
@@ -117,12 +119,19 @@ public int read(ByteBuffer byteBuffer) throws IOException {
       }
       final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
       try {
-        buffer = runWithRetries(new Callable<byte[]>() {
+        Tuple<String, byte[]> result = runWithRetries(new Callable<Tuple<String, byte[]>>() {
           @Override
-          public byte[] call() {
+          public Tuple<String, byte[]> call() {
             return storageRpc.read(storageObject, requestOptions, position, toRead);
           }
         }, serviceOptions.retryParams(), StorageImpl.EXCEPTION_HANDLER);
+        if (lastEtag != null && !Objects.equals(result.x(), lastEtag)) {
+          StringBuilder messageBuilder = new StringBuilder();
+          messageBuilder.append("Blob ").append(blob).append(" was updated while reading");
+          throw new StorageException(0, messageBuilder.toString(), false);
+        }
+        lastEtag = result.x();
+        buffer = result.y();
       } catch (RetryHelper.RetryHelperException e) {
         throw StorageException.translateAndThrow(e);
       }
@@ -152,6 +161,7 @@ static class StateImpl implements RestorableState, Serializable
     private final StorageOptions serviceOptions;
     private final BlobId blob;
     private final Map requestOptions;
+    private final String lastEtag;
     private final int position;
     private final boolean isOpen;
     private final boolean endOfStream;
@@ -161,6 +171,7 @@ static class StateImpl implements RestorableState, Serializable
       this.serviceOptions = builder.serviceOptions;
       this.blob = builder.blob;
       this.requestOptions = builder.requestOptions;
+      this.lastEtag = builder.lastEtag;
       this.position = builder.position;
       this.isOpen = builder.isOpen;
       this.endOfStream = builder.endOfStream;
@@ -171,6 +182,7 @@ static class Builder {
       private final StorageOptions serviceOptions;
       private final BlobId blob;
       private final Map requestOptions;
+      private String lastEtag;
       private int position;
       private boolean isOpen;
       private boolean endOfStream;
@@ -182,6 +194,11 @@ private Builder(StorageOptions options, BlobId blob, Map r
         this.requestOptions = reqOptions;
       }
 
+      Builder lastEtag(String lastEtag) {
+        this.lastEtag = lastEtag;
+        return this;
+      }
+
       Builder position(int position) {
         this.position = position;
         return this;
@@ -215,6 +232,7 @@ static Builder builder(
     @Override
     public BlobReadChannel restore() {
       BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
+      channel.lastEtag = lastEtag;
       channel.position = position;
       channel.isOpen = isOpen;
       channel.endOfStream = endOfStream;
@@ -224,8 +242,8 @@ public BlobReadChannel restore() {
 
     @Override
     public int hashCode() {
-      return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
-          chunkSize);
+      return Objects.hash(serviceOptions, blob, requestOptions, lastEtag, position, isOpen,
+          endOfStream, chunkSize);
     }
 
     @Override
@@ -240,6 +258,7 @@ public boolean equals(Object obj) {
       return Objects.equals(this.serviceOptions, other.serviceOptions)
           && Objects.equals(this.blob, other.blob)
           && Objects.equals(this.requestOptions, other.requestOptions)
+          && Objects.equals(this.lastEtag, other.lastEtag)
           && this.position == other.position
           && this.isOpen == other.isOpen
           && this.endOfStream == other.endOfStream
diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java
index a6208e5020ae..fe9164532120 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java
@@ -26,8 +26,8 @@
  * A channel for writing data to a Google Cloud Storage object.
  *
  * Implementations of this class may further buffer data internally to reduce remote calls. Written
- * data will only be visible after calling {@link #close()}. This class is serializable, to allow
- * incremental writes.
+ * data will only be visible after calling {@link #close()}. This interface implements
+ * {@link Restorable} to allow saving the writer's state and resuming writing later.
  */
 public interface BlobWriteChannel extends WritableByteChannel, Closeable, Restorable {
 
diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java
index 1fe096524ab7..6e1c33b137fd 100644
--- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java
+++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/Storage.java
@@ -33,6 +33,7 @@
 import java.io.InputStream;
 import java.io.Serializable;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -1405,14 +1406,29 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
   BatchResponse apply(BatchRequest batchRequest);
 
   /**
-   * Return a channel for reading the blob's content.
+   * Return a channel for reading the blob's content. The blob's latest generation is read. If the
+   * blob changes while reading (i.e. {@link BlobInfo#etag()} changes), subsequent calls to
+   * {@code blobReadChannel.read(ByteBuffer)} may throw {@link StorageException}.
+   *
+   * <p>The {@link BlobSourceOption#generationMatch(long)} option can be provided to ensure that
+   * {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if the
+   * blob's generation differs from the expected one.
    *
    * @throws StorageException upon failure
    */
   BlobReadChannel reader(String bucket, String blob, BlobSourceOption... options);
 
   /**
-   * Return a channel for reading the blob's content.
+   * Return a channel for reading the blob's content. If {@code blob.generation()} is set,
+   * data corresponding to that generation is read. If {@code blob.generation()} is {@code null},
+   * the blob's latest generation is read. If the blob changes while reading (i.e.
+   * {@link BlobInfo#etag()} changes), subsequent calls to {@code blobReadChannel.read(ByteBuffer)}
+   * may throw {@link StorageException}.
+   *
+   * <p>The {@link BlobSourceOption#generationMatch()} and
+   * {@link BlobSourceOption#generationMatch(long)} options can be used to ensure that
+   * {@code blobReadChannel.read(ByteBuffer)} calls will throw {@link StorageException} if the
+   * blob's generation differs from the expected one.
    *
    * @throws StorageException upon failure
    */
diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java
index e1f904bf72fe..f99fe893d0d9 100644
--- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java
+++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java
@@ -19,7 +19,6 @@
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertArrayEquals;
@@ -46,7 +45,7 @@ public class BlobReadChannelImplTest {
 
   private static final String BUCKET_NAME = "b";
   private static final String BLOB_NAME = "n";
-  private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME);
+  private static final BlobId BLOB_ID = BlobId.of(BUCKET_NAME, BLOB_NAME, -1L);
   private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of();
   private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
   private static final int CUSTOM_CHUNK_SIZE = 2 * 1024 * 1024;
@@ -88,7 +87,7 @@ public void testReadBuffered() throws IOException {
     ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
     ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
     expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
-        .andReturn(result);
+        .andReturn(StorageRpc.Tuple.of("etag", result));
     replay(storageRpcMock);
     reader.read(firstReadBuffer);
     reader.read(secondReadBuffer);
@@ -107,10 +106,11 @@ public void testReadBig() throws IOException {
     byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
     ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
     ByteBuffer secondReadBuffer = ByteBuffer.allocate(42);
-    storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE);
-    expectLastCall().andReturn(firstResult);
-    storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE);
-    expectLastCall().andReturn(secondResult);
+    expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
+        .andReturn(StorageRpc.Tuple.of("etag", firstResult));
+    expect(storageRpcMock.read(
+        BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, CUSTOM_CHUNK_SIZE))
+        .andReturn(StorageRpc.Tuple.of("etag", secondResult));
     replay(storageRpcMock);
     reader.read(firstReadBuffer);
     reader.read(secondReadBuffer);
@@ -125,7 +125,7 @@ public void testReadFinish() throws IOException {
     byte[] result = {};
     ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
     expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
-        .andReturn(result);
+        .andReturn(StorageRpc.Tuple.of("etag", result));
     replay(storageRpcMock);
     assertEquals(-1, reader.read(readBuffer));
   }
@@ -137,7 +137,7 @@ public void testSeek() throws IOException {
     byte[] result = randomByteArray(DEFAULT_CHUNK_SIZE);
     ByteBuffer readBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
     expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
-        .andReturn(result);
+        .andReturn(StorageRpc.Tuple.of("etag", result));
     replay(storageRpcMock);
     reader.read(readBuffer);
     assertArrayEquals(result, readBuffer.array());
@@ -166,6 +166,31 @@ public void testReadClosed() {
     }
   }
 
+  @Test
+  public void testReadGenerationChanged() throws IOException {
+    BlobId blobId = BlobId.of(BUCKET_NAME, BLOB_NAME);
+    reader = new BlobReadChannelImpl(options, blobId, EMPTY_RPC_OPTIONS);
+    byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
+    byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE);
+    ByteBuffer firstReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
+    ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
+    expect(storageRpcMock.read(blobId.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
+        .andReturn(StorageRpc.Tuple.of("etag1", firstResult));
+    expect(storageRpcMock.read(
+        blobId.toPb(), EMPTY_RPC_OPTIONS, DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE))
+        .andReturn(StorageRpc.Tuple.of("etag2", firstResult));
+    replay(storageRpcMock);
+    reader.read(firstReadBuffer);
+    try {
+      reader.read(secondReadBuffer);
+      fail("Expected BlobReadChannel read to throw StorageException");
+    } catch (StorageException ex) {
+      StringBuilder messageBuilder = new StringBuilder();
+      messageBuilder.append("Blob ").append(blobId).append(" was updated while reading");
+      assertEquals(messageBuilder.toString(), ex.getMessage());
+    }
+  }
+
   @Test
   public void testSaveAndRestore() throws IOException, ClassNotFoundException {
     byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE);
@@ -173,9 +198,9 @@ public void testSaveAndRestore() throws IOException, ClassNotFoundException {
     ByteBuffer firstReadBuffer = ByteBuffer.allocate(42);
     ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE);
     expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
-        .andReturn(firstResult);
+        .andReturn(StorageRpc.Tuple.of("etag", firstResult));
     expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE))
-        .andReturn(secondResult);
+        .andReturn(StorageRpc.Tuple.of("etag", secondResult));
     replay(storageRpcMock);
     reader = new BlobReadChannelImpl(options, BLOB_ID, EMPTY_RPC_OPTIONS);
     reader.read(firstReadBuffer);
diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java
index 0d4e876271e3..8bc3dfd5e207 100644
--- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java
+++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java
@@ -49,6 +49,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -731,6 +732,41 @@ public void testReadChannelFail() throws IOException {
     assertTrue(storage.delete(BUCKET, blobName));
   }
 
+  @Test
+  public void testReadChannelFailUpdatedGeneration() throws IOException {
+    String blobName = "test-read-blob-fail-updated-generation";
+    BlobInfo blob = BlobInfo.builder(BUCKET, blobName).build();
+    Random random = new Random();
+    int chunkSize = 1024;
+    int blobSize = 2 * chunkSize;
+    byte[] content = new byte[blobSize];
+    random.nextBytes(content);
+    BlobInfo remoteBlob = storage.create(blob, content);
+    assertNotNull(remoteBlob);
+    assertEquals(blobSize, (long) remoteBlob.size());
+    try (BlobReadChannel reader = storage.reader(blob.blobId())) {
+      reader.chunkSize(chunkSize);
+      ByteBuffer readBytes = ByteBuffer.allocate(chunkSize);
+      int numReadBytes = reader.read(readBytes);
+      assertEquals(chunkSize, numReadBytes);
+      assertArrayEquals(Arrays.copyOf(content, chunkSize), readBytes.array());
+      try (BlobWriteChannel writer = storage.writer(blob)) {
+        byte[] newContent = new byte[blobSize];
+        random.nextBytes(newContent);
+        int numWrittenBytes = writer.write(ByteBuffer.wrap(newContent));
+        assertEquals(blobSize, numWrittenBytes);
+      }
+      readBytes = ByteBuffer.allocate(chunkSize);
+      reader.read(readBytes);
+      fail("StorageException was expected");
+    } catch (StorageException ex) {
+      StringBuilder messageBuilder = new StringBuilder();
+      messageBuilder.append("Blob ").append(blob.blobId()).append(" was updated while reading");
+      assertEquals(messageBuilder.toString(), ex.getMessage());
+    }
+    assertTrue(storage.delete(BUCKET, blobName));
+  }
+
   @Test
   public void testWriteChannelFail() throws IOException {
     String blobName = "test-write-channel-blob-fail";
diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java
index 359920314e82..32a466a9d551 100644
--- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java
+++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/StorageImplTest.java
@@ -1030,7 +1030,7 @@ public void testReaderWithOptions() throws IOException {
     byte[] result = new byte[DEFAULT_CHUNK_SIZE];
     EasyMock.expect(
         storageRpcMock.read(BLOB_INFO2.toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
-        .andReturn(result);
+        .andReturn(StorageRpc.Tuple.of("etag", result));
     EasyMock.replay(storageRpcMock);
     storage = options.service();
     BlobReadChannel channel = storage.reader(BUCKET_NAME1, BLOB_NAME2, BLOB_SOURCE_GENERATION,
@@ -1045,7 +1045,7 @@ public void testReaderWithOptionsFromBlobId() throws IOException {
     byte[] result = new byte[DEFAULT_CHUNK_SIZE];
     EasyMock.expect(
         storageRpcMock.read(BLOB_INFO1.blobId().toPb(), BLOB_SOURCE_OPTIONS, 0, DEFAULT_CHUNK_SIZE))
-        .andReturn(result);
+        .andReturn(StorageRpc.Tuple.of("etag", result));
     EasyMock.replay(storageRpcMock);
     storage = options.service();
     BlobReadChannel channel = storage.reader(BLOB_INFO1.blobId(),
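
The patch above makes BlobReadChannel.read(ByteBuffer) throw a StorageException when the blob's etag changes between chunked reads. The sketch below (not part of the patch) shows how a caller might react to that failure mode, assuming only the API visible in this diff: Storage.reader(BlobId), BlobReadChannel.read, and the new exception. The class and helper names, the chunk size, and the retry-from-scratch policy are illustrative assumptions; a real caller would bound the retries or inspect the error cause rather than loop unconditionally.

// Sketch only: restart a whole-blob read when the object is overwritten mid-read.
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobReadChannel;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class ConsistentBlobReader {

  // Reads the blob end-to-end; if a StorageException reports that the blob changed while
  // reading (etag mismatch), the partial data is discarded and the read starts over so the
  // returned bytes never mix two generations of the object.
  static byte[] readAll(Storage storage, BlobId blobId) throws IOException {
    while (true) {
      ByteArrayOutputStream content = new ByteArrayOutputStream();
      try (BlobReadChannel reader = storage.reader(blobId)) {
        ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
        while (reader.read(chunk) >= 0) {
          chunk.flip();
          content.write(chunk.array(), 0, chunk.remaining());
          chunk.clear();
        }
        return content.toByteArray();
      } catch (StorageException ex) {
        // Blob was updated while reading; retry from the beginning.
      }
    }
  }
}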