Skip to content

Commit

Permalink
Merge pull request #40 from seznam/dejv/kryo_with_multiple_registrars
Browse files Browse the repository at this point in the history
[BEAM-5437] Allow kryo provider to use multiple registrars.
  • Loading branch information
VaclavPlajt authored Oct 4, 2018
2 parents d127f45 + 3906b78 commit 5875038
Show file tree
Hide file tree
Showing 17 changed files with 650 additions and 537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static TestPipeline createTestPipeline() {
final TestPipeline testPipeline = TestPipeline.fromOptions(pipelineOptions);
testPipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions));
return testPipeline;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ public <T> void execute(TestCase<T> tc) {
final EuphoriaOptions euphoriaOptions = pipelineOptions.as(EuphoriaOptions.class);
euphoriaOptions.setAccumulatorProviderFactory(accumulatorProvider);
final Pipeline pipeline = TestPipeline.create(pipelineOptions);
pipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
pipeline.getCoderRegistry().registerCoderForClass(Object.class, KryoCoder.of(pipelineOptions));
final Dataset<T> output = tc.getOutput(pipeline);
tc.validate(output);
pipeline.run().waitUntilFinish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.MapElements;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -799,7 +800,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<BoundedWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Sums;
import org.apache.beam.sdk.extensions.euphoria.core.testkit.accumulators.SnapshotProvider;
import org.apache.beam.sdk.extensions.kryo.KryoCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
Expand Down Expand Up @@ -652,7 +653,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<CountWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down Expand Up @@ -764,7 +765,7 @@ public boolean isCompatible(WindowFn<?, ?> other) {

@Override
public Coder<UniqueWindow> windowCoder() {
return KryoCoder.withoutClassRegistration();
return KryoCoder.of(PipelineOptionsFactory.create());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class BeamMetricsTranslationTest {
public void setup() {
testPipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(testPipeline.getOptions()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class EuphoriaTest implements Serializable {
public void setup() {
pipeline
.getCoderRegistry()
.registerCoderForClass(Object.class, KryoCoder.withoutClassRegistration());
.registerCoderForClass(Object.class, KryoCoder.of(pipeline.getOptions()));
}

@Test
Expand Down
22 changes: 14 additions & 8 deletions sdks/java/extensions/kryo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,29 @@
*/

apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyJavaNature()

description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL :: Kryo"

ext {
kryoVersion = '4.0.2'
}

applyJavaNature(shadowClosure: DEFAULT_SHADOW_CLOSURE << {
dependencies {
include(dependency('com.esotericsoftware:.*'))
include(dependency('org.ow2.asm:asm'))
}
relocate 'com.esotericsoftware', getJavaRelocatedPath('com.esotericsoftware')
relocate 'org.objectweb', getJavaRelocatedPath('org.objectweb')
})

description = 'Apache Beam :: SDKs :: Java :: Extensions :: Kryo'

dependencies {
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow "com.esotericsoftware:kryo:${kryoVersion}"
testCompile project(path: ":beam-sdks-java-core", configuration: "shadowTest")
compile "com.esotericsoftware:kryo:${kryoVersion}"
shadow project(path: ':beam-sdks-java-core', configuration: 'shadow')
testCompile project(path: ':beam-sdks-java-core', configuration: 'shadowTest')
testCompile project(':beam-runners-direct-java')
}

test {
jvmArgs '-Dsun.io.serialization.extendedDebugInfo=true'
}

test.testLogging.showStandardStreams = false

This file was deleted.

Loading

0 comments on commit 5875038

Please sign in to comment.