Skip to content

Commit

Permalink
[BEAM-5437] Fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Moravek committed Sep 22, 2018
1 parent 0d100b1 commit 46cd74b
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static TestPipeline createTestPipeline() {
final TestPipeline testPipeline = TestPipeline.fromOptions(pipelineOptions);
testPipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions));
return testPipeline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public <T> void execute(TestCase<T> tc) {
final EuphoriaOptions euphoriaOptions = pipelineOptions.as(EuphoriaOptions.class);
euphoriaOptions.setAccumulatorProviderFactory(accumulatorProvider);
final Pipeline pipeline = TestPipeline.create(pipelineOptions);
pipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
pipeline.getCoderRegistry().registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions));
final Dataset<T> output = tc.getOutput(pipeline);
tc.validate(output);
pipeline.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -799,7 +800,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<BoundedWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SnapshotProvider;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
Expand Down Expand Up @@ -652,7 +653,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<CountWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down Expand Up @@ -764,7 +765,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<UniqueWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class BeamMetricsTranslationTest {
public void setup() {
testPipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(testPipeline.getOptions()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class EuphoriaTest implements Serializable {
public void setup() {
pipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(pipeline.getOptions()));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class KryoCoder<T> extends AtomicCoder<T> {
* Create a new {@link KryoCoder}.
*
* @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details.
* @param <T> type of element this class should decode/encode
* {@link Kryo} instance used by returned {@link KryoCoder}
* @param <T> type of element this class should decode/encode {@link Kryo} instance used by
* returned {@link KryoCoder}
* @return Newly created a {@link KryoCoder}
*/
public static <T> KryoCoder<T> of(PipelineOptions pipelineOptions) {
Expand All @@ -59,9 +59,10 @@ public static <T> KryoCoder<T> of(PipelineOptions pipelineOptions) {
* Create a new {@link KryoCoder}.
*
* @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details.
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance
* @param <T> type of element this class should decode/encode
* {@link Kryo} instance used by returned {@link KryoCoder}
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying
* kryo instance
* @param <T> type of element this class should decode/encode {@link Kryo} instance used by
* returned {@link KryoCoder}
* @return Newly created a {@link KryoCoder}
*/
public static <T> KryoCoder<T> of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) {
Expand All @@ -72,9 +73,10 @@ public static <T> KryoCoder<T> of(PipelineOptions pipelineOptions, KryoRegistrar
* Create a new {@link KryoCoder}.
*
* @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details.
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance
* @param <T> type of element this class should decode/encode
* {@link Kryo} instance used by returned {@link KryoCoder}
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying
* kryo instance
* @param <T> type of element this class should decode/encode {@link Kryo} instance used by
* returned {@link KryoCoder}
* @return Newly created a {@link KryoCoder}
*/
public static <T> KryoCoder<T> of(
Expand All @@ -91,19 +93,13 @@ public static <T> KryoCoder<T> of(
/** Serializable wrapper for {@link KryoOptions}. */
static class SerializableOptions implements Serializable {

/**
* Size of input and output buffer.
*/
/** Size of input and output buffer. */
private final int bufferSize;

/**
* Enables kryo reference tracking.
*/
/** Enables kryo reference tracking. */
private final boolean references;

/**
* Enables kryo required registration.
*/
/** Enables kryo required registration. */
private final boolean registrationRequired;

private SerializableOptions(int bufferSize, boolean references, boolean registrationRequired) {
Expand Down Expand Up @@ -140,14 +136,10 @@ boolean getRegistrationRequired() {
}
}

/**
* Unique id of the {@link KryoCoder} instance.
*/
/** Unique id of the {@link KryoCoder} instance. */
private final String instanceId = UUID.randomUUID().toString();

/**
* Options for underlying kryo instance.
*/
/** Options for underlying kryo instance. */
private final SerializableOptions options;

/** Client-defined class registrations to {@link Kryo}. */
Expand Down Expand Up @@ -190,7 +182,8 @@ public T decode(InputStream inStream) throws IOException {
final InputChunked inputChunked = kryoState.getInputChunked();
inputChunked.setInputStream(inStream);
try {
@SuppressWarnings("unchecked") final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked);
@SuppressWarnings("unchecked")
final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked);
return instance;
} catch (KryoException e) {
throw new CoderException("Cannot decode object from input stream.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions) {
* Create a new {@link KryoCoderProvider}.
*
* @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details.
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying
* kryo instance
* @return A newly created {@link KryoCoderProvider}
*/
public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) {
Expand All @@ -61,7 +62,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistra
* Create a new {@link KryoCoderProvider}.
*
* @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details.
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance
* @param registrars {@link KryoRegistrar}s which are used to register classes with underlying
* kryo instance
* @return A newly created {@link KryoCoderProvider}
*/
public static KryoCoderProvider of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public interface KryoOptions extends PipelineOptions {
void setKryoReferences(boolean references);

@JsonIgnore
@Description("Set to false to disable required registration")
@Default.Boolean(true)
@Description("Set to true to enable required registration")
@Default.Boolean(false)
boolean getKryoRegistrationRequired();

void setKryoRegistrationRequired(boolean registrationRequired);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.objenesis.strategy.StdInstantiatorStrategy;

/**
* Reusable kryo instance.
*/
/** Reusable kryo instance. */
class KryoState {

private static final Storage STORAGE = new Storage();
Expand All @@ -35,9 +33,7 @@ static KryoState get(KryoCoder<?> coder) {
return STORAGE.getOrCreate(coder);
}

/**
* Caching thread local storage for reusable {@link KryoState}s.
*/
/** Caching thread local storage for reusable {@link KryoState}s. */
private static class Storage {

private final ThreadLocal<Map<String, KryoState>> kryoStateMap =
Expand All @@ -51,7 +47,8 @@ KryoState getOrCreate(KryoCoder<?> coder) {
k -> {
final Kryo kryo = new Kryo();
// fallback in case serialized class does not have default constructor
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.setInstantiatorStrategy(
new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
kryo.setReferences(coder.getOptions().getReferences());
kryo.setRegistrationRequired(coder.getOptions().getRegistrationRequired());
for (KryoRegistrar registrar : coder.getRegistrars()) {
Expand All @@ -65,19 +62,13 @@ KryoState getOrCreate(KryoCoder<?> coder) {
}
}

/**
* The kryo instance.
*/
/** The kryo instance. */
private final Kryo kryo;

/**
* A reusable input buffer.
*/
/** A reusable input buffer. */
private final InputChunked inputChunked;

/**
* A reusable output buffer.
*/
/** A reusable output buffer. */
private final OutputChunked outputChunked;

private KryoState(Kryo kryo, InputChunked inputChunked, OutputChunked outputChunked) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.ObjectOutputStream;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Assert;
import org.junit.Test;

/** A set of unit {@link KryoState} tests. */
Expand All @@ -47,8 +46,8 @@ public void testSameKryoAfterDeserialization() throws IOException, ClassNotFound

final ObjectInputStream ois =
new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray()));
@SuppressWarnings("unchecked") final KryoCoder<?> deserializedCoder =
(KryoCoder) ois.readObject();
@SuppressWarnings("unchecked")
final KryoCoder<?> deserializedCoder = (KryoCoder) ois.readObject();
final KryoState secondKryo = KryoState.get(deserializedCoder);
assertSame(firstKryo, secondKryo);
}
Expand Down

0 comments on commit 46cd74b

Please sign in to comment.