diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java index 164a093002c9c..89b82384e8ac5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java @@ -61,7 +61,7 @@ public static TestPipeline createTestPipeline() { final TestPipeline testPipeline = TestPipeline.fromOptions(pipelineOptions); testPipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions)); return testPipeline; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java index 0d2c86a341ec5..0cb747faa056e 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java @@ -49,9 +49,7 @@ public void execute(TestCase tc) { final EuphoriaOptions euphoriaOptions = pipelineOptions.as(EuphoriaOptions.class); euphoriaOptions.setAccumulatorProviderFactory(accumulatorProvider); final Pipeline pipeline = TestPipeline.create(pipelineOptions); - pipeline - .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + pipeline.getCoderRegistry().registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions)); final Dataset output = tc.getOutput(pipeline); tc.validate(output); pipeline.run().waitUntilFinish(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java index 1865056c39132..590c7e4b91c34 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin; import org.apache.beam.sdk.extensions.kryo.KryoCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -799,7 +800,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java index 4754592a463a9..a7a1607f2c197 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums; import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SnapshotProvider; import org.apache.beam.sdk.extensions.kryo.KryoCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -652,7 +653,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override @@ -764,7 +765,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java index 4953e0a6a3ce9..1f324ca3effc6 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java @@ -52,7 +52,7 @@ public class BeamMetricsTranslationTest { public void setup() { testPipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(testPipeline.getOptions())); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java index 0b8a0b9187d7d..e3e6ddf11a8ad 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java @@ -48,7 +48,7 @@ public class EuphoriaTest implements Serializable { public void setup() { pipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(pipeline.getOptions())); } @Test diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java index 1a7540927f041..967bb59c23615 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java @@ -47,8 +47,8 @@ public class KryoCoder extends AtomicCoder { * Create a new {@link KryoCoder}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of(PipelineOptions pipelineOptions) { @@ -59,9 +59,10 @@ public static KryoCoder of(PipelineOptions pipelineOptions) { * Create a new {@link KryoCoder}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { @@ -72,9 +73,10 @@ public static KryoCoder of(PipelineOptions pipelineOptions, KryoRegistrar * Create a new {@link KryoCoder}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of( @@ -91,19 +93,13 @@ public static KryoCoder of( /** Serializable wrapper for {@link KryoOptions}. */ static class SerializableOptions implements Serializable { - /** - * Size of input and output buffer. - */ + /** Size of input and output buffer. */ private final int bufferSize; - /** - * Enables kryo reference tracking. - */ + /** Enables kryo reference tracking. */ private final boolean references; - /** - * Enables kryo required registration. - */ + /** Enables kryo required registration. */ private final boolean registrationRequired; private SerializableOptions(int bufferSize, boolean references, boolean registrationRequired) { @@ -140,14 +136,10 @@ boolean getRegistrationRequired() { } } - /** - * Unique id of the {@link KryoCoder} instance. - */ + /** Unique id of the {@link KryoCoder} instance. */ private final String instanceId = UUID.randomUUID().toString(); - /** - * Options for underlying kryo instance. - */ + /** Options for underlying kryo instance. */ private final SerializableOptions options; /** Client-defined class registrations to {@link Kryo}. */ @@ -190,7 +182,8 @@ public T decode(InputStream inStream) throws IOException { final InputChunked inputChunked = kryoState.getInputChunked(); inputChunked.setInputStream(inStream); try { - @SuppressWarnings("unchecked") final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked); + @SuppressWarnings("unchecked") + final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked); return instance; } catch (KryoException e) { throw new CoderException("Cannot decode object from input stream.", e); diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index 906f74e28dda2..c47311c41fb9d 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -50,7 +50,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions) { * Create a new {@link KryoCoderProvider}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance * @return A newly created {@link KryoCoderProvider} */ public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { @@ -61,7 +62,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistra * Create a new {@link KryoCoderProvider}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance * @return A newly created {@link KryoCoderProvider} */ public static KryoCoderProvider of( diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java index 69ddffab7130f..fa217837aae4e 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java @@ -40,8 +40,8 @@ public interface KryoOptions extends PipelineOptions { void setKryoReferences(boolean references); @JsonIgnore - @Description("Set to false to disable required registration") - @Default.Boolean(true) + @Description("Set to true to enable required registration") + @Default.Boolean(false) boolean getKryoRegistrationRequired(); void setKryoRegistrationRequired(boolean registrationRequired); diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java index ed36f1ec52828..c8c6ead1f60d6 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java @@ -24,9 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.objenesis.strategy.StdInstantiatorStrategy; -/** - * Reusable kryo instance. - */ +/** Reusable kryo instance. */ class KryoState { private static final Storage STORAGE = new Storage(); @@ -35,9 +33,7 @@ static KryoState get(KryoCoder coder) { return STORAGE.getOrCreate(coder); } - /** - * Caching thread local storage for reusable {@link KryoState}s. - */ + /** Caching thread local storage for reusable {@link KryoState}s. */ private static class Storage { private final ThreadLocal> kryoStateMap = @@ -51,7 +47,8 @@ KryoState getOrCreate(KryoCoder coder) { k -> { final Kryo kryo = new Kryo(); // fallback in case serialized class does not have default constructor - kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + kryo.setInstantiatorStrategy( + new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); kryo.setReferences(coder.getOptions().getReferences()); kryo.setRegistrationRequired(coder.getOptions().getRegistrationRequired()); for (KryoRegistrar registrar : coder.getRegistrars()) { @@ -65,19 +62,13 @@ KryoState getOrCreate(KryoCoder coder) { } } - /** - * The kryo instance. - */ + /** The kryo instance. */ private final Kryo kryo; - /** - * A reusable input buffer. - */ + /** A reusable input buffer. */ private final InputChunked inputChunked; - /** - * A reusable output buffer. - */ + /** A reusable output buffer. */ private final OutputChunked outputChunked; private KryoState(Kryo kryo, InputChunked inputChunked, OutputChunked outputChunked) { diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java index 46488cc0fd2ee..2b5f2f49d1823 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java @@ -26,7 +26,6 @@ import java.io.ObjectOutputStream; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Assert; import org.junit.Test; /** A set of unit {@link KryoState} tests. */ @@ -47,8 +46,8 @@ public void testSameKryoAfterDeserialization() throws IOException, ClassNotFound final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); - @SuppressWarnings("unchecked") final KryoCoder deserializedCoder = - (KryoCoder) ois.readObject(); + @SuppressWarnings("unchecked") + final KryoCoder deserializedCoder = (KryoCoder) ois.readObject(); final KryoState secondKryo = KryoState.get(deserializedCoder); assertSame(firstKryo, secondKryo); }