Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Serializable from BlobReadChannel and BlobWriteChannel #261

Merged
merged 7 commits into from
Oct 19, 2015
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.ReadableByteChannel;

/**
Expand All @@ -28,7 +27,7 @@
*
* This class is @{link Serializable}, which allows incremental reads.
*/
public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable {
public interface BlobReadChannel extends ReadableByteChannel, Closeable {

/**
* Overridden to remove IOException.
Expand All @@ -46,4 +45,27 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
*/
void chunkSize(int chunkSize);

/**
* Saves the read channel state.
*
* @return an object that contains the read channel state and can restore it afterwards. State
* object must implement {@link java.io.Serializable}.
*/
public State save();

This comment was marked as spam.


/**
* A common interface for all classes that implement the internal state of a
* {@code BlobReadChannel}.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a channel can be correctly serialized.
*/
public interface State {

This comment was marked as spam.

This comment was marked as spam.


/**
* Returns a {@code BlobReadChannel} whose internal state reflects the one saved in the
* invocation object.
*/
public BlobReadChannel restore();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@
import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.MoreObjects;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
Expand All @@ -35,7 +36,6 @@
class BlobReadChannelImpl implements BlobReadChannel {

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageOptions serviceOptions;
private final BlobId blob;
Expand All @@ -45,10 +45,10 @@ class BlobReadChannelImpl implements BlobReadChannel {
private boolean endOfStream;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;
private StorageRpc storageRpc;
private StorageObject storageObject;
private int bufferPos;
private byte[] buffer;

BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob,
Map<StorageRpc.Option, ?> requestOptions) {
Expand All @@ -59,19 +59,18 @@ class BlobReadChannelImpl implements BlobReadChannel {
initTransients();

This comment was marked as spam.

}

private void writeObject(ObjectOutputStream out) throws IOException {
@Override
public State save() {
StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions)
.position(position)
.isOpen(isOpen)
.endOfStream(endOfStream)
.chunkSize(chunkSize);
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
builder.position(position + bufferPos);
builder.endOfStream(false);
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
return builder.build();
}

private void initTransients() {
Expand Down Expand Up @@ -148,4 +147,116 @@ public byte[] call() {
}
return toWrite;
}

static class StateImpl implements BlobReadChannel.State, Serializable {

private static final long serialVersionUID = 3889420316004453706L;

private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
private final int chunkSize;

StateImpl(Builder builder) {
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
this.chunkSize = builder.chunkSize;
}

public static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;
private int chunkSize;

private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
this.serviceOptions = options;
this.blob = blob;
this.requestOptions = reqOptions;
}

public Builder position(int position) {
this.position = position;
return this;
}

public Builder isOpen(boolean isOpen) {
this.isOpen = isOpen;
return this;
}

public Builder endOfStream(boolean endOfStream) {
this.endOfStream = endOfStream;
return this;
}

public Builder chunkSize(int chunkSize) {
this.chunkSize = chunkSize;
return this;
}

public State build() {
return new StateImpl(this);
}
}

public static Builder builder(
StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
return new Builder(options, blob, reqOptions);
}

@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
channel.chunkSize = chunkSize;
return channel;
}

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof StateImpl)) {
return false;
}
final StateImpl other = (StateImpl) obj;
return Objects.equals(this.serviceOptions, other.serviceOptions) &&

This comment was marked as spam.

Objects.equals(this.blob, other.blob) &&
Objects.equals(this.requestOptions, other.requestOptions) &&
this.position == other.position &&
this.isOpen == other.isOpen &&
this.endOfStream == other.endOfStream &&
this.chunkSize == other.chunkSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blob", blob)
.add("position", position)
.add("isOpen", isOpen)
.add("endOfStream", endOfStream)
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.gcloud.storage;

import java.io.Closeable;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;

/**
Expand All @@ -27,11 +26,38 @@
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
*/
public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable {
public interface BlobWriteChannel extends WritableByteChannel, Closeable {

/**
* Sets the minimum size that will be written by a single RPC.
* Written data will be buffered and only flushed upon reaching this size or closing the channel.
*/
void chunkSize(int chunkSize);

/**
* Saves the write channel state.
*
* @return an object that contains the write channel state and can restore it afterwards. State
* object must implement {@link java.io.Serializable}.
*/
public State save();

/**
* A common interface for all classes that implement the internal state of a
* {@code BlobWriteChannel}.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a channel can be correctly serialized.
*/
public interface State {

/**
* Returns a {@code BlobWriteChannel} whose internal state reflects the one saved in the
* invocation object.
*
* The original {@code BlobWriteChannel} and the restored one should not both be used. Closing
* one channel causes the other channel to close, subsequent writes will fail.
*/
public BlobWriteChannel restore();
}
}
Loading