diff --git a/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java b/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java new file mode 100644 index 000000000000..9cd3ee5c3c4c --- /dev/null +++ b/gcloud-java-core/src/main/java/com/google/gcloud/RestorableState.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.gcloud; + +/** + * A common interface for restorable states. Implementations of {@code RestorableState} are capable + * of saving the state of an object to restore it for later use. + * + * Implementations of this class must implement {@link java.io.Serializable} to ensure that the + * state of a the object can be correctly serialized. + */ +public interface RestorableState { + + /** + * Returns an object whose internal state reflects the one saved in the invocation object. + */ + T restore(); +} diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java index ad1a385d9a83..b004e3d61634 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannel.java @@ -16,9 +16,10 @@ package com.google.gcloud.storage; +import com.google.gcloud.RestorableState; + import java.io.Closeable; import java.io.IOException; -import java.io.Serializable; import java.nio.channels.ReadableByteChannel; /** @@ -28,7 +29,7 @@ * * This class is @{link Serializable}, which allows incremental reads. */ -public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable { +public interface BlobReadChannel extends ReadableByteChannel, Closeable { /** * Overridden to remove IOException. @@ -46,4 +47,11 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos */ void chunkSize(int chunkSize); + /** + * Saves the read channel state. + * + * @return a {@link RestorableState} object that contains the read channel state and can restore + * it afterwards. State object must implement {@link java.io.Serializable}. + */ + public RestorableState save(); } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java index 79fe2fd1e531..7731d04837a6 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java @@ -19,14 +19,16 @@ import static com.google.gcloud.RetryHelper.runWithRetries; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.MoreObjects; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Objects; import java.util.concurrent.Callable; /** @@ -35,7 +37,6 @@ class BlobReadChannelImpl implements BlobReadChannel { private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024; - private static final long serialVersionUID = 4821762590742862669L; private final StorageOptions serviceOptions; private final BlobId blob; @@ -45,10 +46,10 @@ class BlobReadChannelImpl implements BlobReadChannel { private boolean endOfStream; private int chunkSize = DEFAULT_CHUNK_SIZE; - private transient StorageRpc storageRpc; - private transient StorageObject storageObject; - private transient int bufferPos; - private transient byte[] buffer; + private final StorageRpc storageRpc; + private final StorageObject storageObject; + private int bufferPos; + private byte[] buffer; BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob, Map requestOptions) { @@ -56,27 +57,22 @@ class BlobReadChannelImpl implements BlobReadChannel { this.blob = blob; this.requestOptions = requestOptions; isOpen = true; - initTransients(); + storageRpc = serviceOptions.storageRpc(); + storageObject = blob.toPb(); } - private void writeObject(ObjectOutputStream out) throws IOException { + @Override + public RestorableState save() { + StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions) + .position(position) + .isOpen(isOpen) + .endOfStream(endOfStream) + .chunkSize(chunkSize); if (buffer != null) { - position += bufferPos; - buffer = null; - bufferPos = 0; - endOfStream = false; + builder.position(position + bufferPos); + builder.endOfStream(false); } - out.defaultWriteObject(); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - initTransients(); - } - - private void initTransients() { - storageRpc = serviceOptions.storageRpc(); - storageObject = blob.toPb(); + return builder.build(); } @Override @@ -148,4 +144,116 @@ public byte[] call() { } return toWrite; } + + static class StateImpl implements RestorableState, Serializable { + + private static final long serialVersionUID = 3889420316004453706L; + + private final StorageOptions serviceOptions; + private final BlobId blob; + private final Map requestOptions; + private final int position; + private final boolean isOpen; + private final boolean endOfStream; + private final int chunkSize; + + StateImpl(Builder builder) { + this.serviceOptions = builder.serviceOptions; + this.blob = builder.blob; + this.requestOptions = builder.requestOptions; + this.position = builder.position; + this.isOpen = builder.isOpen; + this.endOfStream = builder.endOfStream; + this.chunkSize = builder.chunkSize; + } + + static class Builder { + private final StorageOptions serviceOptions; + private final BlobId blob; + private final Map requestOptions; + private int position; + private boolean isOpen; + private boolean endOfStream; + private int chunkSize; + + private Builder(StorageOptions options, BlobId blob, Map reqOptions) { + this.serviceOptions = options; + this.blob = blob; + this.requestOptions = reqOptions; + } + + Builder position(int position) { + this.position = position; + return this; + } + + Builder isOpen(boolean isOpen) { + this.isOpen = isOpen; + return this; + } + + Builder endOfStream(boolean endOfStream) { + this.endOfStream = endOfStream; + return this; + } + + Builder chunkSize(int chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + RestorableState build() { + return new StateImpl(this); + } + } + + static Builder builder( + StorageOptions options, BlobId blob, Map reqOptions) { + return new Builder(options, blob, reqOptions); + } + + @Override + public BlobReadChannel restore() { + BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions); + channel.position = position; + channel.isOpen = isOpen; + channel.endOfStream = endOfStream; + channel.chunkSize = chunkSize; + return channel; + } + + @Override + public int hashCode() { + return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream, + chunkSize); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof StateImpl)) { + return false; + } + final StateImpl other = (StateImpl) obj; + return Objects.equals(this.serviceOptions, other.serviceOptions) + && Objects.equals(this.blob, other.blob) + && Objects.equals(this.requestOptions, other.requestOptions) + && this.position == other.position + && this.isOpen == other.isOpen + && this.endOfStream == other.endOfStream + && this.chunkSize == other.chunkSize; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("blob", blob) + .add("position", position) + .add("isOpen", isOpen) + .add("endOfStream", endOfStream) + .toString(); + } + } } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java index 20b2ce087632..be3ef2293ec3 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannel.java @@ -16,8 +16,9 @@ package com.google.gcloud.storage; +import com.google.gcloud.RestorableState; + import java.io.Closeable; -import java.io.Serializable; import java.nio.channels.WritableByteChannel; /** @@ -27,11 +28,21 @@ * data will only be visible after calling {@link #close()}. This class is serializable, to allow * incremental writes. */ -public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable { +public interface BlobWriteChannel extends WritableByteChannel, Closeable { /** * Sets the minimum size that will be written by a single RPC. * Written data will be buffered and only flushed upon reaching this size or closing the channel. */ void chunkSize(int chunkSize); + + /** + * Saves the write channel state so that it can be restored afterwards. The original + * {@code BlobWriteChannel} and the restored one should not both be used. Closing one channel + * causes the other channel to close, subsequent writes will fail. + * + * @return a {@link RestorableState} object that contains the write channel state and can restore + * it afterwards. State object must implement {@link java.io.Serializable}. + */ + public RestorableState save(); } diff --git a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java index 8cb95a797cf6..1c841d1dfc6a 100644 --- a/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java +++ b/gcloud-java-storage/src/main/java/com/google/gcloud/storage/BlobWriteChannelImpl.java @@ -20,15 +20,17 @@ import static java.util.concurrent.Executors.callable; import com.google.api.services.storage.model.StorageObject; +import com.google.common.base.MoreObjects; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryHelper; import com.google.gcloud.spi.StorageRpc; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Map; +import java.util.Objects; /** * Default implementation for BlobWriteChannel. @@ -48,26 +50,43 @@ class BlobWriteChannelImpl implements BlobWriteChannel { private boolean isOpen = true; private int chunkSize = DEFAULT_CHUNK_SIZE; - private transient StorageRpc storageRpc; - private transient StorageObject storageObject; + private final StorageRpc storageRpc; + private final StorageObject storageObject; BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, Map optionsMap) { this.options = options; this.blobInfo = blobInfo; - initTransients(); + storageRpc = options.storageRpc(); + storageObject = blobInfo.toPb(); uploadId = storageRpc.open(storageObject, optionsMap); } - private void writeObject(ObjectOutputStream out) throws IOException { + BlobWriteChannelImpl(StorageOptions options, BlobInfo blobInfo, String uploadId) { + this.options = options; + this.blobInfo = blobInfo; + this.uploadId = uploadId; + storageRpc = options.storageRpc(); + storageObject = blobInfo.toPb(); + } + + @Override + public RestorableState save() { + byte[] bufferToSave = null; if (isOpen) { - flush(true); + flush(); + bufferToSave = Arrays.copyOf(buffer, limit); } - out.defaultWriteObject(); + return StateImpl.builder(options, blobInfo, uploadId) + .position(position) + .buffer(bufferToSave) + .isOpen(isOpen) + .chunkSize(chunkSize) + .build(); } - private void flush(boolean compact) { - if (limit >= chunkSize || compact && limit >= MIN_CHUNK_SIZE) { + private void flush() { + if (limit >= chunkSize) { final int length = limit - limit % MIN_CHUNK_SIZE; try { runWithRetries(callable(new Runnable() { @@ -81,24 +100,12 @@ public void run() { } position += length; limit -= length; - byte[] temp = new byte[compact ? limit : chunkSize]; + byte[] temp = new byte[chunkSize]; System.arraycopy(buffer, length, temp, 0, limit); buffer = temp; } } - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - if (isOpen) { - initTransients(); - } - } - - private void initTransients() { - storageRpc = options.storageRpc(); - storageObject = blobInfo.toPb(); - } - private void validateOpen() throws IOException { if (!isOpen) { throw new IOException("stream is closed"); @@ -117,7 +124,7 @@ public int write(ByteBuffer byteBuffer) throws IOException { byteBuffer.get(buffer, limit, toWrite); } limit += toWrite; - flush(false); + flush(); return toWrite; } @@ -150,4 +157,118 @@ public void chunkSize(int chunkSize) { chunkSize = (chunkSize / MIN_CHUNK_SIZE) * MIN_CHUNK_SIZE; this.chunkSize = Math.max(MIN_CHUNK_SIZE, chunkSize); } + + static class StateImpl implements RestorableState, Serializable { + + private static final long serialVersionUID = 8541062465055125619L; + + private final StorageOptions serviceOptions; + private final BlobInfo blobInfo; + private final String uploadId; + private final int position; + private final byte[] buffer; + private final boolean isOpen; + private final int chunkSize; + + StateImpl(Builder builder) { + this.serviceOptions = builder.serviceOptions; + this.blobInfo = builder.blobInfo; + this.uploadId = builder.uploadId; + this.position = builder.position; + this.buffer = builder.buffer; + this.isOpen = builder.isOpen; + this.chunkSize = builder.chunkSize; + } + + static class Builder { + private final StorageOptions serviceOptions; + private final BlobInfo blobInfo; + private final String uploadId; + private int position; + private byte[] buffer; + private boolean isOpen; + private int chunkSize; + + private Builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { + this.serviceOptions = options; + this.blobInfo = blobInfo; + this.uploadId = uploadId; + } + + Builder position(int position) { + this.position = position; + return this; + } + + Builder buffer(byte[] buffer) { + this.buffer = buffer; + return this; + } + + Builder isOpen(boolean isOpen) { + this.isOpen = isOpen; + return this; + } + + Builder chunkSize(int chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + RestorableState build() { + return new StateImpl(this); + } + } + + static Builder builder(StorageOptions options, BlobInfo blobInfo, String uploadId) { + return new Builder(options, blobInfo, uploadId); + } + + @Override + public BlobWriteChannel restore() { + BlobWriteChannelImpl channel = new BlobWriteChannelImpl(serviceOptions, blobInfo, uploadId); + if (buffer != null) { + channel.buffer = buffer.clone(); + channel.limit = buffer.length; + } + channel.position = position; + channel.isOpen = isOpen; + channel.chunkSize = chunkSize; + return channel; + } + + @Override + public int hashCode() { + return Objects.hash(serviceOptions, blobInfo, uploadId, position, isOpen, chunkSize, + Arrays.hashCode(buffer)); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof StateImpl)) { + return false; + } + final StateImpl other = (StateImpl) obj; + return Objects.equals(this.serviceOptions, other.serviceOptions) + && Objects.equals(this.blobInfo, other.blobInfo) + && Objects.equals(this.uploadId, other.uploadId) + && Objects.deepEquals(this.buffer, other.buffer) + && this.position == other.position + && this.isOpen == other.isOpen + && this.chunkSize == other.chunkSize; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("blobInfo", blobInfo) + .add("uploadId", uploadId) + .add("position", position) + .add("isOpen", isOpen) + .toString(); + } + } } diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java index c5c9a0e48612..e8ed915581b8 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobReadChannelImplTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; import com.google.gcloud.spi.StorageRpc; @@ -180,6 +181,46 @@ public void testReadClosed() { } } + @Test + public void testSaveAndRestore() throws IOException, ClassNotFoundException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()).times(2); + EasyMock.replay(optionsMock); + byte[] firstResult = randomByteArray(DEFAULT_CHUNK_SIZE); + byte[] secondResult = randomByteArray(DEFAULT_CHUNK_SIZE); + ByteBuffer firstReadBuffer = ByteBuffer.allocate(42); + ByteBuffer secondReadBuffer = ByteBuffer.allocate(DEFAULT_CHUNK_SIZE); + EasyMock + .expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 0, DEFAULT_CHUNK_SIZE)) + .andReturn(firstResult); + EasyMock + .expect(storageRpcMock.read(BLOB_ID.toPb(), EMPTY_RPC_OPTIONS, 42, DEFAULT_CHUNK_SIZE)) + .andReturn(secondResult); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + reader.read(firstReadBuffer); + RestorableState readerState = reader.save(); + BlobReadChannel restoredReader = readerState.restore(); + restoredReader.read(secondReadBuffer); + assertArrayEquals(Arrays.copyOf(firstResult, firstReadBuffer.capacity()), + firstReadBuffer.array()); + assertArrayEquals(secondResult, secondReadBuffer.array()); + } + + @Test + public void testStateEquals() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.replay(optionsMock); + EasyMock.replay(storageRpcMock); + reader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + BlobReadChannel secondReader = new BlobReadChannelImpl(optionsMock, BLOB_ID, EMPTY_RPC_OPTIONS); + RestorableState state = reader.save(); + RestorableState secondState = secondReader.save(); + assertEquals(state, secondState); + assertEquals(state.hashCode(), secondState.hashCode()); + assertEquals(state.toString(), secondState.toString()); + } + private static byte[] randomByteArray(int size) { byte[] byteArray = new byte[size]; RANDOM.nextBytes(byteArray); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java index 54135cc9990d..ab3f7a000d90 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/BlobWriteChannelImplTest.java @@ -23,11 +23,14 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableMap; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; import com.google.gcloud.spi.StorageRpc; import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; +import org.junit.After; import org.junit.Test; import org.junit.Before; @@ -36,7 +39,6 @@ import java.util.Arrays; import java.util.Map; import java.util.Random; -import org.junit.After; public class BlobWriteChannelImplTest { @@ -192,6 +194,74 @@ public void testWriteClosed() throws IOException { } } + @Test + public void testSaveAndRestore() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()).times(2); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(CaptureType.ALL); + Capture capturedPosition = Capture.newInstance(CaptureType.ALL); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.captureLong(capturedPosition), + EasyMock.eq(DEFAULT_CHUNK_SIZE), EasyMock.eq(false)); + EasyMock.expectLastCall().times(2); + EasyMock.replay(storageRpcMock); + ByteBuffer buffer1 = randomBuffer(DEFAULT_CHUNK_SIZE); + ByteBuffer buffer2 = randomBuffer(DEFAULT_CHUNK_SIZE); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + assertEquals(DEFAULT_CHUNK_SIZE, writer.write(buffer1)); + assertArrayEquals(buffer1.array(), capturedBuffer.getValues().get(0)); + assertEquals(new Long(0L), capturedPosition.getValues().get(0)); + RestorableState writerState = writer.save(); + BlobWriteChannel restoredWriter = writerState.restore(); + assertEquals(DEFAULT_CHUNK_SIZE, restoredWriter.write(buffer2)); + assertArrayEquals(buffer2.array(), capturedBuffer.getValues().get(1)); + assertEquals(new Long(DEFAULT_CHUNK_SIZE), capturedPosition.getValues().get(1)); + } + + @Test + public void testSaveAndRestoreClosed() throws IOException { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.expect(optionsMock.retryParams()).andReturn(RetryParams.noRetries()); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID); + Capture capturedBuffer = Capture.newInstance(); + storageRpcMock.write(EasyMock.eq(UPLOAD_ID), EasyMock.capture(capturedBuffer), EasyMock.eq(0), + EasyMock.eq(BLOB_INFO.toPb()), EasyMock.eq(0L), EasyMock.eq(0), EasyMock.eq(true)); + EasyMock.expectLastCall(); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + writer.close(); + RestorableState writerState = writer.save(); + RestorableState expectedWriterState = + BlobWriteChannelImpl.StateImpl.builder(optionsMock, BLOB_INFO, UPLOAD_ID) + .buffer(null) + .chunkSize(DEFAULT_CHUNK_SIZE) + .isOpen(false) + .position(0) + .build(); + BlobWriteChannel restoredWriter = writerState.restore(); + assertArrayEquals(new byte[0], capturedBuffer.getValue()); + assertEquals(expectedWriterState, restoredWriter.save()); + } + + @Test + public void testStateEquals() { + EasyMock.expect(optionsMock.storageRpc()).andReturn(storageRpcMock).times(2); + EasyMock.replay(optionsMock); + EasyMock.expect(storageRpcMock.open(BLOB_INFO.toPb(), EMPTY_RPC_OPTIONS)).andReturn(UPLOAD_ID) + .times(2); + EasyMock.replay(storageRpcMock); + writer = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + BlobWriteChannel writer2 = new BlobWriteChannelImpl(optionsMock, BLOB_INFO, EMPTY_RPC_OPTIONS); + RestorableState state = writer.save(); + RestorableState state2 = writer2.save(); + assertEquals(state, state2); + assertEquals(state.hashCode(), state2.hashCode()); + assertEquals(state.toString(), state2.toString()); + } + private static ByteBuffer randomBuffer(int size) { byte[] byteArray = new byte[size]; RANDOM.nextBytes(byteArray); diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java index 775d81f28fa8..7469b0bb7fb8 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/ITStorageTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.ImmutableList; +import com.google.gcloud.RestorableState; import com.google.gcloud.storage.testing.RemoteGcsHelper; import java.io.ByteArrayInputStream; @@ -35,7 +36,6 @@ import java.net.URLConnection; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; @@ -399,6 +399,35 @@ public void testReadAndWriteChannels() throws UnsupportedEncodingException, IOEx assertTrue(storage.delete(bucket, blobName)); } + @Test + public void testReadAndWriteSaveChannels() throws UnsupportedEncodingException, IOException { + String blobName = "test-read-and-write-save-channels-blob"; + BlobInfo blob = BlobInfo.builder(bucket, blobName).build(); + byte[] stringBytes; + BlobWriteChannel writer = storage.writer(blob); + stringBytes = BLOB_STRING_CONTENT.getBytes(UTF_8); + writer.write(ByteBuffer.wrap(BLOB_BYTE_CONTENT)); + RestorableState writerState = writer.save(); + BlobWriteChannel secondWriter = writerState.restore(); + secondWriter.write(ByteBuffer.wrap(stringBytes)); + secondWriter.close(); + ByteBuffer readBytes; + ByteBuffer readStringBytes; + BlobReadChannel reader = storage.reader(blob.blobId()); + reader.chunkSize(BLOB_BYTE_CONTENT.length); + readBytes = ByteBuffer.allocate(BLOB_BYTE_CONTENT.length); + reader.read(readBytes); + RestorableState readerState = reader.save(); + BlobReadChannel secondReader = readerState.restore(); + readStringBytes = ByteBuffer.allocate(stringBytes.length); + secondReader.read(readStringBytes); + reader.close(); + secondReader.close(); + assertArrayEquals(BLOB_BYTE_CONTENT, readBytes.array()); + assertEquals(BLOB_STRING_CONTENT, new String(readStringBytes.array(), UTF_8)); + assertTrue(storage.delete(bucket, blobName)); + } + @Test public void testReadChannelFail() throws UnsupportedEncodingException, IOException { String blobName = "test-read-channel-blob-fail"; diff --git a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java index 2dd1b8cbf895..edda4ed17e25 100644 --- a/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java +++ b/gcloud-java-storage/src/test/java/com/google/gcloud/storage/SerializationTest.java @@ -19,8 +19,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; +import com.google.common.collect.ImmutableMap; import com.google.gcloud.AuthCredentials; +import com.google.gcloud.RestorableState; import com.google.gcloud.RetryParams; +import com.google.gcloud.spi.StorageRpc; import com.google.gcloud.storage.Acl.Project.ProjectRole; import org.junit.Test; @@ -32,6 +35,7 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.Collections; +import java.util.Map; public class SerializationTest { @@ -64,6 +68,7 @@ public class SerializationTest { Storage.BucketSourceOption.metagenerationMatch(1); private static final Storage.BucketTargetOption BUCKET_TARGET_OPTIONS = Storage.BucketTargetOption.metagenerationNotMatch(); + private static final Map EMPTY_RPC_OPTIONS = ImmutableMap.of(); @Test public void testServiceOptions() throws Exception { @@ -100,8 +105,40 @@ public void testModelAndRequests() throws Exception { } } + @Test + public void testReadChannelState() throws IOException, ClassNotFoundException { + StorageOptions options = StorageOptions.builder() + .projectId("p2") + .retryParams(RetryParams.getDefaultInstance()) + .authCredentials(AuthCredentials.noCredentials()) + .build(); + BlobReadChannel reader = + new BlobReadChannelImpl(options, BlobId.of("b", "n"), EMPTY_RPC_OPTIONS); + RestorableState state = reader.save(); + RestorableState deserializedState = serializeAndDeserialize(state); + assertEquals(state, deserializedState); + assertEquals(state.hashCode(), deserializedState.hashCode()); + assertEquals(state.toString(), deserializedState.toString()); + } + + @Test + public void testWriteChannelState() throws IOException, ClassNotFoundException { + StorageOptions options = StorageOptions.builder() + .projectId("p2") + .retryParams(RetryParams.getDefaultInstance()) + .authCredentials(AuthCredentials.noCredentials()) + .build(); + BlobWriteChannelImpl writer = new BlobWriteChannelImpl( + options, BlobInfo.builder(BlobId.of("b", "n")).build(), "upload-id"); + RestorableState state = writer.save(); + RestorableState deserializedState = serializeAndDeserialize(state); + assertEquals(state, deserializedState); + assertEquals(state.hashCode(), deserializedState.hashCode()); + assertEquals(state.toString(), deserializedState.toString()); + } + @SuppressWarnings("unchecked") - private T serializeAndDeserialize(T obj) + private T serializeAndDeserialize(T obj) throws IOException, ClassNotFoundException { ByteArrayOutputStream bytes = new ByteArrayOutputStream(); try (ObjectOutputStream output = new ObjectOutputStream(bytes)) {