Skip to content

Commit

Permalink
Remove Serializable from BlobRead and BlobWriteChannel
Browse files Browse the repository at this point in the history
- remove serializable from interfaces
- add State interface and save method to channels
- add StateImpl class to channel implementations
- add tests
  • Loading branch information
mziccard committed Oct 15, 2015
1 parent 7115734 commit 029fbe2
Show file tree
Hide file tree
Showing 7 changed files with 443 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.ReadableByteChannel;

/**
Expand All @@ -28,7 +27,7 @@
*
* This class is @{link Serializable}, which allows incremental reads.
*/
public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable {
public interface BlobReadChannel extends ReadableByteChannel, Closeable {

/**
* Overridden to remove IOException.
Expand All @@ -46,4 +45,27 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
*/
void chunkSize(int chunkSize);

/**
* Saves the read channel state.
*
* @return an object that contains the read channel state and can restore it afterwards. State
* object must implement {@link java.io.Serializable}.
*/
public State save();

/**
* A common interface for all classes that implement the internal state of a
* {@code BlobReadChannel}.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a channel can be correctly serialized.
*/
public interface State {

/**
* Returns a {@code BlobReadChannel} whose internal state reflects the one saved in the
* invocation object.
*/
public BlobReadChannel restore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.MoreObjects;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
Expand All @@ -35,7 +36,6 @@
class BlobReadChannelImpl implements BlobReadChannel {

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageOptions serviceOptions;
private final BlobId blob;
Expand All @@ -45,10 +45,10 @@ class BlobReadChannelImpl implements BlobReadChannel {
private boolean endOfStream;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;
private StorageRpc storageRpc;
private StorageObject storageObject;
private int bufferPos;
private byte[] buffer;

BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob,
Map<StorageRpc.Option, ?> requestOptions) {
Expand All @@ -59,19 +59,18 @@ class BlobReadChannelImpl implements BlobReadChannel {
initTransients();
}

private void writeObject(ObjectOutputStream out) throws IOException {
@Override
public State save() {
StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions)
.position(position)
.isOpen(isOpen)
.endOfStream(endOfStream)
.chunkSize(chunkSize);
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
builder.position(position + bufferPos);
builder.endOfStream(false);
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
return builder.build();
}

private void initTransients() {
Expand Down Expand Up @@ -148,4 +147,116 @@ public byte[] call() {
}
return toWrite;
}

static class StateImpl implements BlobReadChannel.State, Serializable {

private static final long serialVersionUID = 3889420316004453706L;

private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
private final int chunkSize;

StateImpl(Builder builder) {
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
this.chunkSize = builder.chunkSize;
}

public static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;
private int chunkSize;

private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
this.serviceOptions = options;
this.blob = blob;
this.requestOptions = reqOptions;
}

public Builder position(int position) {
this.position = position;
return this;
}

public Builder isOpen(boolean isOpen) {
this.isOpen = isOpen;
return this;
}

public Builder endOfStream(boolean endOfStream) {
this.endOfStream = endOfStream;
return this;
}

public Builder chunkSize(int chunkSize) {
this.chunkSize = chunkSize;
return this;
}

public State build() {
return new StateImpl(this);
}
}

public static Builder builder(
StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
return new Builder(options, blob, reqOptions);
}

@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
channel.chunkSize = chunkSize;
return channel;
}

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof StateImpl)) {
return false;
}
final StateImpl other = (StateImpl) obj;
return Objects.equals(this.serviceOptions, other.serviceOptions) &&
Objects.equals(this.blob, other.blob) &&
Objects.equals(this.requestOptions, other.requestOptions) &&
this.position == other.position &&
this.isOpen == other.isOpen &&
this.endOfStream == other.endOfStream &&
this.chunkSize == other.chunkSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blob", blob)
.add("position", position)
.add("isOpen", isOpen)
.add("endOfStream", endOfStream)
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.gcloud.storage;

import java.io.Closeable;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;

/**
Expand All @@ -27,11 +26,38 @@
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
*/
public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable {
public interface BlobWriteChannel extends WritableByteChannel, Closeable {

/**
* Sets the minimum size that will be written by a single RPC.
* Written data will be buffered and only flushed upon reaching this size or closing the channel.
*/
void chunkSize(int chunkSize);

/**
* Saves the write channel state.
*
* @return an object that contains the write channel state and can restore it afterwards. State
* object must implement {@link java.io.Serializable}.
*/
public State save();

/**
* A common interface for all classes that implement the internal state of a
* {@code BlobWriteChannel}.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a channel can be correctly serialized.
*/
public interface State {

/**
* Returns a {@code BlobWriteChannel} whose internal state reflects the one saved in the
* invocation object.
*
* The original {@code BlobWriteChannel} and the restored one should not both be used. Closing
* one channel causes the other channel to close, subsequent writes will fail.
*/
public BlobWriteChannel restore();
}
}
Loading

0 comments on commit 029fbe2

Please sign in to comment.