diff --git a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle index 5473642ef13f3..db72b6252a2fa 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle +++ b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle @@ -13,9 +13,10 @@ dependencies { shadow 'com.google.code.findbugs:annotations:3.0.1' testCompile project(':beam-sdks-java-extensions-euphoria-operator-testkit') testCompile project(':beam-sdks-java-extensions-euphoria-testing') - testCompile project(':beam-runners-direct-java') + //testCompile project(':beam-runners-direct-java') testCompile library.java.slf4j_api testCompile library.java.hamcrest_core + testCompile "org.apache.beam:beam-runners-direct-java:2.4.0" } diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java index c93eadf327559..f42798e4dd166 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java @@ -20,6 +20,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.operator.FlatMap; +import cz.seznam.euphoria.core.client.operator.Join; import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; @@ -52,6 +53,7 @@ class FlowTranslator { // extended operators translators.put(ReduceByKey.class, new ReduceByKeyTranslator()); translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator()); + translators.put(Join.class, new JoinTranslator()); } static Pipeline toPipeline( diff --git 
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java new file mode 100644 index 0000000000000..77622c38a2d32 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java @@ -0,0 +1,125 @@ +package cz.seznam.euphoria.beam; + +import cz.seznam.euphoria.beam.common.InputToKvDoFn; +import cz.seznam.euphoria.beam.io.KryoCoder; +import cz.seznam.euphoria.beam.join.FullJoinFn; +import cz.seznam.euphoria.beam.join.InnerJoinFn; +import cz.seznam.euphoria.beam.join.JoinFn; +import cz.seznam.euphoria.beam.join.LeftOuterJoinFn; +import cz.seznam.euphoria.beam.join.RightOuterJoinFn; +import cz.seznam.euphoria.beam.window.WindowingUtils; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.util.Pair; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + + +/** + * {@link OperatorTranslator Translator } for Euphoria {@link Join} operator. 
+ */ +public class JoinTranslator implements OperatorTranslator { + + @Override + @SuppressWarnings("unchecked") + public PCollection translate(Join operator, BeamExecutorContext context) { + return doTranslate(operator, context); + } + + + public > PCollection> + doTranslate(Join operator, BeamExecutorContext context) { + + Coder keyCoder = context.getCoder(operator.getLeftKeyExtractor()); + + // get input data-sets transformed to PCollections of KV pairs + List> inputs = context.getInputs(operator); + + PCollection> leftKvInput = getKVInputCollection(inputs.get(0), + operator.getLeftKeyExtractor(), + keyCoder, new KryoCoder<>(), "::extract-keys-left"); + + PCollection> rightKvInput = getKVInputCollection(inputs.get(1), + operator.getRightKeyExtractor(), + keyCoder, new KryoCoder<>(), "::extract-keys-right"); + + // and apply the same windowing on input PCollections since the documentation states: + //'all of the PCollections you want to group must use the same + // windowing strategy and window sizing' + leftKvInput = WindowingUtils.applyWindowingIfSpecified( + operator, leftKvInput, context.getAllowedLateness(operator)); + rightKvInput = WindowingUtils.applyWindowingIfSpecified( + operator, rightKvInput, context.getAllowedLateness(operator)); + + // CoGroupByKey collections + TupleTag leftTag = new TupleTag<>(); + TupleTag rightTag = new TupleTag<>(); + + PCollection> coGrouped = KeyedPCollectionTuple + .of(leftTag, leftKvInput) + .and(rightTag, rightKvInput) + .apply("::co-group-by-key", CoGroupByKey.create()); + + // Join + JoinFn joinFn = chooseJoinFn(operator, leftTag, rightTag); + + return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn)); + } + + private PCollection> getKVInputCollection( + PCollection inputPCollection, + UnaryFunction keyExtractor, + Coder keyCoder, Coder valueCoder, String transformName) { + + @SuppressWarnings("unchecked") + PCollection typedInput = (PCollection) inputPCollection; + typedInput.setCoder(valueCoder); + + PCollection> leftKvInput = 
+ typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor))); + leftKvInput.setCoder(KvCoder.of(keyCoder, valueCoder)); + + return leftKvInput; + } + + private > JoinFn + chooseJoinFn( + Join operator, TupleTag leftTag, + TupleTag rightTag) { + + JoinFn joinFn; + BinaryFunctor joiner = operator.getJoiner(); + + switch (operator.getType()) { + case INNER: + joinFn = new InnerJoinFn<>(joiner, leftTag, rightTag); + break; + case LEFT: + joinFn = new LeftOuterJoinFn<>(joiner, leftTag, rightTag); + break; + case RIGHT: + joinFn = new RightOuterJoinFn<>(joiner, leftTag, rightTag); + break; + case FULL: + joinFn = new FullJoinFn<>(joiner, leftTag, rightTag); + break; + + default: + throw new UnsupportedOperationException(String.format( + "Cannot translate Euphoria '%s' operator to Beam transformations." + + " Given join type '%s' is not supported.", + Join.class.getSimpleName(), operator.getType())); + } + return joinFn; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java index ec459e052981f..cb27b4eb7ce9e 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.beam; -import cz.seznam.euphoria.beam.window.BeamWindowFn; +import cz.seznam.euphoria.beam.window.WindowingUtils; import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; 
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -54,24 +53,8 @@ class ReduceByKeyTranslator implements OperatorTranslator { final Coder keyCoder = context.getCoder(keyExtractor); final Coder valueCoder = context.getCoder(valueExtractor); - final PCollection input; - - // ~ apply windowing if specified - if (operator.getWindowing() == null) { - input = context.getInput(operator); - } else { - input = - context - .getInput(operator) - .apply( - operator.getName() + "::windowing", - org.apache.beam.sdk.transforms.windowing.Window.into( - BeamWindowFn.wrap(operator.getWindowing())) - // TODO: trigger - .triggering(AfterWatermark.pastEndOfWindow()) - .discardingFiredPanes() - .withAllowedLateness(context.getAllowedLateness(operator))); - } + final PCollection input = WindowingUtils.applyWindowingIfSpecified(operator, + context.getInput(operator), context.getAllowedLateness(operator)); // ~ create key & value extractor final MapElements> extractor = @@ -120,8 +103,8 @@ public Pair apply(KV in) { private static SerializableFunction, InputT> asCombiner( ReduceFunctor reducer) { - @SuppressWarnings("unchecked") - final ReduceFunctor combiner = (ReduceFunctor) reducer; + @SuppressWarnings("unchecked") final ReduceFunctor combiner = + (ReduceFunctor) reducer; final SingleValueCollector collector = new SingleValueCollector<>(); return (Iterable input) -> { combiner.apply(StreamSupport.stream(input.spliterator(), false), collector); diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java index 3899f02f0af49..11143180fffa4 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java +++ 
b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java @@ -63,6 +63,6 @@ public PCollection translate(ReduceStateByKey operator, BeamExecutorContext c .setCoder(new KryoCoder<>()); } */ - throw new UnsupportedOperationException("Not supported yet"); + throw new UnsupportedOperationException("ReduceStateByKey is not supported yet."); } } diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java new file mode 100644 index 0000000000000..dbac09e738fec --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java @@ -0,0 +1,26 @@ +package cz.seznam.euphoria.beam.common; + +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** + * {@link DoFn} which takes input elements and transforms them to {@link KV} using given key + * extractor. 
+ */ +public class InputToKvDoFn extends DoFn> { + + private final UnaryFunction keyExtractor; + + public InputToKvDoFn(UnaryFunction leftKeyExtractor) { + this.keyExtractor = leftKeyExtractor; + } + + @ProcessElement + public void processElement(ProcessContext c) { + InputT element = c.element(); + K key = keyExtractor.apply(element); + c.output(KV.of(key, element)); + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java new file mode 100644 index 0000000000000..c238720a5c4d9 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A set of commonly used classes enabling some code reuse. 
+ */ +package cz.seznam.euphoria.beam.common; diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java new file mode 100644 index 0000000000000..b04faa971d93f --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java @@ -0,0 +1,60 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Full join implementation of {@link JoinFn}. + */ +public class FullJoinFn extends JoinFn { + + public FullJoinFn(BinaryFunctor joiner, TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + boolean leftHasValues = leftSideIter.iterator().hasNext(); + boolean rightHasValues = rightSIdeIter.iterator().hasNext(); + + if (leftHasValues && rightHasValues) { + for (RightT rightValue : rightSIdeIter) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + } else if (leftHasValues && !rightHasValues) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, null, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else if (!leftHasValues && rightHasValues) { + for (RightT rightValue : 
rightSIdeIter) { + joiner.apply(null, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + } + + @Override + public String getFnName() { + return "::full-join"; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java new file mode 100644 index 0000000000000..9aebb8b01d0b9 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java @@ -0,0 +1,47 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Inner join implementation of {@link JoinFn}. 
+ */ +public class InnerJoinFn extends JoinFn { + + public InnerJoinFn( + BinaryFunctor functor, + TupleTag leftTag, + TupleTag rightTag) { + super(functor, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSideIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (LeftT leftItem : leftSideIter) { + for (RightT rightItem : rightSideIter) { + joiner.apply(leftItem, rightItem, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + } + + @Override + public String getFnName() { + return "::inner-join"; + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java new file mode 100644 index 0000000000000..bee8d74ffa9c1 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java @@ -0,0 +1,37 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Abstract base for join implementations. 
+ * + * @param type of left-side elements + * @param type of right-side elements + * @param key type + * @param type of output elements + */ +public abstract class JoinFn extends + DoFn, Pair> { + + protected final BinaryFunctor joiner; + protected final TupleTag leftTag; + protected final TupleTag rightTag; + + protected JoinFn( + BinaryFunctor joiner, + TupleTag leftTag, TupleTag rightTag) { + this.joiner = joiner; + this.leftTag = leftTag; + this.rightTag = rightTag; + } + + @ProcessElement + public abstract void processElement(ProcessContext c); + + public abstract String getFnName(); +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java new file mode 100644 index 0000000000000..e0d413f584c9f --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java @@ -0,0 +1,55 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + + +/** + * Left outer join implementation of {@link JoinFn}. 
+ */ +public class LeftOuterJoinFn extends JoinFn { + + public LeftOuterJoinFn( + BinaryFunctor joiner, + TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (LeftT leftValue : leftSideIter) { + if (rightSIdeIter.iterator().hasNext()) { + for (RightT rightValue : rightSIdeIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else { + joiner.apply(leftValue, null, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + + } + + @Override + public String getFnName() { + return "::left-outer-join"; + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java new file mode 100644 index 0000000000000..354cf7b7fa7ab --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java @@ -0,0 +1,52 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Right outer join implementation of {@link JoinFn}. 
+ */ +public class RightOuterJoinFn extends JoinFn { + + public RightOuterJoinFn( + BinaryFunctor joiner, + TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (RightT rightValue : rightSIdeIter) { + if (leftSideIter.iterator().hasNext()) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else { + joiner.apply(null, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + } + + @Override + public String getFnName() { + return "::right-outer-join"; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java new file mode 100644 index 0000000000000..e47d9dadca8b4 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link cz.seznam.euphoria.core.client.operator.Join} translation centered classes. + */ +package cz.seznam.euphoria.beam.join; diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindow.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindow.java deleted file mode 100644 index 716574c5b080c..0000000000000 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindow.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.beam.window; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.joda.time.Instant; - -/** - * Wrapper around euphoria's {@code Window} into beam. 
- */ -public class BeamWindow> extends BoundedWindow { - - private static final Instant MAX_TIMESTAMP = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(1); - private final W wrapped; - - private BeamWindow(W wrap) { - this.wrapped = wrap; - } - - public static > BeamWindow wrap(W wrap) { - return new BeamWindow<>(wrap); - } - - @Override - public Instant maxTimestamp() { - // We cannot return more than MAX_TIMESTAMP-1 since beam's WatermarkManager checks every - // window max timestamp to be smaller than BoundedWindow.TIMESTAMP_MAX_VALUE. - - long wrappedWindowMaxTimestamp = wrapped.maxTimestamp(); - if (wrappedWindowMaxTimestamp > MAX_TIMESTAMP.getMillis()) { - return MAX_TIMESTAMP; - } - - return new Instant(wrappedWindowMaxTimestamp); - } - - @Override - public boolean equals(Object obj) { - return obj instanceof BeamWindow && ((BeamWindow) obj).wrapped.equals(wrapped); - } - - @Override - public int hashCode() { - return wrapped.hashCode(); - } - - public W get() { - return wrapped; - } -} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowFn.java deleted file mode 100644 index 8c3589cc71420..0000000000000 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowFn.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package cz.seznam.euphoria.beam.window; - -import cz.seznam.euphoria.beam.io.KryoCoder; -import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing; -import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import java.util.Collection; -import java.util.Objects; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; - -/** - * A {@code WindowFn} wrapper of {@code Windowing}. - */ -public class BeamWindowFn> extends WindowFn> { - - private final Windowing windowing; - - private BeamWindowFn(Windowing windowing) { - this.windowing = Objects.requireNonNull(windowing); - } - - public static > BeamWindowFn wrap(Windowing windowing) { - return new BeamWindowFn<>(windowing); - } - - @Override - public void mergeWindows(MergeContext ctx) throws Exception { - if (windowing instanceof MergingWindowing) { - final MergingWindowing merge = (MergingWindowing) windowing; - merge - .mergeWindows(ctx.windows().stream().map(BeamWindow::get).collect(Collectors.toList())) - .forEach( - p -> { - try { - ctx.merge( - p.getFirst().stream().map(BeamWindow::wrap).collect(Collectors.toList()), - BeamWindow.wrap(p.getSecond())); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - } - - @Override - public boolean isNonMerging() { - return !(windowing instanceof MergingWindowing); - } - - @Override - public boolean isCompatible(WindowFn other) { - return other instanceof BeamWindowFn && 
((BeamWindowFn) other).windowing.equals(windowing); - } - - @Override - @SuppressWarnings("unchecked") - public Collection> assignWindows(AssignContext ctx) throws Exception { - final Window> window = - ctx.window() instanceof GlobalWindow - ? GlobalWindowing.Window.get() - : ((BeamWindow) ctx.window()).get(); - return StreamSupport.stream( - windowing - .assignWindowsToElement( - BeamWindowedElement.of(ctx.element(), window, ctx.timestamp().getMillis())) - .spliterator(), - false) - .map(BeamWindow::wrap) - .collect(Collectors.toList()); - } - - @Override - public Coder> windowCoder() { - return new KryoCoder<>(); - } - - @Override - public WindowMappingFn> getDefaultWindowMappingFn() { - return new WindowMappingFn>() { - @Override - public BeamWindow getSideInputWindow(BoundedWindow mainWindow) { - throw new UnsupportedOperationException("Not supported yet."); - } - }; - } -} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowedElement.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowedElement.java deleted file mode 100644 index e98bdad9bd8fd..0000000000000 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowedElement.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2016-2018 Seznam.cz, a.s. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package cz.seznam.euphoria.beam.window; - -import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; -import java.util.Objects; - -/** - * A {@code WindowedElement} created from Beam's element. - */ -public class BeamWindowedElement implements WindowedElement { - - private final T elem; - private final W window; - private final long stamp; - - private BeamWindowedElement(T elem, W window, long stamp) { - this.elem = Objects.requireNonNull(elem); - this.window = Objects.requireNonNull(window); - this.stamp = stamp; - } - - public static BeamWindowedElement of(T elem, W window, long stamp) { - return new BeamWindowedElement<>(elem, window, stamp); - } - - @Override - public W getWindow() { - return window; - } - - @Override - public long getTimestamp() { - return stamp; - } - - @Override - public T getElement() { - return elem; - } -} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowing.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowing.java new file mode 100644 index 0000000000000..f38321a414931 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/BeamWindowing.java @@ -0,0 +1,75 @@ +/* + * Copyright 2016-2018 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.seznam.euphoria.beam.window; + +import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; + +/** + * Euphoria's {@link Windowing} which wraps Beam's {@link WindowFn}, {@link Trigger} and {@link + * WindowingStrategy.AccumulationMode} to allow Beam's windowing to be defined through Euphoria API. + */ +public class BeamWindowing implements + Windowing { + + private final WindowFn windowFn; + private final Trigger trigger; + private final WindowingStrategy.AccumulationMode accumulationMode; + + private BeamWindowing(WindowFn windowFn, Trigger beamTrigger, + AccumulationMode accumulationMode) { + this.windowFn = windowFn; + this.trigger = beamTrigger; + this.accumulationMode = accumulationMode; + } + + public static BeamWindowing of( + + WindowFn windowFn, Trigger trigger, + WindowingStrategy.AccumulationMode accumulationMode) { + return new BeamWindowing<>(windowFn, trigger, accumulationMode); + } + + + @Override + public Iterable assignWindowsToElement(WindowedElement el) { + throw new UnsupportedOperationException( + "Beam window serves as envelope, it do not supports element to window assignment."); + } + + @Override + public cz.seznam.euphoria.core.client.triggers.Trigger getTrigger() { + throw new UnsupportedOperationException( + "Beam window serves as envelope, it do not contains Euphoria trigger."); + } + + public WindowFn getWindowFn() { + return windowFn; + } + + public AccumulationMode getAccumulationMode() { + return accumulationMode; + } + + public Trigger getBeamTrigger() { + return trigger; + } +} diff --git 
a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/UnsupportedWindow.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/UnsupportedWindow.java new file mode 100644 index 0000000000000..71ef998637702 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/UnsupportedWindow.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.beam.window; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import java.util.Objects; + +/** + * Window used as type parameter of {@link BeamWindowing}. 
+ */ +final class UnsupportedWindow extends Window { + + private UnsupportedWindow(){ + //Do not instantiate + } + + @Override + public int compareTo(UnsupportedWindow o) { + throw new UnsupportedOperationException(); + } + + @Override + public long maxTimestamp() { + throw new UnsupportedOperationException(); + } + + @Override + public int hashCode() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean equals(Object obj) { + return this == obj; /* identity only: Objects.equals(this, obj) re-enters equals() and recurses without end */ + } + + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java new file mode 100644 index 0000000000000..d546deb5f28d2 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java @@ -0,0 +1,58 @@ +package cz.seznam.euphoria.beam.window; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.operator.WindowWiseOperator; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Collection of methods helpful when dealing with windowing translations.
+ */ +public class WindowingUtils { + + public static > + PCollection applyWindowingIfSpecified( + WindowWiseOperator operator, + PCollection input, + Duration allowedLateness) { + + Windowing userSpecifiedWindowing = operator.getWindowing(); + + if (userSpecifiedWindowing == null) { + return input; + } + + if (!(userSpecifiedWindowing instanceof BeamWindowing)) { + throw new IllegalStateException(String.format( + "Use of '%s' is only way supported to specify windowing.", + BeamWindowing.class.getSimpleName())); + } + + @SuppressWarnings("unchecked") + BeamWindowing beamWindowing = (BeamWindowing) userSpecifiedWindowing; + + @SuppressWarnings("unchecked") + org.apache.beam.sdk.transforms.windowing.Window beamWindow = + org.apache.beam.sdk.transforms.windowing.Window + .into((WindowFn) beamWindowing.getWindowFn()) + .triggering(beamWindowing.getBeamTrigger()); + + switch (beamWindowing.getAccumulationMode()) { + case DISCARDING_FIRED_PANES: + beamWindow = beamWindow.discardingFiredPanes(); + break; + case ACCUMULATING_FIRED_PANES: + beamWindow = beamWindow.accumulatingFiredPanes(); + break; + default: + throw new IllegalStateException( + "Unsupported accumulation mode '" + beamWindowing.getAccumulationMode() + "'"); + } + + beamWindow = beamWindow.withAllowedLateness(allowedLateness); + + return input.apply(operator.getName() + "::windowing", beamWindow); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java new file mode 100644 index 0000000000000..7164f0d352255 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java @@ -0,0 +1,206 @@ +package cz.seznam.euphoria.beam; + +import static java.util.Arrays.asList; + +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import 
cz.seznam.euphoria.core.client.io.ListDataSink; +import cz.seznam.euphoria.core.client.io.ListDataSource; +import cz.seznam.euphoria.core.client.operator.FullJoin; +import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.LeftJoin; +import cz.seznam.euphoria.core.client.operator.RightJoin; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.testing.DatasetAssert; +import java.util.Optional; +import org.junit.Test; + +/** + * Simple test suite for Join operator. + */ +public class JoinTest { + + @Test + public void simpleInnerJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor, Pair, Pair> joiner = + (l, r, c) -> c.collect(Pair.of(l.getSecond(), r.getSecond())); + + Join.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)) + ); + + } + + @Test + public void simpleLeftJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) 
+ )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor, Optional>, Pair> + joiner = (l, r, c) -> + c.collect(Pair.of(l.getSecond(), r.orElse(Pair.of(null, null)).getSecond())); + + LeftJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(3, Pair.of("L v1", null)) + ); + + } + + @Test + public void simpleRightJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor>, Pair, Pair> + joiner = (l, r, c) -> + c.collect(Pair.of(l.orElse(Pair.of(null, null)).getSecond(), r.getSecond())); + + RightJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(4, Pair.of(null, 40)) + ); + + } + + @Test + public void simpleFullJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( 
+ Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor< + Optional>, Optional>, Pair> + joiner = (l, r, c) -> c.collect(Pair.of( + l.orElse(Pair.of(null, null)).getSecond(), r.orElse(Pair.of(null, null)).getSecond())); + + FullJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(3, Pair.of("L v1", null)), + Pair.of(4, Pair.of(null, 40)) + ); + + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java index 6c9758c07cfaa..da9936ba6e54d 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java @@ -17,8 +17,8 @@ import static org.junit.Assert.assertTrue; +import cz.seznam.euphoria.beam.window.BeamWindowing; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; @@ -43,11 +43,11 @@ 
import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; import cz.seznam.euphoria.testing.DatasetAssert; -import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; import org.junit.Ignore; import org.junit.Test; @@ -56,12 +56,6 @@ */ public class ReduceByKeyTest { - private BeamExecutor createExecutor() { - String[] args = {"--runner=DirectRunner"}; - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); - return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1)); - } - @Test public void testSimpleRBK() { final Flow flow = Flow.create(); @@ -74,11 +68,14 @@ public void testSimpleRBK() { ReduceByKey.of(flow.createInput(input, e -> 1000L * e)) .keyBy(i -> i % 2) .reduceBy(Sums.ofInts()) - .windowBy(Time.of(Duration.ofHours(1))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardHours(1)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output() .persist(output); - BeamExecutor executor = createExecutor(); + BeamExecutor executor = TestUtils.createExecutor(); executor.execute(flow); DatasetAssert.unorderedEquals(output.getOutputs(), Pair.of(0, 8), Pair.of(1, 7)); @@ -116,15 +113,19 @@ public void testEventTime() { ListDataSink> sink = ListDataSink.get(); Dataset> input = flow.createInput(source); input = AssignEventTime.of(input).using(Pair::getSecond).output(); + ReduceByKey.of(input) .keyBy(Pair::getFirst) .valueBy(e -> 1L) .combineBy(Sums.ofLongs()) - .windowBy(Time.of(Duration.ofSeconds(1))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardSeconds(1)), 
+ AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output() .persist(sink); - BeamExecutor executor = createExecutor(); + BeamExecutor executor = TestUtils.createExecutor(); executor.execute(flow); DatasetAssert.unorderedEquals( @@ -170,7 +171,10 @@ public void testElementTimestamp() { .keyBy(e -> "", TypeHint.ofString()) .valueBy(Pair::getFirst, TypeHint.ofInt()) .combineBy(Sums.ofInts(), TypeHint.ofInt()) - .windowBy(Time.of(Duration.ofSeconds(5))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardSeconds(5)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); // ~ now use a custom windowing with a trigger which does // the assertions subject to this test (use RSBK which has to @@ -190,7 +194,7 @@ public void testElementTimestamp() { .output() .persist(sink); - createExecutor().execute(flow); + TestUtils.createExecutor().execute(flow); DatasetAssert.unorderedEquals(sink.getOutputs(), 4, 6); } diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java new file mode 100644 index 0000000000000..1cc188ca0e13a --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java @@ -0,0 +1,17 @@ +package cz.seznam.euphoria.beam; + +import java.time.Duration; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Collection of methods reused among tests. 
+ */ +public class TestUtils { + + static BeamExecutor createExecutor() { + String[] args = {"--runner=DirectRunner"}; + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1)); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java index 9bb875a31f03a..b597c50d8e6a5 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java @@ -28,6 +28,7 @@ */ public interface BeamExecutorProvider extends ExecutorProvider { + @Override default ExecutorEnvironment newExecutorEnvironment() throws Exception { final String[] args = {"--runner=DirectRunner"}; final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java index 11a95aa0f3592..b8f2a5990968c 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java @@ -16,6 +16,7 @@ package cz.seznam.euphoria.beam.testkit; import cz.seznam.euphoria.operator.test.FlatMapTest; +import cz.seznam.euphoria.operator.test.JoinTest; import cz.seznam.euphoria.operator.test.UnionTest; import cz.seznam.euphoria.operator.test.junit.ExecutorProvider; import 
cz.seznam.euphoria.operator.test.junit.ExecutorProviderRunner; @@ -33,7 +34,7 @@ // DistinctTest.class, // FilterTest.class, FlatMapTest.class, - // JoinTest.class, + JoinTest.class, // JoinWindowEnforcementTest.class, // MapElementsTest.class, // ReduceByKeyTest.class, diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle index 44ebdb4241032..b2b96567f8273 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle @@ -5,5 +5,6 @@ description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL" dependencies { compile project(':beam-sdks-java-extensions-euphoria-core') + compile project(':beam-sdks-java-extensions-euphoria-beam') compileOnly library.java.junit } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java index e8ed22e00cd0b..bac992c48cb50 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java @@ -17,12 +17,10 @@ import static org.junit.Assert.assertEquals; +import cz.seznam.euphoria.beam.io.KryoCoder; +import cz.seznam.euphoria.beam.window.BeamWindowing; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.windowing.Session; -import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; -import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import 
cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSource; @@ -32,23 +30,36 @@ import cz.seznam.euphoria.core.client.operator.LeftJoin; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.RightJoin; -import cz.seznam.euphoria.core.client.triggers.NoopTrigger; -import cz.seznam.euphoria.core.client.triggers.Trigger; -import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Triple; import cz.seznam.euphoria.operator.test.accumulators.SnapshotProvider; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; -import java.time.Duration; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; +import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Test; -/** Test operator {@code Join}. */ +/** + * Test operator {@code Join}. 
+ */ @Processing(Processing.Type.ALL) public class JoinTest extends AbstractOperatorTest { @@ -269,7 +280,10 @@ protected Dataset> getOutput( (Optional l, Optional r, Collector c) -> { c.collect(l.orElse(null) + "+" + r.orElse(null)); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -319,7 +333,10 @@ protected Dataset> getOutput( (Integer l, Optional r, Collector c) -> { c.collect(l + "+" + r.orElse(null)); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -364,7 +381,10 @@ protected Dataset> getOutput( (Optional l, Long r, Collector c) -> { c.collect(l.orElse(null) + "+" + r); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -401,7 +421,7 @@ public List> getUnorderedOutput() { public void joinOnSessionWindowingNoEarlyTriggering() { execute( new JoinTestCase< - Pair, Pair, Triple>() { + Pair, Pair, Pair>() { @Override protected List> getLeftInput() { @@ -414,37 +434,41 @@ protected List> getRightInput() { } @Override - protected Dataset> getOutput( + protected Dataset> getOutput( Dataset> left, Dataset> right) { + left = AssignEventTime.of(left).using(Pair::getSecond).output(); right = AssignEventTime.of(right).using(Pair::getSecond).output(); - Dataset>> joined = + + Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") .using( - (Pair l, - Pair r, - Collector> c) -> - c.collect( - Triple.of( - (TimeInterval) c.getWindow(), l.getFirst(), r.getFirst()))) - .windowBy(Session.of(Duration.ofMillis(10))) + (Pair l, Pair r, + Collector> c) -> + c.collect(Pair.of(l.getFirst(), r.getFirst()))) + .windowBy(BeamWindowing.of( + 
Sessions.withGapDuration(org.joda.time.Duration.millis(10)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return MapElements.of(joined).using(Pair::getSecond).output(); } @Override - public List> getUnorderedOutput() { - TimeInterval expectedWindow = new TimeInterval(1, 14); + public List> getUnorderedOutput() { return Arrays.asList( - Triple.of(expectedWindow, "fi", "ha"), - Triple.of(expectedWindow, "fi", "ho"), - Triple.of(expectedWindow, "fa", "ha"), - Triple.of(expectedWindow, "fa", "ho")); + Pair.of("fi", "ha"), + Pair.of("fi", "ho"), + Pair.of("fa", "ha"), + Pair.of("fa", "ho")); } }); } + @Ignore( + "This test is based on access to various objects through Environment which is " + + "unsupported feature. It may be possible to add this feature in future.") @Test public void testJoinAccumulators() { execute( @@ -464,8 +488,10 @@ protected List> getRightInput() { @Override protected Dataset> getOutput( Dataset> left, Dataset> right) { + left = AssignEventTime.of(left).using(Pair::getSecond).output(); right = AssignEventTime.of(right).using(Pair::getSecond).output(); + Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") @@ -478,7 +504,11 @@ protected Dataset> getOutput( c.getHistogram("hist-" + l.getFirst().charAt(1)).add(2345, 8); c.collect(Triple.of(window, l.getFirst(), r.getFirst())); }) - .windowBy(Time.of(Duration.ofMillis(3))) +// .windowBy(Time.of(Duration.ofMillis(3))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.millis(3)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return MapElements.of(joined).using(Pair::getSecond).output(); } @@ -509,11 +539,9 @@ public void validateAccumulators(SnapshotProvider snapshots) { /** * Base for join test cases. 
- * @param - * @param - * @param */ public abstract static class JoinTestCase implements TestCase { + @Override public Dataset getOutput(Flow flow, boolean bounded) { Dataset left = flow.createInput(ListDataSource.of(bounded, getLeftInput())); @@ -528,36 +556,87 @@ public Dataset getOutput(Flow flow, boolean bounded) { protected abstract List getRightInput(); } - /** Stable windowing for test purposes. */ - static class EvenOddWindowing implements Windowing, IntWindow> { + + /** + * Elements with even numeric values are assigned to one 'even' window. All others are + * assigned to window named 'win: #', where '#' is value of assigned element. + */ + private static class EvenOddWindowFn extends WindowFn, BoundedWindow> { + + private static final NamedGlobalWindow EVEN_WIN = new NamedGlobalWindow("even"); @Override - public Iterable assignWindowsToElement( - WindowedElement> input) { - int element; - Either unwrapped = input.getElement(); - if (unwrapped.isLeft()) { - element = unwrapped.left(); + @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") + public Collection assignWindows(AssignContext c) throws Exception { + KV element = c.element(); + + Number value = element.getValue(); + + if (value == null) { + return Collections.singleton(EVEN_WIN); + } + + NamedGlobalWindow win; + if (value.longValue() % 2 == 0) { + win = EVEN_WIN; } else { - element = (int) (long) unwrapped.right(); + win = new NamedGlobalWindow("win: " + value.longValue()); } - final int label = element % 2 == 0 ?
0 : element; - return Collections.singleton(new IntWindow(label)); + + return Collections.singleton(win); } @Override - public Trigger getTrigger() { - return NoopTrigger.get(); + public void mergeWindows(MergeContext c) throws Exception { + // no merging } @Override - public boolean equals(Object obj) { - return obj instanceof EvenOddWindowing; + public boolean isCompatible(WindowFn other) { + return other instanceof EvenOddWindowFn; + } + + @Override + public Coder windowCoder() { + return new KryoCoder<>(); + } + + @Override + @Nullable + public WindowMappingFn getDefaultWindowMappingFn() { + return null; + } + + @Override + public boolean isNonMerging() { + return true; + } + } + + private static class NamedGlobalWindow extends BoundedWindow { + + private String name; + + public NamedGlobalWindow(String name) { + this.name = name; + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof NamedGlobalWindow) { + return name.equals(((NamedGlobalWindow) other).name); + } + return false; } @Override public int hashCode() { - return 0; + return name.hashCode(); } } }