diff --git a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java index 3df87ebdd..93d379ff3 100644 --- a/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java +++ b/beam/tools/src/main/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStream.java @@ -717,27 +717,26 @@ public JoinInputs( } if (!windowingStrategy.equals(rightTmp.getWindowingStrategy())) { rightTmp = - (PCollection) - rightTmp.apply( - createWindowTransform( - (WindowingStrategy) windowingStrategy, triggerSupplier.get())); + rightTmp.apply( + createWindowTransform( + (WindowingStrategy) windowingStrategy, triggerSupplier.get())); } TypeDescriptor keyType = TypeDescriptor.of(Types.returnClass(leftKeyDehydrated)); Coder keyCoder = getCoder(pipeline, keyType); + this.leftCoder = leftTmp.getCoder(); + this.rightCoder = rightTmp.getCoder(); this.leftKv = leftTmp .apply( MapElements.into(TypeDescriptors.kvs(keyType, leftTmp.getTypeDescriptor())) .via(e -> KV.of(leftKeyDehydrated.call(e), e))) - .setCoder(KvCoder.of(keyCoder, leftTmp.getCoder())); + .setCoder(KvCoder.of(keyCoder, leftCoder)); this.rightKv = rightTmp .apply( MapElements.into(TypeDescriptors.kvs(keyType, rightTmp.getTypeDescriptor())) .via(e -> KV.of(rightKeyDehydrated.call(e), e))) - .setCoder(KvCoder.of(keyCoder, rightTmp.getCoder())); - this.leftCoder = leftTmp.getCoder(); - this.rightCoder = rightTmp.getCoder(); + .setCoder(KvCoder.of(keyCoder, rightCoder)); } } diff --git a/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamStreamTest.java b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamStreamTest.java index 7c4d534fe..c4a3fa508 100644 --- a/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamStreamTest.java +++ b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamStreamTest.java @@ -74,9 +74,11 @@ import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectOptions; +import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.runners.flink.FlinkRunner; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -143,6 +145,10 @@ public void tearDown() { } static TestStreamProvider provider(boolean stream) { + return provider(stream, DirectRunner.class); + } + + static TestStreamProvider provider(boolean stream, Class> runner) { return new TestStreamProvider() { @SuppressWarnings("unchecked") @Override @@ -167,6 +173,11 @@ public Stream of(List values) { () -> { LockSupport.park(); return false; + }, + () -> { + Pipeline p = BeamStream.createPipelineDefault(); + p.getOptions().setRunner(runner); + return p; })); } }; diff --git a/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamDirectTest.java b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamDirectTest.java new file mode 100644 index 000000000..6dcc56795 --- /dev/null +++ b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamDirectTest.java @@ -0,0 +1,31 @@ +/* + * Copyright 2017-2023 O2 Czech Republic, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.o2.proxima.beam.tools.groovy; + +import cz.o2.proxima.tools.groovy.WindowedStreamTest; +import org.apache.beam.runners.direct.DirectRunner; +import org.apache.beam.sdk.PipelineRunner; + +public class BeamWindowedStreamDirectTest extends WindowedStreamTest { + + public BeamWindowedStreamDirectTest() { + this(DirectRunner.class); + } + + protected BeamWindowedStreamDirectTest(Class> runner) { + super(BeamStreamTest.provider(true, runner)); + } +} diff --git a/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamTest.java b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamFlinkTest.java similarity index 76% rename from beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamTest.java rename to beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamFlinkTest.java index be5177e73..dec09333e 100644 --- a/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamTest.java +++ b/beam/tools/src/test/java/cz/o2/proxima/beam/tools/groovy/BeamWindowedStreamFlinkTest.java @@ -15,11 +15,11 @@ */ package cz.o2.proxima.beam.tools.groovy; -import cz.o2.proxima.tools.groovy.WindowedStreamTest; +import org.apache.beam.runners.flink.FlinkRunner; -public class BeamWindowedStreamTest extends WindowedStreamTest { +public class BeamWindowedStreamFlinkTest extends BeamWindowedStreamDirectTest { - public BeamWindowedStreamTest() { - super(BeamStreamTest.provider(true)); + public BeamWindowedStreamFlinkTest() { + super(FlinkRunner.class); } } diff --git a/tools/src/main/java/cz/o2/proxima/tools/groovy/WindowedStream.java b/tools/src/main/java/cz/o2/proxima/tools/groovy/WindowedStream.java index 69355c0d7..f297c9c51 100644 --- a/tools/src/main/java/cz/o2/proxima/tools/groovy/WindowedStream.java +++ b/tools/src/main/java/cz/o2/proxima/tools/groovy/WindowedStream.java @@ -316,7 +316,7 @@ WindowedStream> averageByKey( default WindowedStream> join( WindowedStream right, @ClosureParams(value = FromString.class, options = "T") Closure leftKey, - @ClosureParams(value = FromString.class, options = "T") Closure rightKey) { + @ClosureParams(value = FromString.class, options = "OTHER") Closure rightKey) { return join(null, right, leftKey, rightKey); } @@ -336,7 +336,7 @@ WindowedStream> join( @Nullable String name, WindowedStream right, @ClosureParams(value = FromString.class, options = "T") Closure leftKey, - @ClosureParams(value = FromString.class, options = "T") Closure rightKey); + @ClosureParams(value = FromString.class, options = "OTHER") Closure rightKey); /** * Left join with other stream. @@ -351,7 +351,7 @@ WindowedStream> join( default WindowedStream> leftJoin( WindowedStream right, @ClosureParams(value = FromString.class, options = "T") Closure leftKey, - @ClosureParams(value = FromString.class, options = "T") Closure rightKey) { + @ClosureParams(value = FromString.class, options = "OTHER") Closure rightKey) { return leftJoin(null, right, leftKey, rightKey); } @@ -371,7 +371,7 @@ WindowedStream> leftJoin( @Nullable String name, WindowedStream right, @ClosureParams(value = FromString.class, options = "T") Closure leftKey, - @ClosureParams(value = FromString.class, options = "T") Closure rightKey); + @ClosureParams(value = FromString.class, options = "OTHER") Closure rightKey); /** * Sort stream.