From 0d100b1af00786a8485180bd2bd39b20ad4b39bb Mon Sep 17 00:00:00 2001 From: David Moravek Date: Sat, 22 Sep 2018 14:08:48 +0200 Subject: [PATCH 1/5] [BEAM-5437] Allow kryo provider to use multiple registrars. --- .../extensions/kryo/IdentifiedRegistrar.java | 82 ------ .../beam/sdk/extensions/kryo/KryoCoder.java | 236 +++++++++++++----- .../extensions/kryo/KryoCoderProvider.java | 142 +++++------ .../beam/sdk/extensions/kryo/KryoFactory.java | 113 --------- .../beam/sdk/extensions/kryo/KryoOptions.java | 48 ++++ .../beam/sdk/extensions/kryo/KryoState.java | 115 +++++++++ .../kryo/IdentifiedRegistrarTest.java | 48 ---- .../kryo/KryoCoderProviderTest.java | 78 +++--- .../sdk/extensions/kryo/KryoCoderTest.java | 154 ++++++------ ...ryoFactoryTest.java => KryoStateTest.java} | 41 ++- 10 files changed, 531 insertions(+), 526 deletions(-) delete mode 100644 sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrar.java delete mode 100644 sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoFactory.java create mode 100644 sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java create mode 100644 sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java delete mode 100644 sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrarTest.java rename sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/{KryoFactoryTest.java => KryoStateTest.java} (52%) diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrar.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrar.java deleted file mode 100644 index b2c51c73f67a9..0000000000000 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrar.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.extensions.kryo; - -import com.esotericsoftware.kryo.Kryo; -import java.io.Serializable; -import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * {@link KryoRegistrar} enriched by Id. - * - *

New instances of the same (possibly lambda) implementation of {@link KryoRegistrar} may be - * created by (de)serialization. And since lambda expressions do not retain their type (instance of - * {@link Class}) after deserialization, we need something else to avoid creation of more {@link - * Kryo} instances then really needed. That is why any given {@link KryoRegistrar} instance is - * enriched by Id. - */ -class IdentifiedRegistrar implements Serializable { - - static final int NO_OP_REGISTRAR_ID = -1; - private static final Logger LOG = LoggerFactory.getLogger(IdentifiedRegistrar.class); - - private static final AtomicInteger idSource = new AtomicInteger(); - - private final int id; - private final KryoRegistrar registrar; - - private IdentifiedRegistrar(int id, KryoRegistrar registrar) { - this.id = id; - this.registrar = registrar; - } - - static IdentifiedRegistrar of(KryoRegistrar registrar) { - Objects.requireNonNull(registrar); - IdentifiedRegistrar identifiedRegistrar = - new IdentifiedRegistrar(idSource.getAndIncrement(), registrar); - LOG.debug( - "Kryo registrar id [{}] was assigned to [{}].", - identifiedRegistrar.getId(), - registrar.getClass()); - return identifiedRegistrar; - } - - static IdentifiedRegistrar defaultNoOpRegistrar() { - return new IdentifiedRegistrar( - NO_OP_REGISTRAR_ID, - (kryo) -> { - /*No-Op*/ - }); - } - - @Override - public String toString() { - return "IdentifiedRegistrar{" + "id=" + id + ", registrar=" + registrar + '}'; - } - - public int getId() { - return id; - } - - public KryoRegistrar getRegistrar() { - return registrar; - } -} diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java index d7dcaf11ee20b..1a7540927f041 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java +++ 
b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java @@ -21,106 +21,218 @@ import com.esotericsoftware.kryo.KryoException; import com.esotericsoftware.kryo.io.InputChunked; import com.esotericsoftware.kryo.io.OutputChunked; -import com.google.common.annotations.VisibleForTesting; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.coders.CustomCoder; +import org.apache.beam.sdk.options.PipelineOptions; /** * Coder using Kryo as (de)serialization mechanism. See {@link KryoCoderProvider} to get more - * details of how to use it + * details about usage. + * + * @param type of element coder can handle */ -public class KryoCoder extends CustomCoder { +public class KryoCoder extends AtomicCoder { /** - * Client-defined class registrations to {@link Kryo}. + * Create a new {@link KryoCoder}. * - *

{@link KryoCoder} needs it to be able to create a {@link Kryo} instance with correct class - * registrations after its deserialization. + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. + * @param type of element this class should decode/encode + * {@link Kryo} instance used by returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} */ - private final IdentifiedRegistrar registrarWithId; - - private KryoCoder(IdentifiedRegistrar registrarWithId) { - this.registrarWithId = registrarWithId; + public static KryoCoder of(PipelineOptions pipelineOptions) { + return of(pipelineOptions, Collections.emptyList()); } /** - * @param type of element this class should code/encode - * @param registrarWithId uniquely identified {@link KryoRegistrar} which is used to register - * classes to {@link Kryo} instance used by returned {@link KryoCoder} - * @return Newly created a {@link KryoCoder} instance which will use {@link Kryo} with classes - * registered by {@code registrarWithId}. + * Create a new {@link KryoCoder}. + * + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param type of element this class should decode/encode + * {@link Kryo} instance used by returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} */ - public static KryoCoder of(IdentifiedRegistrar registrarWithId) { - return new KryoCoder<>(registrarWithId); + public static KryoCoder of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { + return of(pipelineOptions, Arrays.asList(registrars)); } /** - * @param type of element this class should code/encode - * @return Newly created a {@link KryoCoder} instance which will use {@link Kryo} without class - * registration. That degrades performance. Use {@link #of(IdentifiedRegistrar)} whenever - * possible. 
+ * Create a new {@link KryoCoder}. + * + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param type of element this class should decode/encode + * {@link Kryo} instance used by returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} */ - public static KryoCoder withoutClassRegistration() { - return new KryoCoder<>(KryoFactory.NO_OP_REGISTRAR); + public static KryoCoder of( + PipelineOptions pipelineOptions, List registrars) { + final KryoOptions kryoOptions = pipelineOptions.as(KryoOptions.class); + return new KryoCoder<>( + new SerializableOptions( + kryoOptions.getKryoBufferSize(), + kryoOptions.getKryoReferences(), + kryoOptions.getKryoRegistrationRequired()), + registrars); } - @Override - public void encode(T value, OutputStream outStream) throws IOException { + /** Serializable wrapper for {@link KryoOptions}. */ + static class SerializableOptions implements Serializable { - Kryo kryo = KryoFactory.getOrCreateKryo(registrarWithId); + /** + * Size of input and output buffer. + */ + private final int bufferSize; - OutputChunked output = KryoFactory.getKryoOutput(); - output.clear(); - output.setOutputStream(outStream); + /** + * Enables kryo reference tracking. + */ + private final boolean references; - try { - kryo.writeClassAndObject(output, value); - output.endChunks(); - output.flush(); - } catch (IllegalArgumentException e) { - throw new CoderException( - String.format( - "Cannot encode given object of type '%s'. " - + "Forgotten kryo registration is possible explanation. Kryo registrations where done by '%s'.", - (value == null) ? null : value.getClass().getSimpleName(), registrarWithId), - e); + /** + * Enables kryo required registration. 
+ */ + private final boolean registrationRequired; + + private SerializableOptions(int bufferSize, boolean references, boolean registrationRequired) { + this.bufferSize = bufferSize; + this.references = references; + this.registrationRequired = registrationRequired; + } + + /** + * {@link SerializableOptions#bufferSize} + * + * @return buffer size + */ + int getBufferSize() { + return bufferSize; + } + + /** + * {@link SerializableOptions#references} + * + * @return boolean flag + */ + boolean getReferences() { + return references; + } + + /** + * {@link SerializableOptions#registrationRequired} + * + * @return boolean flag + */ + boolean getRegistrationRequired() { + return registrationRequired; } } - @Override - public T decode(InputStream inStream) throws IOException { + /** + * Unique id of the {@link KryoCoder} instance. + */ + private final String instanceId = UUID.randomUUID().toString(); - InputChunked input = KryoFactory.getKryoInput(); - input.rewind(); - input.setInputStream(inStream); + /** + * Options for underlying kryo instance. + */ + private final SerializableOptions options; + + /** Client-defined class registrations to {@link Kryo}. 
*/ + private final List registrars; - Kryo kryo = KryoFactory.getOrCreateKryo(registrarWithId); + private KryoCoder(SerializableOptions options, List registrars) { + this.options = options; + this.registrars = registrars; + } + @Override + public void encode(T value, OutputStream outStream) throws IOException { + final KryoState kryoState = KryoState.get(this); + if (value == null) { + throw new CoderException("Cannot encode a null value."); + } + final OutputChunked outputChunked = kryoState.getOutputChunked(); + outputChunked.setOutputStream(outStream); try { - @SuppressWarnings("unchecked") - T outObject = (T) kryo.readClassAndObject(input); - return outObject; + kryoState.getKryo().writeClassAndObject(outputChunked, value); + outputChunked.endChunks(); + outputChunked.flush(); + } catch (KryoException e) { + outputChunked.clear(); + if (e.getCause() instanceof EOFException) { + throw (EOFException) e.getCause(); + } + throw new CoderException("Cannot encode given object of type [" + value.getClass() + "].", e); + } catch (IllegalArgumentException e) { + if (e.getMessage() != null && e.getMessage().startsWith("Class is not registered")) { + throw new CoderException(e.getMessage(), e); + } + throw e; + } + } + @Override + public T decode(InputStream inStream) throws IOException { + final KryoState kryoState = KryoState.get(this); + final InputChunked inputChunked = kryoState.getInputChunked(); + inputChunked.setInputStream(inStream); + try { + @SuppressWarnings("unchecked") final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked); + return instance; } catch (KryoException e) { - throw new CoderException( - String.format( - "Cannot decode object from input stream." - + " Forgotten kryo registration is possible explanation. 
Kryo registrations where done by '%s'.", - registrarWithId), - e); + throw new CoderException("Cannot decode object from input stream.", e); } } - @VisibleForTesting - IdentifiedRegistrar getRegistrar() { - return registrarWithId; + /** + * Create a new {@link KryoCoder} instance with the user provided registrar appended; this coder and its registrar list are left unmodified. + * + * @param registrar registrar to append to list of already registered registrars. + * @return new kryo coder + */ + public KryoCoder withRegistrar(KryoRegistrar registrar) { + final List newRegistrars = new ArrayList<>(registrars); + newRegistrars.add(registrar); + return new KryoCoder<>(options, newRegistrars); } - @Override - public void verifyDeterministic() throws NonDeterministicException { - // nop + /** + * {@link KryoCoder#instanceId} + * + * @return instance id + */ + String getInstanceId() { + return instanceId; + } + + /** + * {@link KryoCoder#options} + * + * @return options + */ + SerializableOptions getOptions() { + return options; + } + + /** + * {@link KryoCoder#registrars} + * + * @return registrars + */ + List getRegistrars() { + return registrars; } } diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index 5d2e36b191470..906f74e28dda2 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -17,17 +17,17 @@ */ package org.apache.beam.sdk.extensions.kryo; -import static java.util.Objects.requireNonNull; - import com.esotericsoftware.kryo.ClassResolver; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Registration; import com.google.common.annotations.VisibleForTesting; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.beam.sdk.Pipeline; import 
org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -36,95 +36,91 @@ */ public class KryoCoderProvider extends CoderProvider { - private final IdentifiedRegistrar kryoRegistrar; + /** + * Create a new {@link KryoCoderProvider}. + * + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. + * @return A newly created {@link KryoCoderProvider} + */ + public static KryoCoderProvider of(PipelineOptions pipelineOptions) { + return of(pipelineOptions, Collections.emptyList()); + } /** - * Starts build of {@link KryoCoderProvider} with given {@link KryoRegistrar}. + * Create a new {@link KryoCoderProvider}. * - * @param registrar user defined implementation of {@link KryoRegistrar} - * @return next step of building + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @return A newly created {@link KryoCoderProvider} */ - public static FinalBuilder of(KryoRegistrar registrar) { - return new Builder(registrar); + public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { + return of(pipelineOptions, Arrays.asList(registrars)); } - private KryoCoderProvider(IdentifiedRegistrar kryoRegistrar) { - requireNonNull(kryoRegistrar); - this.kryoRegistrar = kryoRegistrar; + /** + * Create a new {@link KryoCoderProvider}. + * + * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. 
+ * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @return A newly created {@link KryoCoderProvider} + */ + public static KryoCoderProvider of( + PipelineOptions pipelineOptions, List registrars) { + final KryoOptions kryoOptions = pipelineOptions.as(KryoOptions.class); + return new KryoCoderProvider(KryoCoder.of(kryoOptions, registrars)); } - @Override - public Coder coderFor( - TypeDescriptor typeDescriptor, List> componentCoders) - throws CannotProvideCoderException { + /** {@link KryoRegistrar}s associated with this provider instance. */ + private final KryoCoder coder; - return createKryoCoderIfRawTypeRegistered(typeDescriptor); + private KryoCoderProvider(KryoCoder coder) { + this.coder = coder; } - private KryoCoder createKryoCoderIfRawTypeRegistered(TypeDescriptor typeDescriptor) + @Override + @SuppressWarnings("unchecked") + public Coder coderFor( + TypeDescriptor typeDescriptor, List> componentCoders) throws CannotProvideCoderException { - - Class rawType = typeDescriptor.getRawType(); - Kryo kryo = KryoFactory.getOrCreateKryo(kryoRegistrar); - ClassResolver classResolver = kryo.getClassResolver(); - - Registration registration = classResolver.getRegistration(rawType); - if (registration == null) { - throw new CannotProvideCoderException( - String.format( - "Cannot provide %s, given type descriptor's '%s' raw type is not registered in Kryo.", - KryoCoder.class.getSimpleName(), typeDescriptor)); + if (hasRegistration(typeDescriptor)) { + return (Coder) coder; } - - return KryoCoder.of(kryoRegistrar); + throw new CannotProvideCoderException( + String.format( + "Cannot provide [%s], given type descriptor's [%s] raw type is not registered in Kryo.", + KryoCoder.class.getSimpleName(), typeDescriptor)); } - @VisibleForTesting - IdentifiedRegistrar getKryoRegistrar() { - return kryoRegistrar; + private boolean hasRegistration(TypeDescriptor typeDescriptor) { + final KryoState kryoState = 
KryoState.get(coder); + final Class rawType = typeDescriptor.getRawType(); + final Kryo kryo = kryoState.getKryo(); + final ClassResolver classResolver = kryo.getClassResolver(); + return classResolver.getRegistration(rawType) != null; } - // ----------------- Builder steps - - /** Last step when building {@link KryoCoderProvider}. */ - public interface FinalBuilder { - - /** - * Builds {@link KryoCoderProvider}. - * - * @return the build {@link KryoCoderProvider} instance. - */ - KryoCoderProvider build(); - - /** - * Builds {@link KryoCoderProvider} and register it to given {@link Pipeline}. - * - * @param pipeline Pipeline whose coder registry will be used to register {@link - * KryoCoderProvider} under build. - */ - void buildAndRegister(Pipeline pipeline); + /** + * Create a new {@link KryoCoderProvider} with the provided registrar. + * + * @param registrar registrar to append to the list of already registered registrars. + * @return a new {@link KryoCoderProvider} + */ + public KryoCoderProvider withRegistrar(KryoRegistrar registrar) { + return new KryoCoderProvider(coder.withRegistrar(registrar)); } - // ----------------- Builder - - /** A builders chain implementation. Starts with {@link KryoCoderProvider#of(KryoRegistrar)}. */ - public static class Builder implements FinalBuilder { - private final KryoRegistrar registrar; - - public Builder(KryoRegistrar registrar) { - this.registrar = requireNonNull(registrar); - } - - @Override - public KryoCoderProvider build() { - IdentifiedRegistrar registrarWithId = IdentifiedRegistrar.of(registrar); - return new KryoCoderProvider(registrarWithId); - } + /** + * Builds {@link KryoCoderProvider} and register it to given {@link Pipeline}. + * + * @param pipeline Pipeline whose coder registry will be used to register {@link + * KryoCoderProvider} under build. 
+ */ + public void registerTo(Pipeline pipeline) { + pipeline.getCoderRegistry().registerCoderProvider(this); + } - @Override - public void buildAndRegister(Pipeline pipeline) { - KryoCoderProvider provider = build(); - pipeline.getCoderRegistry().registerCoderProvider(provider); - } + @VisibleForTesting + KryoCoder getCoder() { + return coder; } } diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoFactory.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoFactory.java deleted file mode 100644 index 0e22bebe81e70..0000000000000 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoFactory.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.kryo; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.InputChunked; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.io.OutputChunked; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import org.objenesis.strategy.StdInstantiatorStrategy; - -/** - * A source of {@link Kryo} instances. It allows {@link Kryo} to be reused by many {@link KryoCoder - * KryoCoders}. - */ -class KryoFactory { - - /** Initial size of byte buffers in {@link Output}, {@link Input}. */ - private static final int DEFAULT_BUFFER_SIZE = 4096; - - /** - * No-op {@link IdentifiedRegistrar}. Use of this registrar degrades performance since {@link - * Kryo} needs to serialize fully specified class name instead of id. - * - *

{@link #getOrCreateKryo(IdentifiedRegistrar)} returns {@link Kryo} which allows for - * serialization of unregistered classes when this {@link IdentifiedRegistrar} is used to call it. - */ - static final IdentifiedRegistrar NO_OP_REGISTRAR = IdentifiedRegistrar.defaultNoOpRegistrar(); - - private static Kryo createKryo(IdentifiedRegistrar registrarWithId) { - final Kryo instance = new Kryo(); - ((Kryo.DefaultInstantiatorStrategy) instance.getInstantiatorStrategy()) - .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()); - - // mere NO_OP_REGISTRAR == registrarWithId is not enough since - // NO_OP_REGISTRAR can be deserialized into several instances - if (registrarWithId.getId() == IdentifiedRegistrar.NO_OP_REGISTRAR_ID) { - instance.setRegistrationRequired(false); - } else { - instance.setRegistrationRequired(true); - registrarWithId.getRegistrar().registerClasses(instance); - } - - return instance; - } - - private static ThreadLocal threadLocalOutput = - ThreadLocal.withInitial(() -> new OutputChunked(DEFAULT_BUFFER_SIZE)); - - private static ThreadLocal threadLocalInput = - ThreadLocal.withInitial(() -> new InputChunked(DEFAULT_BUFFER_SIZE)); - - /** - * We need an instance of {@link KryoRegistrar} to do actual {@link Kryo} registration. But since - * every other instance of the same implementation of {@link KryoRegistrar} should do the same - * classes registration, we use {@link IdentifiedRegistrar IdentifiedRegistrar's} Id as a key. - * - *

{@link ThreadLocal} is utilized to allow re-usability of {@link Kryo} by many instances of - * {@link KryoCoder}. - */ - private static Map> kryoByRegistrarId = new HashMap<>(); - - /** - * Returns {@link Kryo} instance which has classes registered by this {@code registrar} or - * previously given {@link KryoRegistrar} instance of the same type. The returned instance is - * either created by this call or returned from cache. - * - *

If given {@code registrar} is {@link #NO_OP_REGISTRAR} then returned kryo allows for - * (de)serialization of unregistered classes. That is not otherwise allowed. - */ - static Kryo getOrCreateKryo(IdentifiedRegistrar registrarWithId) { - Objects.requireNonNull(registrarWithId); - - synchronized (kryoByRegistrarId) { - ThreadLocal kryoThreadLocal = - kryoByRegistrarId.computeIfAbsent(registrarWithId.getId(), (k) -> new ThreadLocal<>()); - - Kryo kryoInstance = kryoThreadLocal.get(); - if (kryoInstance == null) { - kryoInstance = createKryo(registrarWithId); - kryoThreadLocal.set(kryoInstance); - } - - return kryoInstance; - } - } - - static InputChunked getKryoInput() { - return threadLocalInput.get(); - } - - static OutputChunked getKryoOutput() { - return threadLocalOutput.get(); - } -} diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java new file mode 100644 index 0000000000000..69ddffab7130f --- /dev/null +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.kryo; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; + +@Description("Options for KryoCoder") +public interface KryoOptions extends PipelineOptions { + + @JsonIgnore + @Description("Set buffer size") + @Default.Integer(64 * 1024) + int getKryoBufferSize(); + + void setKryoBufferSize(int bufferSize); + + @JsonIgnore + @Description("Set to false to disable reference tracking") + @Default.Boolean(true) + boolean getKryoReferences(); + + void setKryoReferences(boolean references); + + @JsonIgnore + @Description("Set to false to disable required registration") + @Default.Boolean(true) + boolean getKryoRegistrationRequired(); + + void setKryoRegistrationRequired(boolean registrationRequired); +} diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java new file mode 100644 index 0000000000000..ed36f1ec52828 --- /dev/null +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.InputChunked; +import com.esotericsoftware.kryo.io.OutputChunked; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.objenesis.strategy.StdInstantiatorStrategy; + +/** + * Reusable kryo instance. + */ +class KryoState { + + private static final Storage STORAGE = new Storage(); + + static KryoState get(KryoCoder coder) { + return STORAGE.getOrCreate(coder); + } + + /** + * Caching thread local storage for reusable {@link KryoState}s. + */ + private static class Storage { + + private final ThreadLocal> kryoStateMap = + ThreadLocal.withInitial(ConcurrentHashMap::new); + + KryoState getOrCreate(KryoCoder coder) { + return kryoStateMap + .get() + .computeIfAbsent( + coder.getInstanceId(), + k -> { + final Kryo kryo = new Kryo(); + // fallback in case serialized class does not have default constructor + kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + kryo.setReferences(coder.getOptions().getReferences()); + kryo.setRegistrationRequired(coder.getOptions().getRegistrationRequired()); + for (KryoRegistrar registrar : coder.getRegistrars()) { + registrar.registerClasses(kryo); + } + return new KryoState( + kryo, + new InputChunked(coder.getOptions().getBufferSize()), + new OutputChunked(coder.getOptions().getBufferSize())); + }); + } + } + + /** + * The kryo instance. + */ + private final Kryo kryo; + + /** + * A reusable input buffer. + */ + private final InputChunked inputChunked; + + /** + * A reusable output buffer. 
+ */ + private final OutputChunked outputChunked; + + private KryoState(Kryo kryo, InputChunked inputChunked, OutputChunked outputChunked) { + this.kryo = kryo; + this.inputChunked = inputChunked; + this.outputChunked = outputChunked; + } + + /** + * {@link KryoState#kryo} + * + * @return kryo + */ + Kryo getKryo() { + return kryo; + } + + /** + * {@link KryoState#inputChunked} + * + * @return input buffer + */ + InputChunked getInputChunked() { + return inputChunked; + } + + /** + * {@link KryoState#outputChunked} + * + * @return output buffer + */ + OutputChunked getOutputChunked() { + return outputChunked; + } +} diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrarTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrarTest.java deleted file mode 100644 index 28791e60b798c..0000000000000 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/IdentifiedRegistrarTest.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.extensions.kryo; - -import static org.junit.Assert.assertEquals; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import org.junit.Test; - -/** Unit tests of {@link IdentifiedRegistrar}. */ -public class IdentifiedRegistrarTest { - - @Test - public void testSerializationRetainsId() throws IOException, ClassNotFoundException { - IdentifiedRegistrar registrarWithIdOriginal = IdentifiedRegistrar.of((k) -> {}); - - ByteArrayOutputStream outStr = new ByteArrayOutputStream(); - ObjectOutputStream oss = new ObjectOutputStream(outStr); - - oss.writeObject(registrarWithIdOriginal); - oss.flush(); - oss.close(); - - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); - IdentifiedRegistrar registrarDeserialized = (IdentifiedRegistrar) ois.readObject(); - - assertEquals(registrarWithIdOriginal.getId(), registrarDeserialized.getId()); - } -} diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java index 46034dc35ac2e..a524263272784 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java @@ -17,12 +17,15 @@ */ package org.apache.beam.sdk.extensions.kryo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.util.Collections; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.TypeDescriptor; -import org.junit.Assert; 
import org.junit.Rule; import org.junit.Test; @@ -33,26 +36,25 @@ public class KryoCoderProviderTest { @Test public void testBuilding() { - KryoCoderProvider provider = KryoCoderProvider.of((kryo) -> {}).build(); - Assert.assertNotNull(provider); + KryoCoderProvider provider = KryoCoderProvider.of(pipeline.getOptions()); + assertNotNull(provider); } @Test public void testBuildingAndRegister() { - KryoCoderProvider.of((kryo) -> {}).buildAndRegister(pipeline); + KryoCoderProvider.of(pipeline.getOptions()).registerTo(pipeline); } @Test public void testItProvidesCodersToRegisteredClasses() throws CannotProvideCoderException { - KryoCoderProvider provider = + final KryoCoderProvider provider = KryoCoderProvider.of( - (kryo) -> { - kryo.register(FirstTestClass.class); - kryo.register(SecondTestClass.class); - kryo.register(ThirdTestClass.class); - }) - .build(); - + pipeline.getOptions(), + kryo -> { + kryo.register(FirstTestClass.class); + kryo.register(SecondTestClass.class); + kryo.register(ThirdTestClass.class); + }); assertProviderReturnsKryoCoderForClass(provider, FirstTestClass.class); assertProviderReturnsKryoCoderForClass(provider, SecondTestClass.class); assertProviderReturnsKryoCoderForClass(provider, ThirdTestClass.class); @@ -60,48 +62,38 @@ public void testItProvidesCodersToRegisteredClasses() throws CannotProvideCoderE @Test(expected = CannotProvideCoderException.class) public void testDoNotProvideCOderForUnregisteredClasses() throws CannotProvideCoderException { - KryoCoderProvider provider = + final KryoCoderProvider provider = KryoCoderProvider.of( - (kryo) -> { - kryo.register(FirstTestClass.class); - kryo.register(SecondTestClass.class); - kryo.register(ThirdTestClass.class); - }) - .build(); - + pipeline.getOptions(), + kryo -> { + kryo.register(FirstTestClass.class); + kryo.register(SecondTestClass.class); + kryo.register(ThirdTestClass.class); + }); provider.coderFor(TypeDescriptor.of(NeverRegisteredClass.class), Collections.emptyList()); } @Test 
public void testProviderRegisteredToPipeline() throws CannotProvideCoderException { - KryoCoderProvider.of( - (kryo) -> { - kryo.register(FirstTestClass.class); - }) - .buildAndRegister(pipeline); - - Coder coderToAssert = + KryoCoderProvider.of(pipeline.getOptions(), kryo -> kryo.register(FirstTestClass.class)) + .registerTo(pipeline); + final Coder coderToAssert = pipeline.getCoderRegistry().getCoder(FirstTestClass.class); - - Assert.assertNotNull(coderToAssert); - Assert.assertTrue(coderToAssert instanceof KryoCoder); - KryoCoder casted = (KryoCoder) coderToAssert; - IdentifiedRegistrar coderRegistrar = casted.getRegistrar(); - Assert.assertNotNull(coderRegistrar); + assertNotNull(coderToAssert); + assertTrue(coderToAssert instanceof KryoCoder); + final KryoCoder casted = (KryoCoder) coderToAssert; + assertEquals(1, casted.getRegistrars().size()); } private void assertProviderReturnsKryoCoderForClass(KryoCoderProvider provider, Class type) throws CannotProvideCoderException { - IdentifiedRegistrar providerRegistrar = provider.getKryoRegistrar(); - Assert.assertNotNull(providerRegistrar); - Coder coderToAssert = provider.coderFor(TypeDescriptor.of(type), Collections.emptyList()); - - Assert.assertNotNull(coderToAssert); - Assert.assertTrue(coderToAssert instanceof KryoCoder); - KryoCoder casted = (KryoCoder) coderToAssert; - IdentifiedRegistrar coderRegistrar = casted.getRegistrar(); - Assert.assertNotNull(coderRegistrar); - Assert.assertEquals(providerRegistrar.getId(), coderRegistrar.getId()); + assertTrue(provider.getCoder().getRegistrars().size() > 0); + final Coder coderToAssert = + provider.coderFor(TypeDescriptor.of(type), Collections.emptyList()); + assertNotNull(coderToAssert); + assertTrue(coderToAssert instanceof KryoCoder); + final KryoCoder casted = (KryoCoder) coderToAssert; + assertEquals(provider.getCoder().getInstanceId(), casted.getInstanceId()); } private static class FirstTestClass {} diff --git 
a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderTest.java index 0f46306035147..61df25197816e 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.extensions.kryo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -30,92 +33,80 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.KV; -import org.junit.Assert; import org.junit.Test; /** Test targeted at {@link KryoCoder}. 
*/ public class KryoCoderTest { + private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create(); + @Test public void testBasicCoding() throws IOException { - - KryoRegistrar registrar = (k) -> k.register(ClassToBeEncoded.class); - - KryoCoder coder = KryoCoder.of(IdentifiedRegistrar.of(registrar)); + final KryoCoder coder = + KryoCoder.of(OPTIONS, k -> k.register(ClassToBeEncoded.class)); assertEncoding(coder); } @Test(expected = CoderException.class) public void testWrongRegistrarCoding() throws IOException { - - KryoRegistrar registrar = - (k) -> { - // No-op - }; - - KryoCoder coder = KryoCoder.of(IdentifiedRegistrar.of(registrar)); + final KryoCoder coder = KryoCoder.of(OPTIONS); assertEncoding(coder); } @Test(expected = CoderException.class) public void testWrongRegistrarDecoding() throws IOException { - - KryoRegistrar registrarCoding = (k) -> k.register(ClassToBeEncoded.class); - KryoRegistrar registrarDecoding = - (k) -> { + final KryoRegistrar registrarCoding = k -> k.register(ClassToBeEncoded.class); + final KryoRegistrar registrarDecoding = + k -> { // No-op }; - - KryoCoder coderToEncode = - KryoCoder.of(IdentifiedRegistrar.of(registrarCoding)); - KryoCoder coderToDecode = - KryoCoder.of(IdentifiedRegistrar.of(registrarDecoding)); - + final KryoCoder coderToEncode = KryoCoder.of(OPTIONS, registrarCoding); + final KryoCoder coderToDecode = KryoCoder.of(OPTIONS, registrarDecoding); assertEncoding(coderToEncode, coderToDecode); } @Test public void testCodingOfTwoClassesInSerial() throws IOException { - KryoRegistrar registrar = - (k) -> { + final KryoRegistrar registrar = + k -> { k.register(ClassToBeEncoded.class); k.register(TestClass.class); }; + final KryoCoder coder = KryoCoder.of(OPTIONS, registrar); + final KryoCoder secondCoder = KryoCoder.of(OPTIONS, registrar); - IdentifiedRegistrar registrarWithId = IdentifiedRegistrar.of(registrar); - KryoCoder coder = KryoCoder.of(registrarWithId); - KryoCoder secondCoder = 
KryoCoder.of(registrarWithId); - - ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); - TestClass secondOriginalValue = new TestClass("just a parameter"); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); + final TestClass secondOriginalValue = new TestClass("just a parameter"); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); coder.encode(originalValue, outputStream); secondCoder.encode(secondOriginalValue, outputStream); - byte[] buf = outputStream.toByteArray(); - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); + final byte[] buf = outputStream.toByteArray(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); - ClassToBeEncoded decodedValue = coder.decode(inputStream); - TestClass secondDecodedValue = secondCoder.decode(inputStream); + final ClassToBeEncoded decodedValue = coder.decode(inputStream); + final TestClass secondDecodedValue = secondCoder.decode(inputStream); - Assert.assertNotNull(decodedValue); - Assert.assertEquals(originalValue, decodedValue); + assertNotNull(decodedValue); + assertEquals(originalValue, decodedValue); - Assert.assertNotNull(secondDecodedValue); - Assert.assertNotNull(secondDecodedValue.param); - Assert.assertEquals("just a parameter", secondDecodedValue.param); + assertNotNull(secondDecodedValue); + assertNotNull(secondDecodedValue.param); + assertEquals("just a parameter", secondDecodedValue.param); } /** Test whenever the {@link KryoCoder} is serializable. 
*/ @Test public void testCoderSerialization() throws IOException, ClassNotFoundException { - KryoRegistrar registrar = (k) -> k.register(ClassToBeEncoded.class); + final KryoRegistrar registrar = k -> k.register(ClassToBeEncoded.class); - KryoCoder coder = KryoCoder.of(IdentifiedRegistrar.of(registrar)); - ByteArrayOutputStream outStr = new ByteArrayOutputStream(); - ObjectOutputStream oss = new ObjectOutputStream(outStr); + final KryoCoder coder = KryoCoder.of(OPTIONS, registrar); + final ByteArrayOutputStream outStr = new ByteArrayOutputStream(); + final ObjectOutputStream oss = new ObjectOutputStream(outStr); oss.writeObject(coder); oss.flush(); @@ -130,85 +121,85 @@ public void testCoderSerialization() throws IOException, ClassNotFoundException @Test public void testCodingWithKvCoderKeyIsKryoCoder() throws IOException { - KryoRegistrar registrar = (k) -> k.register(TestClass.class); + final KryoRegistrar registrar = k -> k.register(TestClass.class); final ListCoder listCoder = ListCoder.of(VoidCoder.of()); final KvCoder> kvCoder = - KvCoder.of(KryoCoder.of(IdentifiedRegistrar.of(registrar)), listCoder); + KvCoder.of(KryoCoder.of(OPTIONS, registrar), listCoder); - List inputValue = new ArrayList<>(); + final List inputValue = new ArrayList<>(); inputValue.add(null); inputValue.add(null); final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - TestClass inputKey = new TestClass("something"); + final TestClass inputKey = new TestClass("something"); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); final KV> decoded = kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); + assertNotNull(decoded); + assertNotNull(decoded.getKey()); + assertEquals(inputKey, decoded.getKey()); - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); + 
assertNotNull(decoded.getValue()); + assertEquals(inputValue, decoded.getValue()); } @Test public void testCodingWithKvCoderValueIsKryoCoder() throws IOException { - KryoRegistrar registrar = (k) -> k.register(TestClass.class); + final KryoRegistrar registrar = k -> k.register(TestClass.class); final KvCoder kvCoder = - KvCoder.of(StringUtf8Coder.of(), KryoCoder.of(IdentifiedRegistrar.of(registrar))); + KvCoder.of(StringUtf8Coder.of(), KryoCoder.of(OPTIONS, registrar)); final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - String inputKey = "key"; - TestClass inputValue = new TestClass("something"); + final String inputKey = "key"; + final TestClass inputValue = new TestClass("something"); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); final KV decoded = kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); + assertNotNull(decoded); + assertNotNull(decoded.getKey()); + assertEquals(inputKey, decoded.getKey()); - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); + assertNotNull(decoded.getValue()); + assertEquals(inputValue, decoded.getValue()); } @Test public void testCodingWithKvCoderClassToBeEncoded() throws IOException { - KryoRegistrar registrar = - (k) -> { + final KryoRegistrar registrar = + k -> { k.register(TestClass.class); k.register(ClassToBeEncoded.class); }; final ListCoder listCoder = ListCoder.of(VoidCoder.of()); final KvCoder> kvCoder = - KvCoder.of(KryoCoder.of(IdentifiedRegistrar.of(registrar)), listCoder); - List inputValue = new ArrayList<>(); + KvCoder.of(KryoCoder.of(OPTIONS, registrar), listCoder); + final List inputValue = new ArrayList<>(); inputValue.add(null); inputValue.add(null); final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - ClassToBeEncoded inputKey = new 
ClassToBeEncoded("something", 1, 0.2); + final ClassToBeEncoded inputKey = new ClassToBeEncoded("something", 1, 0.2); kvCoder.encode(KV.of(inputKey, inputValue), byteArrayOutputStream); final KV> decoded = kvCoder.decode(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); - Assert.assertNotNull(decoded); - Assert.assertNotNull(decoded.getKey()); - Assert.assertEquals(inputKey, decoded.getKey()); + assertNotNull(decoded); + assertNotNull(decoded.getKey()); + assertEquals(inputKey, decoded.getKey()); - Assert.assertNotNull(decoded.getValue()); - Assert.assertEquals(inputValue, decoded.getValue()); + assertNotNull(decoded.getValue()); + assertEquals(inputValue, decoded.getValue()); } private void assertEncoding(KryoCoder coder) throws IOException { @@ -218,19 +209,14 @@ private void assertEncoding(KryoCoder coder) throws IOExceptio private void assertEncoding( KryoCoder coderToEncode, KryoCoder coderToDecode) throws IOException { - - ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - + final ClassToBeEncoded originalValue = new ClassToBeEncoded("XyZ", 42, Double.NaN); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); coderToEncode.encode(originalValue, outputStream); - - byte[] buf = outputStream.toByteArray(); - ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); - - ClassToBeEncoded decodedValue = coderToDecode.decode(inputStream); - - Assert.assertNotNull(decodedValue); - Assert.assertEquals(originalValue, decodedValue); + final byte[] buf = outputStream.toByteArray(); + final ByteArrayInputStream inputStream = new ByteArrayInputStream(buf); + final ClassToBeEncoded decodedValue = coderToDecode.decode(inputStream); + assertNotNull(decodedValue); + assertEquals(originalValue, decodedValue); } private static class ClassToBeEncoded { diff --git 
a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoFactoryTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java similarity index 52% rename from sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoFactoryTest.java rename to sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java index 1d7559936968a..46488cc0fd2ee 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoFactoryTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java @@ -17,41 +17,40 @@ */ package org.apache.beam.sdk.extensions.kryo; -import com.esotericsoftware.kryo.Kryo; +import static org.junit.Assert.assertSame; + import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Assert; import org.junit.Test; -/** A set of unit {@link KryoFactory} tests. */ -public class KryoFactoryTest { - - @Test - public void testGiveTheSameKrioAfterKryoRegistrarDeserialized() - throws IOException, ClassNotFoundException { - - IdentifiedRegistrar registrar = IdentifiedRegistrar.of((k) -> k.register(TestClass.class)); +/** A set of unit {@link KryoState} tests. 
*/ +public class KryoStateTest { - Kryo firstKryo = KryoFactory.getOrCreateKryo(registrar); + private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create(); - ByteArrayOutputStream outStr = new ByteArrayOutputStream(); - ObjectOutputStream oss = new ObjectOutputStream(outStr); + @Test + public void testSameKryoAfterDeserialization() throws IOException, ClassNotFoundException { + final KryoCoder coder = KryoCoder.of(OPTIONS, k -> k.register(TestClass.class)); + final KryoState firstKryo = KryoState.get(coder); - oss.writeObject(registrar); + final ByteArrayOutputStream outStr = new ByteArrayOutputStream(); + final ObjectOutputStream oss = new ObjectOutputStream(outStr); + oss.writeObject(coder); oss.flush(); oss.close(); - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); - - @SuppressWarnings("unchecked") - IdentifiedRegistrar deserializedRegistrar = (IdentifiedRegistrar) ois.readObject(); - - Kryo secondKryo = KryoFactory.getOrCreateKryo(deserializedRegistrar); - - Assert.assertSame(firstKryo, secondKryo); + final ObjectInputStream ois = + new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); + @SuppressWarnings("unchecked") final KryoCoder deserializedCoder = + (KryoCoder) ois.readObject(); + final KryoState secondKryo = KryoState.get(deserializedCoder); + assertSame(firstKryo, secondKryo); } private static class TestClass {} From a45c0acda52e5c2898a1242189e7f9612751cbbb Mon Sep 17 00:00:00 2001 From: David Moravek Date: Sat, 22 Sep 2018 14:19:39 +0200 Subject: [PATCH 2/5] [BEAM-5437] Fix tests. 
--- .../core/client/operator/OperatorTests.java | 2 +- .../core/testkit/AbstractOperatorTest.java | 4 +- .../euphoria/core/testkit/JoinTest.java | 3 +- .../core/testkit/ReduceByKeyTest.java | 5 ++- .../translate/BeamMetricsTranslationTest.java | 2 +- .../euphoria/core/translate/EuphoriaTest.java | 2 +- .../beam/sdk/extensions/kryo/KryoCoder.java | 41 ++++++++----------- .../extensions/kryo/KryoCoderProvider.java | 6 ++- .../beam/sdk/extensions/kryo/KryoOptions.java | 4 +- .../beam/sdk/extensions/kryo/KryoState.java | 29 +++++-------- .../sdk/extensions/kryo/KryoStateTest.java | 5 +-- 11 files changed, 45 insertions(+), 58 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java index 164a093002c9c..89b82384e8ac5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/OperatorTests.java @@ -61,7 +61,7 @@ public static TestPipeline createTestPipeline() { final TestPipeline testPipeline = TestPipeline.fromOptions(pipelineOptions); testPipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions)); return testPipeline; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java index 0d2c86a341ec5..0cb747faa056e 100644 --- 
a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/AbstractOperatorTest.java @@ -49,9 +49,7 @@ public void execute(TestCase tc) { final EuphoriaOptions euphoriaOptions = pipelineOptions.as(EuphoriaOptions.class); euphoriaOptions.setAccumulatorProviderFactory(accumulatorProvider); final Pipeline pipeline = TestPipeline.create(pipelineOptions); - pipeline - .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + pipeline.getCoderRegistry().registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions)); final Dataset output = tc.getOutput(pipeline); tc.validate(output); pipeline.run().waitUntilFinish(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java index 1865056c39132..590c7e4b91c34 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements; import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin; import org.apache.beam.sdk.extensions.kryo.KryoCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -799,7 +800,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { 
- return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java index 4754592a463a9..a7a1607f2c197 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java @@ -41,6 +41,7 @@ import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums; import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SnapshotProvider; import org.apache.beam.sdk.extensions.kryo.KryoCoder; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; @@ -652,7 +653,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override @@ -764,7 +765,7 @@ public boolean isCompatible(WindowFn other) { @Override public Coder windowCoder() { - return KryoCoder.withoutClassRegistration(); + return KryoCoder.of(PipelineOptionsFactory.create()); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java index 4953e0a6a3ce9..1f324ca3effc6 100644 --- 
a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/BeamMetricsTranslationTest.java @@ -52,7 +52,7 @@ public class BeamMetricsTranslationTest { public void setup() { testPipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(testPipeline.getOptions())); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java index 0b8a0b9187d7d..e3e6ddf11a8ad 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/translate/EuphoriaTest.java @@ -48,7 +48,7 @@ public class EuphoriaTest implements Serializable { public void setup() { pipeline .getCoderRegistry() - .registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration()); + .registerCoderForClass(Object.class, KryoCoder.of(pipeline.getOptions())); } @Test diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java index 1a7540927f041..967bb59c23615 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java @@ -47,8 +47,8 @@ public class KryoCoder extends AtomicCoder { * Create a new {@link KryoCoder}. 
* * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of(PipelineOptions pipelineOptions) { @@ -59,9 +59,10 @@ public static KryoCoder of(PipelineOptions pipelineOptions) { * Create a new {@link KryoCoder}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { @@ -72,9 +73,10 @@ public static KryoCoder of(PipelineOptions pipelineOptions, KryoRegistrar * Create a new {@link KryoCoder}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. 
- * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance - * @param type of element this class should decode/encode - * {@link Kryo} instance used by returned {@link KryoCoder} + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} * @return Newly created a {@link KryoCoder} */ public static KryoCoder of( @@ -91,19 +93,13 @@ public static KryoCoder of( /** Serializable wrapper for {@link KryoOptions}. */ static class SerializableOptions implements Serializable { - /** - * Size of input and output buffer. - */ + /** Size of input and output buffer. */ private final int bufferSize; - /** - * Enables kryo reference tracking. - */ + /** Enables kryo reference tracking. */ private final boolean references; - /** - * Enables kryo required registration. - */ + /** Enables kryo required registration. */ private final boolean registrationRequired; private SerializableOptions(int bufferSize, boolean references, boolean registrationRequired) { @@ -140,14 +136,10 @@ boolean getRegistrationRequired() { } } - /** - * Unique id of the {@link KryoCoder} instance. - */ + /** Unique id of the {@link KryoCoder} instance. */ private final String instanceId = UUID.randomUUID().toString(); - /** - * Options for underlying kryo instance. - */ + /** Options for underlying kryo instance. */ private final SerializableOptions options; /** Client-defined class registrations to {@link Kryo}. 
*/ @@ -190,7 +182,8 @@ public T decode(InputStream inStream) throws IOException { final InputChunked inputChunked = kryoState.getInputChunked(); inputChunked.setInputStream(inStream); try { - @SuppressWarnings("unchecked") final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked); + @SuppressWarnings("unchecked") + final T instance = (T) kryoState.getKryo().readClassAndObject(inputChunked); return instance; } catch (KryoException e) { throw new CoderException("Cannot decode object from input stream.", e); diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index 906f74e28dda2..c47311c41fb9d 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -50,7 +50,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions) { * Create a new {@link KryoCoderProvider}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. - * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance * @return A newly created {@link KryoCoderProvider} */ public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistrar... registrars) { @@ -61,7 +62,8 @@ public static KryoCoderProvider of(PipelineOptions pipelineOptions, KryoRegistra * Create a new {@link KryoCoderProvider}. * * @param pipelineOptions Options used for coder setup. See {@link KryoOptions} for more details. 
- * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying kryo instance + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance * @return A newly created {@link KryoCoderProvider} */ public static KryoCoderProvider of( diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java index 69ddffab7130f..fa217837aae4e 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoOptions.java @@ -40,8 +40,8 @@ public interface KryoOptions extends PipelineOptions { void setKryoReferences(boolean references); @JsonIgnore - @Description("Set to false to disable required registration") - @Default.Boolean(true) + @Description("Set to true to enable required registration") + @Default.Boolean(false) boolean getKryoRegistrationRequired(); void setKryoRegistrationRequired(boolean registrationRequired); diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java index ed36f1ec52828..125304226be77 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java @@ -20,13 +20,11 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.InputChunked; import com.esotericsoftware.kryo.io.OutputChunked; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.objenesis.strategy.StdInstantiatorStrategy; -/** - * Reusable kryo instance. - */ +/** Reusable kryo instance. 
*/ class KryoState { private static final Storage STORAGE = new Storage(); @@ -35,13 +33,11 @@ static KryoState get(KryoCoder coder) { return STORAGE.getOrCreate(coder); } - /** - * Caching thread local storage for reusable {@link KryoState}s. - */ + /** Caching thread local storage for reusable {@link KryoState}s. */ private static class Storage { private final ThreadLocal> kryoStateMap = - ThreadLocal.withInitial(ConcurrentHashMap::new); + ThreadLocal.withInitial(HashMap::new); KryoState getOrCreate(KryoCoder coder) { return kryoStateMap @@ -51,9 +47,12 @@ KryoState getOrCreate(KryoCoder coder) { k -> { final Kryo kryo = new Kryo(); // fallback in case serialized class does not have default constructor - kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); + kryo.setInstantiatorStrategy( + new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy())); kryo.setReferences(coder.getOptions().getReferences()); kryo.setRegistrationRequired(coder.getOptions().getRegistrationRequired()); + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + // register user provided classes for (KryoRegistrar registrar : coder.getRegistrars()) { registrar.registerClasses(kryo); } @@ -65,19 +64,13 @@ KryoState getOrCreate(KryoCoder coder) { } } - /** - * The kryo instance. - */ + /** The kryo instance. */ private final Kryo kryo; - /** - * A reusable input buffer. - */ + /** A reusable input buffer. */ private final InputChunked inputChunked; - /** - * A reusable output buffer. - */ + /** A reusable output buffer. 
*/ private final OutputChunked outputChunked; private KryoState(Kryo kryo, InputChunked inputChunked, OutputChunked outputChunked) { diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java index 46488cc0fd2ee..2b5f2f49d1823 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java @@ -26,7 +26,6 @@ import java.io.ObjectOutputStream; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.junit.Assert; import org.junit.Test; /** A set of unit {@link KryoState} tests. */ @@ -47,8 +46,8 @@ public void testSameKryoAfterDeserialization() throws IOException, ClassNotFound final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(outStr.toByteArray())); - @SuppressWarnings("unchecked") final KryoCoder deserializedCoder = - (KryoCoder) ois.readObject(); + @SuppressWarnings("unchecked") + final KryoCoder deserializedCoder = (KryoCoder) ois.readObject(); final KryoState secondKryo = KryoState.get(deserializedCoder); assertSame(firstKryo, secondKryo); } From 4f04ed3519d78c7ecc0e5b40e3b438888f6e3d00 Mon Sep 17 00:00:00 2001 From: David Moravek Date: Sun, 23 Sep 2018 00:11:38 +0200 Subject: [PATCH 3/5] [BEAM-5437] KryoCoderProvider should not provide coder for primitive classes. 
--- .../extensions/kryo/KryoCoderProvider.java | 8 +++++--- .../beam/sdk/extensions/kryo/KryoState.java | 19 ++++++++++++++++++- .../kryo/KryoCoderProviderTest.java | 9 ++++++++- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index c47311c41fb9d..f109860ba196c 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -19,6 +19,7 @@ import com.esotericsoftware.kryo.ClassResolver; import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Registration; import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; import java.util.Collections; @@ -84,7 +85,7 @@ private KryoCoderProvider(KryoCoder coder) { public Coder coderFor( TypeDescriptor typeDescriptor, List> componentCoders) throws CannotProvideCoderException { - if (hasRegistration(typeDescriptor)) { + if (hasUserProvidedRegistration(typeDescriptor)) { return (Coder) coder; } throw new CannotProvideCoderException( @@ -93,12 +94,13 @@ public Coder coderFor( KryoCoder.class.getSimpleName(), typeDescriptor)); } - private boolean hasRegistration(TypeDescriptor typeDescriptor) { + private boolean hasUserProvidedRegistration(TypeDescriptor typeDescriptor) { final KryoState kryoState = KryoState.get(coder); final Class rawType = typeDescriptor.getRawType(); final Kryo kryo = kryoState.getKryo(); final ClassResolver classResolver = kryo.getClassResolver(); - return classResolver.getRegistration(rawType) != null; + final Registration registration = classResolver.getRegistration(rawType); + return registration != null && registration.getId() >= kryoState.getFirstRegistrationId(); } /** diff --git 
a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java index 125304226be77..e1503e1e22205 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoState.java @@ -52,12 +52,15 @@ KryoState getOrCreate(KryoCoder coder) { kryo.setReferences(coder.getOptions().getReferences()); kryo.setRegistrationRequired(coder.getOptions().getRegistrationRequired()); kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); + // first id of user provided class registration + final int firstRegistrationId = kryo.getNextRegistrationId(); // register user provided classes for (KryoRegistrar registrar : coder.getRegistrars()) { registrar.registerClasses(kryo); } return new KryoState( kryo, + firstRegistrationId, new InputChunked(coder.getOptions().getBufferSize()), new OutputChunked(coder.getOptions().getBufferSize())); }); @@ -67,14 +70,19 @@ KryoState getOrCreate(KryoCoder coder) { /** The kryo instance. */ private final Kryo kryo; + /** first id of user provided class registration */ + private final int firstRegistrationId; + /** A reusable input buffer. */ private final InputChunked inputChunked; /** A reusable output buffer. 
*/ private final OutputChunked outputChunked; - private KryoState(Kryo kryo, InputChunked inputChunked, OutputChunked outputChunked) { + private KryoState( + Kryo kryo, int firstRegistrationId, InputChunked inputChunked, OutputChunked outputChunked) { this.kryo = kryo; + this.firstRegistrationId = firstRegistrationId; this.inputChunked = inputChunked; this.outputChunked = outputChunked; } @@ -88,6 +96,15 @@ Kryo getKryo() { return kryo; } + /** + * {@link KryoState#firstRegistrationId} + * + * @return registration id + */ + public int getFirstRegistrationId() { + return firstRegistrationId; + } + /** * {@link KryoState#inputChunked} * diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java index a524263272784..2c467a6b6c9be 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; @@ -61,7 +62,7 @@ public void testItProvidesCodersToRegisteredClasses() throws CannotProvideCoderE } @Test(expected = CannotProvideCoderException.class) - public void testDoNotProvideCOderForUnregisteredClasses() throws CannotProvideCoderException { + public void testDoNotProvideCoderForUnregisteredClasses() throws CannotProvideCoderException { final KryoCoderProvider provider = KryoCoderProvider.of( pipeline.getOptions(), @@ -85,6 +86,12 @@ public void testProviderRegisteredToPipeline() throws CannotProvideCoderExceptio assertEquals(1, casted.getRegistrars().size()); } + @Test(expected = 
CannotProvideCoderException.class) + public void testPrimitiveClass() throws CannotProvideCoderException { + final KryoCoderProvider provider = KryoCoderProvider.of(pipeline.getOptions()); + provider.coderFor(TypeDescriptors.strings(), Collections.emptyList()); + } + private void assertProviderReturnsKryoCoderForClass(KryoCoderProvider provider, Class type) throws CannotProvideCoderException { assertTrue(provider.getCoder().getRegistrars().size() > 0); From 467f02a071da593b08c4a076b3b5527821ddfe1b Mon Sep 17 00:00:00 2001 From: David Moravek Date: Mon, 24 Sep 2018 15:31:59 +0200 Subject: [PATCH 4/5] [BEAM-5437] Shadow kryo dependency. --- sdks/java/extensions/kryo/build.gradle | 22 ++++++++++++------- .../extensions/kryo/KryoCoderProvider.java | 5 +++++ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/sdks/java/extensions/kryo/build.gradle b/sdks/java/extensions/kryo/build.gradle index 4b2b51b5da4e6..6995551b37c5e 100644 --- a/sdks/java/extensions/kryo/build.gradle +++ b/sdks/java/extensions/kryo/build.gradle @@ -17,23 +17,29 @@ */ apply plugin: org.apache.beam.gradle.BeamModulePlugin -applyJavaNature() - -description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL :: Kryo" ext { kryoVersion = '4.0.2' } +applyJavaNature(shadowClosure: DEFAULT_SHADOW_CLOSURE << { + dependencies { + include(dependency('com.esotericsoftware:.*')) + include(dependency('org.ow2.asm:asm')) + } + relocate 'com.esotericsoftware', getJavaRelocatedPath('com.esotericsoftware') + relocate 'org.objectweb', getJavaRelocatedPath('org.objectweb') +}) + +description = 'Apache Beam :: SDKs :: Java :: Extensions :: Kryo' + dependencies { - shadow project(path: ":beam-sdks-java-core", configuration: "shadow") - shadow "com.esotericsoftware:kryo:${kryoVersion}" - testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest") + compile "com.esotericsoftware:kryo:${kryoVersion}" + shadow project(path: ':beam-sdks-java-core', configuration: 'shadow') + 
testCompile project(path: ':beam-sdks-java-core', configuration: 'shadowTest') testCompile project(':beam-runners-direct-java') } test { jvmArgs '-Dsun.io.serialization.extendedDebugInfo=true' } - -test.testLogging.showStandardStreams = false diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index f109860ba196c..1817438eacca7 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -37,6 +37,8 @@ */ public class KryoCoderProvider extends CoderProvider { + private static final TypeDescriptor OBJECT_TYPE = new TypeDescriptor() {}; + /** * Create a new {@link KryoCoderProvider}. * @@ -88,6 +90,9 @@ public Coder coderFor( if (hasUserProvidedRegistration(typeDescriptor)) { return (Coder) coder; } + if (OBJECT_TYPE.equals(typeDescriptor)) { + return (Coder) coder; + } throw new CannotProvideCoderException( String.format( "Cannot provide [%s], given type descriptor's [%s] raw type is not registered in Kryo.", From 3906b78c291b5718820f180f630563691910f2bb Mon Sep 17 00:00:00 2001 From: David Moravek Date: Thu, 4 Oct 2018 12:41:27 +0200 Subject: [PATCH 5/5] [BEAM-5437] Code review. 
--- .../beam/sdk/extensions/kryo/KryoCoder.java | 60 ++++++++++++++++++- .../extensions/kryo/KryoCoderProvider.java | 32 ++++++++++ .../kryo/KryoCoderProviderTest.java | 4 +- .../sdk/extensions/kryo/KryoStateTest.java | 6 +- 4 files changed, 93 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java index 967bb59c23615..f73b63cdcbc27 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoder.java @@ -31,9 +31,10 @@ import java.util.Collections; import java.util.List; import java.util.UUID; -import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; /** * Coder using Kryo as (de)serialization mechanism. See {@link KryoCoderProvider} to get more @@ -41,7 +42,44 @@ * * @param type of element coder can handle */ -public class KryoCoder extends AtomicCoder { +public class KryoCoder extends CustomCoder { + + /** + * Create a new {@link KryoCoder} with default {@link KryoOptions}. + * + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} + */ + public static KryoCoder of() { + return of(PipelineOptionsFactory.create(), Collections.emptyList()); + } + + /** + * Create a new {@link KryoCoder} with default {@link KryoOptions}. 
+ * + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} + */ + public static KryoCoder of(KryoRegistrar... registrars) { + return of(PipelineOptionsFactory.create(), registrars); + } + + /** + * Create a new {@link KryoCoder} with default {@link KryoOptions}. + * + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @param type of element this class should decode/encode {@link Kryo} instance used by + * returned {@link KryoCoder} + * @return Newly created a {@link KryoCoder} + */ + public static KryoCoder of(List registrars) { + return of(PipelineOptionsFactory.create(), registrars); + } /** * Create a new {@link KryoCoder}. @@ -190,6 +228,11 @@ public T decode(InputStream inStream) throws IOException { } } + @Override + public void verifyDeterministic() throws NonDeterministicException { + // noop + } + /** * Create a new {@link KryoCoder} instance with the user provided registrar. 
* @@ -228,4 +271,17 @@ SerializableOptions getOptions() { List getRegistrars() { return registrars; } + + @Override + public int hashCode() { + return instanceId.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other != null && getClass().equals(other.getClass())) { + return instanceId.equals(((KryoCoder) other).instanceId); + } + return false; + } } diff --git a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java index 1817438eacca7..684ac27a8aec2 100644 --- a/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java +++ b/sdks/java/extensions/kryo/src/main/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProvider.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderProvider; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.values.TypeDescriptor; /** @@ -39,6 +40,37 @@ public class KryoCoderProvider extends CoderProvider { private static final TypeDescriptor OBJECT_TYPE = new TypeDescriptor() {}; + /** + * Create a new {@link KryoCoderProvider} with default {@link KryoOptions}. + * + * @return A newly created {@link KryoCoderProvider} + */ + public static KryoCoderProvider of() { + return of(PipelineOptionsFactory.create(), Collections.emptyList()); + } + + /** + * Create a new {@link KryoCoderProvider} with default {@link KryoOptions}. + * + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @return A newly created {@link KryoCoderProvider} + */ + public static KryoCoderProvider of(KryoRegistrar... 
registrars) { + return of(PipelineOptionsFactory.create(), Arrays.asList(registrars)); + } + + /** + * Create a new {@link KryoCoderProvider} with default {@link KryoOptions}. + * + * @param registrars {@link KryoRegistrar}s which are used to register classes with underlying + * kryo instance + * @return A newly created {@link KryoCoderProvider} + */ + public static KryoCoderProvider of(List registrars) { + return of(PipelineOptionsFactory.create(), registrars); + } + /** * Create a new {@link KryoCoderProvider}. * diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java index 2c467a6b6c9be..b97a18c55a187 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java +++ b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoCoderProviderTest.java @@ -92,8 +92,8 @@ public void testPrimitiveClass() throws CannotProvideCoderException { provider.coderFor(TypeDescriptors.strings(), Collections.emptyList()); } - private void assertProviderReturnsKryoCoderForClass(KryoCoderProvider provider, Class type) - throws CannotProvideCoderException { + private static void assertProviderReturnsKryoCoderForClass( + KryoCoderProvider provider, Class type) throws CannotProvideCoderException { assertTrue(provider.getCoder().getRegistrars().size() > 0); final Coder coderToAssert = provider.coderFor(TypeDescriptor.of(type), Collections.emptyList()); diff --git a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java index 2b5f2f49d1823..f7917926f5ee6 100644 --- a/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java +++ 
b/sdks/java/extensions/kryo/src/test/java/org/apache/beam/sdk/extensions/kryo/KryoStateTest.java @@ -24,18 +24,14 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Test; /** A set of unit {@link KryoState} tests. */ public class KryoStateTest { - private static final PipelineOptions OPTIONS = PipelineOptionsFactory.create(); - @Test public void testSameKryoAfterDeserialization() throws IOException, ClassNotFoundException { - final KryoCoder coder = KryoCoder.of(OPTIONS, k -> k.register(TestClass.class)); + final KryoCoder coder = KryoCoder.of(k -> k.register(TestClass.class)); final KryoState firstKryo = KryoState.get(coder); final ByteArrayOutputStream outStr = new ByteArrayOutputStream();