Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Serializable from BlobReadChannel and BlobWriteChannel #261

Merged
merged 7 commits into from
Oct 19, 2015
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud;

/**
* A common interface for restorable states. Implementations of {@code RestorableState} are capable
* of saving the state of an object to restore it for later use.
*
* Implementations of this class must implement {@link java.io.Serializable} to ensure that the
* state of a the object can be correctly serialized.
*/
public interface RestorableState<T> {

/**
* Returns an object whose internal state reflects the one saved in the invocation object.
*/
T restore();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.google.gcloud.storage;

import com.google.gcloud.RestorableState;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.ReadableByteChannel;

/**
Expand All @@ -28,7 +29,7 @@
*
* This class is @{link Serializable}, which allows incremental reads.
*/
public interface BlobReadChannel extends ReadableByteChannel, Serializable, Closeable {
public interface BlobReadChannel extends ReadableByteChannel, Closeable {

/**
* Overridden to remove IOException.
Expand All @@ -46,4 +47,11 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
*/
void chunkSize(int chunkSize);

/**
* Saves the read channel state.
*
* @return a {@link RestorableState} object that contains the read channel state and can restore
* it afterwards. State object must implement {@link java.io.Serializable}.
*/
public RestorableState<BlobReadChannel> save();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@
import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.MoreObjects;
import com.google.gcloud.RestorableState;
import com.google.gcloud.RetryHelper;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
Expand All @@ -35,7 +37,6 @@
class BlobReadChannelImpl implements BlobReadChannel {

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageOptions serviceOptions;
private final BlobId blob;
Expand All @@ -45,38 +46,33 @@ class BlobReadChannelImpl implements BlobReadChannel {
private boolean endOfStream;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;
private final StorageRpc storageRpc;
private final StorageObject storageObject;
private int bufferPos;
private byte[] buffer;

BlobReadChannelImpl(StorageOptions serviceOptions, BlobId blob,
Map<StorageRpc.Option, ?> requestOptions) {
this.serviceOptions = serviceOptions;
this.blob = blob;
this.requestOptions = requestOptions;
isOpen = true;
initTransients();
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
}

private void writeObject(ObjectOutputStream out) throws IOException {
@Override
public RestorableState<BlobReadChannel> save() {
StateImpl.Builder builder = StateImpl.builder(serviceOptions, blob, requestOptions)
.position(position)
.isOpen(isOpen)
.endOfStream(endOfStream)
.chunkSize(chunkSize);
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
builder.position(position + bufferPos);
builder.endOfStream(false);
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
}

private void initTransients() {
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
return builder.build();
}

@Override
Expand Down Expand Up @@ -148,4 +144,116 @@ public byte[] call() {
}
return toWrite;
}

static class StateImpl implements RestorableState<BlobReadChannel>, Serializable {

private static final long serialVersionUID = 3889420316004453706L;

private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private final int position;
private final boolean isOpen;
private final boolean endOfStream;
private final int chunkSize;

StateImpl(Builder builder) {
this.serviceOptions = builder.serviceOptions;
this.blob = builder.blob;
this.requestOptions = builder.requestOptions;
this.position = builder.position;
this.isOpen = builder.isOpen;
this.endOfStream = builder.endOfStream;
this.chunkSize = builder.chunkSize;
}

static class Builder {
private final StorageOptions serviceOptions;
private final BlobId blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;
private int chunkSize;

private Builder(StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
this.serviceOptions = options;
this.blob = blob;
this.requestOptions = reqOptions;
}

Builder position(int position) {
this.position = position;
return this;
}

Builder isOpen(boolean isOpen) {
this.isOpen = isOpen;
return this;
}

Builder endOfStream(boolean endOfStream) {
this.endOfStream = endOfStream;
return this;
}

Builder chunkSize(int chunkSize) {
this.chunkSize = chunkSize;
return this;
}

RestorableState<BlobReadChannel> build() {
return new StateImpl(this);
}
}

static Builder builder(
StorageOptions options, BlobId blob, Map<StorageRpc.Option, ?> reqOptions) {
return new Builder(options, blob, reqOptions);
}

@Override
public BlobReadChannel restore() {
BlobReadChannelImpl channel = new BlobReadChannelImpl(serviceOptions, blob, requestOptions);
channel.position = position;
channel.isOpen = isOpen;
channel.endOfStream = endOfStream;
channel.chunkSize = chunkSize;
return channel;
}

@Override
public int hashCode() {
return Objects.hash(serviceOptions, blob, requestOptions, position, isOpen, endOfStream,
chunkSize);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (!(obj instanceof StateImpl)) {
return false;
}
final StateImpl other = (StateImpl) obj;
return Objects.equals(this.serviceOptions, other.serviceOptions)
&& Objects.equals(this.blob, other.blob)
&& Objects.equals(this.requestOptions, other.requestOptions)
&& this.position == other.position
&& this.isOpen == other.isOpen
&& this.endOfStream == other.endOfStream
&& this.chunkSize == other.chunkSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("blob", blob)
.add("position", position)
.add("isOpen", isOpen)
.add("endOfStream", endOfStream)
.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@

package com.google.gcloud.storage;

import com.google.gcloud.RestorableState;

import java.io.Closeable;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;

/**
Expand All @@ -27,11 +28,21 @@
* data will only be visible after calling {@link #close()}. This class is serializable, to allow
* incremental writes.
*/
public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable {
public interface BlobWriteChannel extends WritableByteChannel, Closeable {

/**
* Sets the minimum size that will be written by a single RPC.
* Written data will be buffered and only flushed upon reaching this size or closing the channel.
*/
void chunkSize(int chunkSize);

/**
* Saves the write channel state so that it can be restored afterwards. The original
* {@code BlobWriteChannel} and the restored one should not both be used. Closing one channel
* causes the other channel to close, subsequent writes will fail.
*
* @return a {@link RestorableState} object that contains the write channel state and can restore
* it afterwards. State object must implement {@link java.io.Serializable}.
*/
public RestorableState<BlobWriteChannel> save();
}
Loading