diff --git a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle index 5473642ef13f3..b5f810e70f930 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/build.gradle +++ b/sdks/java/extensions/euphoria/euphoria-beam/build.gradle @@ -16,7 +16,6 @@ dependencies { testCompile project(':beam-runners-direct-java') testCompile library.java.slf4j_api testCompile library.java.hamcrest_core - } test.testLogging.showStandardStreams = true diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java index c93eadf327559..f42798e4dd166 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/FlowTranslator.java @@ -20,6 +20,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.operator.FlatMap; +import cz.seznam.euphoria.core.client.operator.Join; import cz.seznam.euphoria.core.client.operator.Operator; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; @@ -52,6 +53,7 @@ class FlowTranslator { // extended operators translators.put(ReduceByKey.class, new ReduceByKeyTranslator()); translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator()); + translators.put(Join.class, new JoinTranslator()); } static Pipeline toPipeline( diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java new file mode 100644 index 0000000000000..77622c38a2d32 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/JoinTranslator.java @@ -0,0 +1,125 @@ +package cz.seznam.euphoria.beam; + +import cz.seznam.euphoria.beam.common.InputToKvDoFn; +import cz.seznam.euphoria.beam.io.KryoCoder; +import cz.seznam.euphoria.beam.join.FullJoinFn; +import cz.seznam.euphoria.beam.join.InnerJoinFn; +import cz.seznam.euphoria.beam.join.JoinFn; +import cz.seznam.euphoria.beam.join.LeftOuterJoinFn; +import cz.seznam.euphoria.beam.join.RightOuterJoinFn; +import cz.seznam.euphoria.beam.window.WindowingUtils; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.util.Pair; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TupleTag; + + +/** + * {@link OperatorTranslator Translator } for Euphoria {@link Join} operator. + */ +public class JoinTranslator implements OperatorTranslator { + + @Override + @SuppressWarnings("unchecked") + public PCollection translate(Join operator, BeamExecutorContext context) { + return doTranslate(operator, context); + } + + + public > PCollection> + doTranslate(Join operator, BeamExecutorContext context) { + + Coder keyCoder = context.getCoder(operator.getLeftKeyExtractor()); + + // get input data-sets transformed to Pcollections> + List> inputs = context.getInputs(operator); + + PCollection> leftKvInput = getKVInputCollection(inputs.get(0), + operator.getLeftKeyExtractor(), + keyCoder, new KryoCoder<>(), "::extract-keys-left"); + + PCollection> rightKvInput = getKVInputCollection(inputs.get(1), + operator.getRightKeyExtractor(), + keyCoder, new KryoCoder<>(), "::extract-keys-right"); + + // and apply the same widowing on input Pcolections since the documentation states: + //'all of the PCollections you want to group must use the same + // windowing strategy and window sizing' + leftKvInput = WindowingUtils.applyWindowingIfSpecified( + operator, leftKvInput, context.getAllowedLateness(operator)); + rightKvInput = WindowingUtils.applyWindowingIfSpecified( + operator, rightKvInput, context.getAllowedLateness(operator)); + + // GoGroupByKey collections + TupleTag leftTag = new TupleTag<>(); + TupleTag rightTag = new TupleTag<>(); + + PCollection> coGrouped = KeyedPCollectionTuple + .of(leftTag, leftKvInput) + .and(rightTag, rightKvInput) + .apply("::co-group-by-key", CoGroupByKey.create()); + + // Join + JoinFn joinFn = chooseJoinFn(operator, leftTag, rightTag); + + return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn)); + } + + private PCollection> getKVInputCollection( + PCollection inputPCollection, + UnaryFunction keyExtractor, + Coder keyCoder, Coder valueCoder, String transformName) { + + @SuppressWarnings("unchecked") + PCollection typedInput = (PCollection) inputPCollection; + typedInput.setCoder(valueCoder); + + PCollection> leftKvInput = + typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor))); + leftKvInput.setCoder(KvCoder.of(keyCoder, valueCoder)); + + return leftKvInput; + } + + private > JoinFn + chooseJoinFn( + Join operator, TupleTag leftTag, + TupleTag rightTag) { + + JoinFn joinFn; + BinaryFunctor joiner = operator.getJoiner(); + + switch (operator.getType()) { + case INNER: + joinFn = new InnerJoinFn<>(joiner, leftTag, rightTag); + break; + case LEFT: + joinFn = new LeftOuterJoinFn<>(joiner, leftTag, rightTag); + break; + case RIGHT: + joinFn = new RightOuterJoinFn<>(joiner, leftTag, rightTag); + break; + case FULL: + joinFn = new FullJoinFn<>(joiner, leftTag, rightTag); + break; + + default: + throw new UnsupportedOperationException(String.format( + "Cannot translate Euphoria '%s' operator to Beam transformations." + + " Given join type '%s' is not supported.", + Join.class.getSimpleName(), operator.getType())); + } + return joinFn; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java index 4e0cad3a5ae8e..a2c6ac6561200 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceByKeyTranslator.java @@ -15,10 +15,9 @@ */ package cz.seznam.euphoria.beam; -import cz.seznam.euphoria.beam.window.BeamWindowing; +import cz.seznam.euphoria.beam.window.WindowingUtils; import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.ReduceByKey; @@ -46,6 +45,10 @@ class ReduceByKeyTranslator implements OperatorTranslator { private static > PCollection> doTranslate(ReduceByKey operator, BeamExecutorContext context) { + if (operator.getValueComparator() != null) { //TODO Could we even do values sorting ? + throw new UnsupportedOperationException("Values sorting is not supported."); + } + final UnaryFunction keyExtractor = operator.getKeyExtractor(); final UnaryFunction valueExtractor = operator.getValueExtractor(); final ReduceFunctor reducer = operator.getReducer(); @@ -54,8 +57,8 @@ class ReduceByKeyTranslator implements OperatorTranslator { final Coder keyCoder = context.getCoder(keyExtractor); final Coder valueCoder = context.getCoder(valueExtractor); - final PCollection input = applyWindowingIfSpecified(operator, - context.getInput(operator), context); + final PCollection input = WindowingUtils.applyWindowingIfSpecified(operator, + context.getInput(operator), context.getAllowedLateness(operator)); // ~ create key & value extractor final MapElements> extractor = @@ -101,48 +104,6 @@ public Pair apply(KV in) { } } - private static > - PCollection applyWindowingIfSpecified( - ReduceByKey operator, PCollection input, - BeamExecutorContext context) { - - Windowing userSpecifiedWindowing = operator.getWindowing(); - - if (userSpecifiedWindowing == null) { - return input; - } - - if (!(userSpecifiedWindowing instanceof BeamWindowing)) { - throw new IllegalStateException(String.format( - "%s class only is supported to specify windowing.", BeamWindowing.class.getSimpleName())); - } - - @SuppressWarnings("unchecked") - BeamWindowing beamWindowing = (BeamWindowing) userSpecifiedWindowing; - - @SuppressWarnings("unchecked") - org.apache.beam.sdk.transforms.windowing.Window beamWindow = - (org.apache.beam.sdk.transforms.windowing.Window) - org.apache.beam.sdk.transforms.windowing.Window.into(beamWindowing.getWindowFn()) - .triggering(beamWindowing.getBeamTrigger()); - - switch (beamWindowing.getAccumulationMode()) { - case DISCARDING_FIRED_PANES: - beamWindow = beamWindow.discardingFiredPanes(); - break; - case ACCUMULATING_FIRED_PANES: - beamWindow = beamWindow.accumulatingFiredPanes(); - break; - default: - throw new IllegalStateException( - "Unsupported accumulation mode '" + beamWindowing.getAccumulationMode() + "'"); - } - - beamWindow = beamWindow.withAllowedLateness(context.getAllowedLateness(operator)); - - return input.apply(operator.getName() + "::windowing", beamWindow); - } - private static SerializableFunction, InputT> asCombiner( ReduceFunctor reducer) { diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java index 3899f02f0af49..11143180fffa4 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/ReduceStateByKeyTranslator.java @@ -63,6 +63,6 @@ public PCollection translate(ReduceStateByKey operator, BeamExecutorContext c .setCoder(new KryoCoder<>()); } */ - throw new UnsupportedOperationException("Not supported yet"); + throw new UnsupportedOperationException("ReduceStateByKy is not supported yet."); } } diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java new file mode 100644 index 0000000000000..dbac09e738fec --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/InputToKvDoFn.java @@ -0,0 +1,26 @@ +package cz.seznam.euphoria.beam.common; + +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; + +/** + * {@link DoFn} which takes input elements and transforms them to {@link KV} using given key + * extractor. + */ +public class InputToKvDoFn extends DoFn> { + + private final UnaryFunction keyExtractor; + + public InputToKvDoFn(UnaryFunction leftKeyExtractor) { + this.keyExtractor = leftKeyExtractor; + } + + @ProcessElement + public void processElement(ProcessContext c) { + InputT element = c.element(); + K key = keyExtractor.apply(element); + c.output(KV.of(key, element)); + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java new file mode 100644 index 0000000000000..c238720a5c4d9 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/common/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * A set of commonly used classes enabling some code reuse. + */ +package cz.seznam.euphoria.beam.common; diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java new file mode 100644 index 0000000000000..b04faa971d93f --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/FullJoinFn.java @@ -0,0 +1,60 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Full join implementation of {@link JoinFn}. + */ +public class FullJoinFn extends JoinFn { + + public FullJoinFn(BinaryFunctor joiner, TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + boolean leftHasValues = leftSideIter.iterator().hasNext(); + boolean rightHasValues = rightSIdeIter.iterator().hasNext(); + + if (leftHasValues && rightHasValues) { + for (RightT rightValue : rightSIdeIter) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + } else if (leftHasValues && !rightHasValues) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, null, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else if (!leftHasValues && rightHasValues) { + for (RightT rightValue : rightSIdeIter) { + joiner.apply(null, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + } + + @Override + public String getFnName() { + return "::full-join"; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java new file mode 100644 index 0000000000000..9aebb8b01d0b9 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/InnerJoinFn.java @@ -0,0 +1,47 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Inner join implementation of {@link JoinFn}. + */ +public class InnerJoinFn extends JoinFn { + + public InnerJoinFn( + BinaryFunctor functor, + TupleTag leftTag, + TupleTag rightTag) { + super(functor, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSideIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (LeftT leftItem : leftSideIter) { + for (RightT rightItem : rightSideIter) { + joiner.apply(leftItem, rightItem, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + } + + @Override + public String getFnName() { + return "::inner-join"; + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java new file mode 100644 index 0000000000000..bee8d74ffa9c1 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/JoinFn.java @@ -0,0 +1,37 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Abstract base for joint implementations. + * + * @param type of left-side elements + * @param type of right-side elements + * @param key type + * @param type of output elements + */ +public abstract class JoinFn extends + DoFn, Pair> { + + protected final BinaryFunctor joiner; + protected final TupleTag leftTag; + protected final TupleTag rightTag; + + protected JoinFn( + BinaryFunctor joiner, + TupleTag leftTag, TupleTag rightTag) { + this.joiner = joiner; + this.leftTag = leftTag; + this.rightTag = rightTag; + } + + @ProcessElement + public abstract void processElement(ProcessContext c); + + public abstract String getFnName(); +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java new file mode 100644 index 0000000000000..e0d413f584c9f --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/LeftOuterJoinFn.java @@ -0,0 +1,55 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + + +/** + * Left outer join implementation of {@link JoinFn}. + */ +public class LeftOuterJoinFn extends JoinFn { + + public LeftOuterJoinFn( + BinaryFunctor joiner, + TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (LeftT leftValue : leftSideIter) { + if (rightSIdeIter.iterator().hasNext()) { + for (RightT rightValue : rightSIdeIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else { + joiner.apply(leftValue, null, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + + } + + @Override + public String getFnName() { + return "::left-outer-join"; + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java new file mode 100644 index 0000000000000..354cf7b7fa7ab --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/RightOuterJoinFn.java @@ -0,0 +1,52 @@ +package cz.seznam.euphoria.beam.join; + +import cz.seznam.euphoria.beam.SingleValueCollector; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.util.Pair; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Right outer join implementation of {@link JoinFn}. + */ +public class RightOuterJoinFn extends JoinFn { + + public RightOuterJoinFn( + BinaryFunctor joiner, + TupleTag leftTag, + TupleTag rightTag) { + super(joiner, leftTag, rightTag); + } + + @Override + public void processElement(ProcessContext c) { + + KV element = c.element(); + CoGbkResult value = element.getValue(); + K key = element.getKey(); + + Iterable leftSideIter = value.getAll(leftTag); + Iterable rightSIdeIter = value.getAll(rightTag); + + SingleValueCollector outCollector = new SingleValueCollector<>(); + + for (RightT rightValue : rightSIdeIter) { + if (leftSideIter.iterator().hasNext()) { + for (LeftT leftValue : leftSideIter) { + joiner.apply(leftValue, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } else { + joiner.apply(null, rightValue, outCollector); + c.output(Pair.of(key, outCollector.get())); + } + } + + } + + @Override + public String getFnName() { + return "::right-outer-join"; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java new file mode 100644 index 0000000000000..e47d9dadca8b4 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/join/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * {@link cz.seznam.euphoria.core.client.operator.Join} translation centered classes. + */ +package cz.seznam.euphoria.beam.join; diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java new file mode 100644 index 0000000000000..d546deb5f28d2 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/main/java/cz/seznam/euphoria/beam/window/WindowingUtils.java @@ -0,0 +1,58 @@ +package cz.seznam.euphoria.beam.window; + +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.operator.WindowWiseOperator; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; + +/** + * Collection of method helpful when dealing with windowing translations. + */ +public class WindowingUtils { + + public static > + PCollection applyWindowingIfSpecified( + WindowWiseOperator operator, + PCollection input, + Duration allowedLateness) { + + Windowing userSpecifiedWindowing = operator.getWindowing(); + + if (userSpecifiedWindowing == null) { + return input; + } + + if (!(userSpecifiedWindowing instanceof BeamWindowing)) { + throw new IllegalStateException(String.format( + "Use of '%s' is only way supported to specify windowing.", + BeamWindowing.class.getSimpleName())); + } + + @SuppressWarnings("unchecked") + BeamWindowing beamWindowing = (BeamWindowing) userSpecifiedWindowing; + + @SuppressWarnings("unchecked") + org.apache.beam.sdk.transforms.windowing.Window beamWindow = + org.apache.beam.sdk.transforms.windowing.Window + .into((WindowFn) beamWindowing.getWindowFn()) + .triggering(beamWindowing.getBeamTrigger()); + + switch (beamWindowing.getAccumulationMode()) { + case DISCARDING_FIRED_PANES: + beamWindow = beamWindow.discardingFiredPanes(); + break; + case ACCUMULATING_FIRED_PANES: + beamWindow = beamWindow.accumulatingFiredPanes(); + break; + default: + throw new IllegalStateException( + "Unsupported accumulation mode '" + beamWindowing.getAccumulationMode() + "'"); + } + + beamWindow = beamWindow.withAllowedLateness(allowedLateness); + + return input.apply(operator.getName() + "::windowing", beamWindow); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java new file mode 100644 index 0000000000000..7164f0d352255 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/JoinTest.java @@ -0,0 +1,206 @@ +package cz.seznam.euphoria.beam; + +import static java.util.Arrays.asList; + +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; +import cz.seznam.euphoria.core.client.io.ListDataSink; +import cz.seznam.euphoria.core.client.io.ListDataSource; +import cz.seznam.euphoria.core.client.operator.FullJoin; +import cz.seznam.euphoria.core.client.operator.Join; +import cz.seznam.euphoria.core.client.operator.LeftJoin; +import cz.seznam.euphoria.core.client.operator.RightJoin; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.testing.DatasetAssert; +import java.util.Optional; +import org.junit.Test; + +/** + * Simple test suite for Join operator. + */ +public class JoinTest { + + @Test + public void simpleInnerJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor, Pair, Pair> joiner = + (l, r, c) -> c.collect(Pair.of(l.getSecond(), r.getSecond())); + + Join.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)) + ); + + } + + @Test + public void simpleLeftJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor, Optional>, Pair> + joiner = (l, r, c) -> + c.collect(Pair.of(l.getSecond(), r.orElse(Pair.of(null, null)).getSecond())); + + LeftJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(3, Pair.of("L v1", null)) + ); + + } + + @Test + public void simpleRightJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor>, Pair, Pair> + joiner = (l, r, c) -> + c.collect(Pair.of(l.orElse(Pair.of(null, null)).getSecond(), r.getSecond())); + + RightJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(4, Pair.of(null, 40)) + ); + + } + + @Test + public void simpleFullJoinTest() { + final Flow flow = Flow.create(); + + ListDataSource> left = + ListDataSource.bounded( + asList( + Pair.of(1, "L v1"), Pair.of(1, "L v2"), + Pair.of(2, "L v1"), Pair.of(2, "L v2"), + Pair.of(3, "L v1") + )); + + ListDataSource> right = + ListDataSource.bounded( + asList( + Pair.of(1, 1), Pair.of(1, 10), + Pair.of(2, 20), + Pair.of(4, 40) + )); + + ListDataSink>> output = ListDataSink.get(); + + BinaryFunctor< + Optional>, Optional>, Pair> + joiner = (l, r, c) -> c.collect(Pair.of( + l.orElse(Pair.of(null, null)).getSecond(), r.orElse(Pair.of(null, null)).getSecond())); + + FullJoin.of(flow.createInput(left), flow.createInput(right)) + .by(Pair::getFirst, Pair::getFirst) + .using(joiner) + .output() + .persist(output); + + BeamExecutor executor = TestUtils.createExecutor(); + executor.execute(flow); + + DatasetAssert.unorderedEquals(output.getOutputs(), + Pair.of(1, Pair.of("L v1", 1)), Pair.of(1, Pair.of("L v1", 10)), + Pair.of(1, Pair.of("L v2", 1)), Pair.of(1, Pair.of("L v2", 10)), + + Pair.of(2, Pair.of("L v1", 20)), Pair.of(2, Pair.of("L v2", 20)), + + Pair.of(3, Pair.of("L v1", null)), + Pair.of(4, Pair.of(null, 40)) + ); + + } + +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java index 7fb681c317afc..da9936ba6e54d 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/ReduceByKeyTest.java @@ -43,11 +43,8 @@ import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; import cz.seznam.euphoria.testing.DatasetAssert; -import java.time.Duration; import java.util.Arrays; import java.util.Collections; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; @@ -59,12 +56,6 @@ */ public class ReduceByKeyTest { - private BeamExecutor createExecutor() { - String[] args = {"--runner=DirectRunner"}; - PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); - return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1)); - } - @Test public void testSimpleRBK() { final Flow flow = Flow.create(); @@ -84,7 +75,7 @@ public void testSimpleRBK() { .output() .persist(output); - BeamExecutor executor = createExecutor(); + BeamExecutor executor = TestUtils.createExecutor(); executor.execute(flow); DatasetAssert.unorderedEquals(output.getOutputs(), Pair.of(0, 8), Pair.of(1, 7)); @@ -134,7 +125,7 @@ public void testEventTime() { .output() .persist(sink); - BeamExecutor executor = createExecutor(); + BeamExecutor executor = TestUtils.createExecutor(); executor.execute(flow); DatasetAssert.unorderedEquals( @@ -203,7 +194,7 @@ public void testElementTimestamp() { .output() .persist(sink); - createExecutor().execute(flow); + TestUtils.createExecutor().execute(flow); DatasetAssert.unorderedEquals(sink.getOutputs(), 4, 6); } diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java new file mode 100644 index 0000000000000..1cc188ca0e13a --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/TestUtils.java @@ -0,0 +1,17 @@ +package cz.seznam.euphoria.beam; + +import java.time.Duration; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; + +/** + * Collection of methods reused among tests. + */ +public class TestUtils { + + static BeamExecutor createExecutor() { + String[] args = {"--runner=DirectRunner"}; + PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); + return new BeamExecutor(options).withAllowedLateness(Duration.ofHours(1)); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java index 9bb875a31f03a..b597c50d8e6a5 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamExecutorProvider.java @@ -28,6 +28,7 @@ */ public interface BeamExecutorProvider extends ExecutorProvider { + @Override default ExecutorEnvironment newExecutorEnvironment() throws Exception { final String[] args = {"--runner=DirectRunner"}; final PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class); diff --git a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java index 11a95aa0f3592..445e6bb02f34e 100644 --- a/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java +++ b/sdks/java/extensions/euphoria/euphoria-beam/src/test/java/cz/seznam/euphoria/beam/testkit/BeamOperatorsSuite.java @@ -16,6 +16,8 @@ package cz.seznam.euphoria.beam.testkit; import cz.seznam.euphoria.operator.test.FlatMapTest; +import cz.seznam.euphoria.operator.test.JoinTest; +import cz.seznam.euphoria.operator.test.ReduceByKeyTest; import cz.seznam.euphoria.operator.test.UnionTest; import cz.seznam.euphoria.operator.test.junit.ExecutorProvider; import cz.seznam.euphoria.operator.test.junit.ExecutorProviderRunner; @@ -33,10 +35,10 @@ // DistinctTest.class, // FilterTest.class, FlatMapTest.class, - // JoinTest.class, + JoinTest.class, // JoinWindowEnforcementTest.class, // MapElementsTest.class, - // ReduceByKeyTest.class, + ReduceByKeyTest.class, // ReduceStateByKeyTest.class, // SumByKeyTest.class, // TopPerKeyTest.class, diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle index 44ebdb4241032..b2b96567f8273 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/build.gradle @@ -5,5 +5,6 @@ description = "Apache Beam :: SDKs :: Java :: Extensions :: Euphoria Java 8 DSL" dependencies { compile project(':beam-sdks-java-extensions-euphoria-core') + compile project(':beam-sdks-java-extensions-euphoria-beam') compileOnly library.java.junit } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java index e8ed22e00cd0b..bac992c48cb50 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java @@ -17,12 +17,10 @@ import static org.junit.Assert.assertEquals; +import cz.seznam.euphoria.beam.io.KryoCoder; +import cz.seznam.euphoria.beam.window.BeamWindowing; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.windowing.Session; -import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; -import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; -import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSource; @@ -32,23 +30,36 @@ import cz.seznam.euphoria.core.client.operator.LeftJoin; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.RightJoin; -import cz.seznam.euphoria.core.client.triggers.NoopTrigger; -import cz.seznam.euphoria.core.client.triggers.Trigger; -import cz.seznam.euphoria.core.client.util.Either; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Triple; import cz.seznam.euphoria.operator.test.accumulators.SnapshotProvider; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; -import java.time.Duration; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; +import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Test; -/** Test operator {@code Join}. */ +/** + * Test operator {@code Join}. + */ @Processing(Processing.Type.ALL) public class JoinTest extends AbstractOperatorTest { @@ -269,7 +280,10 @@ protected Dataset> getOutput( (Optional l, Optional r, Collector c) -> { c.collect(l.orElse(null) + "+" + r.orElse(null)); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -319,7 +333,10 @@ protected Dataset> getOutput( (Integer l, Optional r, Collector c) -> { c.collect(l + "+" + r.orElse(null)); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -364,7 +381,10 @@ protected Dataset> getOutput( (Optional l, Long r, Collector c) -> { c.collect(l.orElse(null) + "+" + r); }) - .windowBy(new EvenOddWindowing()) + .windowBy(BeamWindowing.of( + new EvenOddWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -401,7 +421,7 @@ public List> getUnorderedOutput() { public void joinOnSessionWindowingNoEarlyTriggering() { execute( new JoinTestCase< - Pair, Pair, Triple>() { + Pair, Pair, Pair>() { @Override protected List> getLeftInput() { @@ -414,37 +434,41 @@ protected List> getRightInput() { } @Override - protected Dataset> getOutput( + protected Dataset> getOutput( Dataset> left, Dataset> right) { + left = AssignEventTime.of(left).using(Pair::getSecond).output(); right = AssignEventTime.of(right).using(Pair::getSecond).output(); - Dataset>> joined = + + Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") .using( - (Pair l, - Pair r, - Collector> c) -> - c.collect( - Triple.of( - (TimeInterval) c.getWindow(), l.getFirst(), r.getFirst()))) - .windowBy(Session.of(Duration.ofMillis(10))) + (Pair l, Pair r, + Collector> c) -> + c.collect(Pair.of(l.getFirst(), r.getFirst()))) + .windowBy(BeamWindowing.of( + Sessions.withGapDuration(org.joda.time.Duration.millis(10)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return MapElements.of(joined).using(Pair::getSecond).output(); } @Override - public List> getUnorderedOutput() { - TimeInterval expectedWindow = new TimeInterval(1, 14); + public List> getUnorderedOutput() { return Arrays.asList( - Triple.of(expectedWindow, "fi", "ha"), - Triple.of(expectedWindow, "fi", "ho"), - Triple.of(expectedWindow, "fa", "ha"), - Triple.of(expectedWindow, "fa", "ho")); + Pair.of("fi", "ha"), + Pair.of("fi", "ho"), + Pair.of("fa", "ha"), + Pair.of("fa", "ho")); } }); } + @Ignore( + "This test is based on access to various objects through Environment which is " + + "unsupported feature. It may be possible to add this feature in future.") @Test public void testJoinAccumulators() { execute( @@ -464,8 +488,10 @@ protected List> getRightInput() { @Override protected Dataset> getOutput( Dataset> left, Dataset> right) { + left = AssignEventTime.of(left).using(Pair::getSecond).output(); right = AssignEventTime.of(right).using(Pair::getSecond).output(); + Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") @@ -478,7 +504,11 @@ protected Dataset> getOutput( c.getHistogram("hist-" + l.getFirst().charAt(1)).add(2345, 8); c.collect(Triple.of(window, l.getFirst(), r.getFirst())); }) - .windowBy(Time.of(Duration.ofMillis(3))) +// .windowBy(Time.of(Duration.ofMillis(3))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.millis(3)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return MapElements.of(joined).using(Pair::getSecond).output(); } @@ -509,11 +539,9 @@ public void validateAccumulators(SnapshotProvider snapshots) { /** * Base for join test cases. - * @param - * @param - * @param */ public abstract static class JoinTestCase implements TestCase { + @Override public Dataset getOutput(Flow flow, boolean bounded) { Dataset left = flow.createInput(ListDataSource.of(bounded, getLeftInput())); @@ -528,36 +556,87 @@ public Dataset getOutput(Flow flow, boolean bounded) { protected abstract List getRightInput(); } - /** Stable windowing for test purposes. */ - static class EvenOddWindowing implements Windowing, IntWindow> { + + /** + * Elements with even numeric values are are assigned to one 'even' window. All others are + * assigned to window named 'win: #', where '#' is value of assigned element. + */ + private static class EvenOddWindowFn extends WindowFn, BoundedWindow> { + + private static final NamedGlobalWindow EVEN_WIN = new NamedGlobalWindow("even"); @Override - public Iterable assignWindowsToElement( - WindowedElement> input) { - int element; - Either unwrapped = input.getElement(); - if (unwrapped.isLeft()) { - element = unwrapped.left(); + @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE") + public Collection assignWindows(AssignContext c) throws Exception { + KV element = c.element(); + + Number value = element.getValue(); + + if (value == null) { + return Collections.singleton(EVEN_WIN); + } + + NamedGlobalWindow win; + if (value.longValue() % 2 == 0) { + win = EVEN_WIN; } else { - element = (int) (long) unwrapped.right(); + win = new NamedGlobalWindow("win: " + value.longValue()); } - final int label = element % 2 == 0 ? 0 : element; - return Collections.singleton(new IntWindow(label)); + + return Collections.singleton(win); } @Override - public Trigger getTrigger() { - return NoopTrigger.get(); + public void mergeWindows(MergeContext c) throws Exception { + // no merging } @Override - public boolean equals(Object obj) { - return obj instanceof EvenOddWindowing; + public boolean isCompatible(WindowFn other) { + return other instanceof EvenOddWindowFn; + } + + @Override + public Coder windowCoder() { + return new KryoCoder<>(); + } + + @Override + @Nullable + public WindowMappingFn getDefaultWindowMappingFn() { + return null; + } + + @Override + public boolean isNonMerging() { + return true; + } + } + + private static class NamedGlobalWindow extends BoundedWindow { + + private String name; + + public NamedGlobalWindow(String name) { + this.name = name; + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof NamedGlobalWindow) { + return name.equals(((NamedGlobalWindow) other).name); + } + return false; } @Override public int hashCode() { - return 0; + return name.hashCode(); } } } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java index 7408d153a6cb3..96d5acbf59d1f 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java @@ -21,12 +21,11 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import cz.seznam.euphoria.beam.io.KryoCoder; +import cz.seznam.euphoria.beam.window.BeamWindowing; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.Count; -import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; -import cz.seznam.euphoria.core.client.dataset.windowing.Session; -import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; @@ -42,7 +41,6 @@ import cz.seznam.euphoria.core.client.operator.state.StateContext; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; -import cz.seznam.euphoria.core.client.triggers.CountTrigger; import cz.seznam.euphoria.core.client.triggers.NoopTrigger; import cz.seznam.euphoria.core.client.triggers.Trigger; import cz.seznam.euphoria.core.client.triggers.TriggerContext; @@ -54,7 +52,6 @@ import cz.seznam.euphoria.operator.test.accumulators.SnapshotProvider; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -64,22 +61,36 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; +import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; +import org.joda.time.Instant; +import org.junit.Ignore; import org.junit.Test; -/** Test operator {@code ReduceByKey}. */ +/** + * Test operator {@code ReduceByKey}. + */ @Processing(Processing.Type.ALL) public class ReduceByKeyTest extends AbstractOperatorTest { - /** Validates the output type upon a `.reduceBy` operation on windows of size one. */ + /** + * Validates the output type upon a `.reduceBy` operation on global window. + */ @Test public void testReductionType0() { execute( - new AbstractTestCase>>( - /* don't parallelize this test, because it doesn't work - * well with count windows */ - 1) { + new AbstractTestCase>>() { @Override protected List getInput() { return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9); @@ -91,7 +102,10 @@ protected Dataset>> getOutput(Dataset input) .keyBy(e -> e % 2) .valueBy(e -> e) .reduceBy(s -> s.collect(Collectors.toSet())) - .windowBy(Count.of(3)) + .windowBy(BeamWindowing.of( + new GlobalWindows(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -99,20 +113,18 @@ protected Dataset>> getOutput(Dataset input) public List>> getUnorderedOutput() { return Arrays.asList( Pair.of(0, Sets.newHashSet(2, 4, 6)), - Pair.of(1, Sets.newHashSet(1, 3, 5)), - Pair.of(1, Sets.newHashSet(7, 9))); + Pair.of(1, Sets.newHashSet(1, 3, 5, 7, 9))); } }); } - /** Validates the output type upon a `.reduceBy` operation on windows of size one. */ + /** + * Validates the output type upon a `.reduceBy` operation on global window. + */ @Test public void testReductionType0_outputValues() { execute( - new AbstractTestCase>( - /* don't parallelize this test, because it doesn't work - * well with count windows */ - 1) { + new AbstractTestCase>() { @Override protected List getInput() { return Arrays.asList(1, 2, 3, 4, 5, 6, 7, 9); @@ -124,19 +136,25 @@ protected Dataset> getOutput(Dataset input) { .keyBy(e -> e % 2) .valueBy(e -> e) .reduceBy(s -> s.collect(Collectors.toSet())) - .windowBy(Count.of(3)) + .windowBy(BeamWindowing.of( + new GlobalWindows(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .outputValues(); } @Override public List> getUnorderedOutput() { return Arrays.asList( - Sets.newHashSet(2, 4, 6), Sets.newHashSet(1, 3, 5), Sets.newHashSet(7, 9)); + Sets.newHashSet(2, 4, 6), Sets.newHashSet(1, 3, 5, 7, 9)); } }); } - /** Validates the output type upon a `.reduceBy` operation on windows of size one. */ + /** + * Validates the output type upon a `.reduceBy` operation on global window. + */ + @Ignore("Sorting of values is not supported yet.") @Test public void testReductionType0WithSortedValues() { execute( @@ -172,7 +190,10 @@ protected Dataset>>> getOutput(Dataset } return cmp; }) - .windowBy(GlobalWindowing.get()) + .windowBy(BeamWindowing.of( + new GlobalWindows(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -192,7 +213,9 @@ public void validate(List>>> outputs) }); } - /** Validates the output type upon a `.reduceBy` operation on windows of size one. */ + /** + * Validates the output type upon a `.reduceBy` operation on windows of size one. + */ @Test public void testReductionType0MultiValues() { execute( @@ -211,7 +234,11 @@ protected Dataset> getOutput(Dataset input) { return ReduceByKey.of(input) .keyBy(e -> e % 2) .reduceBy(Fold.whileEmittingEach(0, (a, b) -> a + b)) - .windowBy(Count.of(3)) +// .windowBy(Count.of(3)) + .windowBy(BeamWindowing.of( + new GlobalWindows(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -232,12 +259,13 @@ public void validate(List> output) { assertEquals(Arrays.asList(2, 6, 12), byKey.get(0)); assertNotNull(byKey.get(1)); - assertEquals(Sets.newHashSet(1, 4, 9, 7, 16), new HashSet<>(byKey.get(1))); + assertEquals(Sets.newHashSet(1, 4, 9, 16, 25), new HashSet<>(byKey.get(1))); } @Override public List> getUnorderedOutput() { - return Arrays.asList(Pair.of(0, 12), Pair.of(1, 9), Pair.of(1, 16)); +// return Arrays.asList(Pair.of(0, 12), Pair.of(1, 9), Pair.of(1, 16)); + return Arrays.asList(Pair.of(0, 12), Pair.of(1, 25)); } }); } @@ -254,7 +282,11 @@ protected Dataset> getOutput(Dataset> in .keyBy(Pair::getFirst) .valueBy(e -> 1L) .combineBy(Sums.ofLongs()) - .windowBy(Time.of(Duration.ofSeconds(1))) +// .windowBy(Time.of(Duration.ofSeconds(1))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardSeconds(1)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -316,7 +348,11 @@ protected Dataset> getOutput(Dataset input) { .keyBy(e -> e % 3) .valueBy(e -> 1L) .combineBy(Sums.ofLongs()) - .windowBy(new TestWindowing()) +// .windowBy(new TestWindowing()) + .windowBy(BeamWindowing.of( + new TestWindowFn(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -396,6 +432,7 @@ protected Dataset> getOutput(Dataset input) { }); } + @Ignore("Sorting of values is not supported yet.") @Processing(Processing.Type.BOUNDED) @Test public void testReduceSorted() { @@ -438,6 +475,7 @@ protected Dataset>> getOutput(Dataset }); } + @Ignore("Test adaption to Beam windowing failed so far.") @Test public void testMergingAndTriggering() { execute( @@ -447,14 +485,14 @@ public void testMergingAndTriggering() { protected List> getInput() { return Arrays.asList( Pair.of("a", 20L), - Pair.of("c", 3000L), + Pair.of("c", 3_000L), Pair.of("b", 10L), Pair.of("b", 100L), - Pair.of("a", 4000L), + Pair.of("a", 4_000L), Pair.of("c", 300L), - Pair.of("b", 1000L), - Pair.of("b", 50000L), - Pair.of("a", 100000L), + Pair.of("b", 1_000L), + Pair.of("b", 50_000L), + Pair.of("a", 100_000L), Pair.of("a", 800L), Pair.of("a", 80L)); } @@ -465,7 +503,11 @@ protected Dataset> getOutput(Dataset> inpu .keyBy(Pair::getFirst) .valueBy(Pair::getSecond) .combineBy(Sums.ofLongs()) - .windowBy(new CWindowing<>(3)) +// .windowBy(new CWindowing<>(3)) + .windowBy(BeamWindowing.of( + new MergingByBucketSizeWindowFn<>(3), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -474,16 +516,16 @@ protected Dataset> getOutput(Dataset> inpu public List> getUnorderedOutput() { return Arrays.asList( Pair.of("a", 880L), - Pair.of("a", 104020L), - Pair.of("b", 1110L), - Pair.of("b", 50000L), - Pair.of("c", 3300L)); + Pair.of("a", 104_020L), + Pair.of("b", 1_110L), + Pair.of("b", 50_000L), + Pair.of("c", 3_300L)); } }); } // ---------------------------------------------------------------------------- - + @Ignore("Test depends on yet unsupported functionality (access to window from Collector). ") @Test public void testSessionWindowing() { execute( @@ -512,13 +554,16 @@ protected Dataset>> getOutput( .keyBy(e -> e.getFirst().charAt(0) - '0') .valueBy(Pair::getFirst) .reduceBy(s -> s.collect(Collectors.toSet())) - .windowBy(Session.of(Duration.ofMillis(5))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.millis(5)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return FlatMap.of(reduced) .using( (UnaryFunctor< - Pair>, Triple>>) + Pair>, Triple>>) (elem, context) -> context.collect( Triple.of( @@ -543,46 +588,45 @@ public List>> getUnorderedOutput() { }); } + + @Ignore("Test depends on unsupported ReduceStateByKey operator.") @Test public void testElementTimestamp() { - class AssertingWindowing implements Windowing { + + class AssertingWindowFn extends WindowFn { + @Override - public Iterable assignWindowsToElement(WindowedElement el) { + public Collection assignWindows(AssignContext c) throws Exception { + long timestamp = c.timestamp().getMillis(); + // ~ we expect the 'element time' to be the end of the window which produced the // element in the preceding upstream (stateful and windowed) operator assertTrue( - "Invalid timestamp " + el.getTimestamp(), - el.getTimestamp() == 15_000L - 1 || el.getTimestamp() == 25_000L - 1); - return Collections.singleton(new TimeInterval(0, Long.MAX_VALUE)); + "Invalid timestamp " + timestamp, + timestamp == 15_000L - 1 || timestamp == 25_000L - 1); + + return Collections.singleton(GlobalWindow.INSTANCE); } - @SuppressWarnings("unchecked") @Override - public Trigger getTrigger() { - return new CountTrigger(1) { - @Override - public boolean isStateful() { - return false; - } + public void mergeWindows(MergeContext c) throws Exception { - @Override - public TriggerResult onElement(long time, Window window, TriggerContext ctx) { - // ~ we expect the 'time' to be the end of the window which produced the - // element in the preceding upstream (stateful and windowed) operator - assertTrue("Invalid timestamp " + time, time == 15_000L - 1 || time == 25_000L - 1); - return super.onElement(time, window, ctx); - } - }; } @Override - public boolean equals(Object obj) { - return obj instanceof AssertingWindowing; + public boolean isCompatible(WindowFn other) { + return other instanceof GlobalWindows; + } + + @Override + public Coder windowCoder() { + return new KryoCoder<>(); } @Override - public int hashCode() { - return 0; + @Nullable + public WindowMappingFn getDefaultWindowMappingFn() { + return null; } } @@ -610,7 +654,11 @@ protected Dataset getOutput(Dataset> input) { .keyBy(e -> "", TypeHint.ofString()) .valueBy(Pair::getFirst, TypeHint.ofInt()) .combineBy(Sums.ofInts(), TypeHint.ofInt()) - .windowBy(Time.of(Duration.ofSeconds(5))) +// .windowBy(Time.of(Duration.ofSeconds(5))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardSeconds(5)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); // ~ now use a custom windowing with a trigger which does // the assertions subject to this test (use RSBK which has to @@ -621,7 +669,10 @@ protected Dataset getOutput(Dataset> input) { .valueBy(Pair::getSecond) .stateFactory(SumState::new) .mergeStatesBy(SumState::combine) - .windowBy(new AssertingWindowing<>()) + .windowBy(BeamWindowing.of( + new AssertingWindowFn<>(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); return FlatMap.of(output) .using( @@ -649,7 +700,11 @@ protected Dataset> getOutput(Dataset> input) { .keyBy(Pair::getFirst) .valueBy(e -> 1L) .combineBy(Sums.ofLongs()) - .windowBy(Time.of(Duration.ofSeconds(1))) +// .windowBy(Time.of(Duration.ofSeconds(1))) + .windowBy(BeamWindowing.of( + FixedWindows.of(org.joda.time.Duration.standardSeconds(1)), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -701,7 +756,11 @@ protected Dataset> getOutput(Dataset input) { } ctx.collect(a + b); })) - .windowBy(GlobalWindowing.get()) +// .windowBy(GlobalWindowing.get()) + .windowBy(BeamWindowing.of( + new GlobalWindows(), + AfterWatermark.pastEndOfWindow(), + AccumulationMode.DISCARDING_FIRED_PANES)) .output(); } @@ -743,8 +802,101 @@ public int hashCode() { } } + private static class TestWindowFn extends WindowFn { + + @Override + public Collection assignWindows(AssignContext c) throws Exception { + Number element = c.element(); + return Collections.singleton(new CountWindow(element.longValue() / 4)); + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + + } + + @Override + public boolean isNonMerging() { + return true; + } + + @Override + public boolean isCompatible(WindowFn other) { + return false; + } + + @Override + public Coder windowCoder() { + return new KryoCoder<>(); + } + + @Override + @Nullable + public WindowMappingFn getDefaultWindowMappingFn() { + return null; + } + } + // ~ ------------------------------------------------------------------------------ + private static class CountWindow extends BoundedWindow { + + private long value; + + public CountWindow(long value) { + this.value = value; + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + @Override + public boolean equals(Object other) { + if (other instanceof CountWindow) { + return value == (((CountWindow) other).value); + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(value); + } + } + + private static class UniqueWindow extends BoundedWindow { + + private static final AtomicInteger idCounter = new AtomicInteger(); + private final int id; + + public UniqueWindow() { + this.id = idCounter.getAndIncrement(); + } + + @Override + public Instant maxTimestamp() { + return GlobalWindow.INSTANCE.maxTimestamp(); + } + + @Override + public int hashCode() { + return Integer.hashCode(id); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof UniqueWindow + && this.id == ((UniqueWindow) obj).id; + } + + @Override + public String toString() { + return "UniqueWindow{id=" + id + "}"; + } + } + /** * Every instance is unique: this allows us to exercise merging. */ @@ -789,6 +941,7 @@ public String toString() { // count windowing; firing based on window.bucket (size of the window) static final class CWindowTrigger implements Trigger { + private final ValueStorageDescriptor countDesc = ValueStorageDescriptor.of("count", Long.class, 0L, (x, y) -> x + y); @@ -818,7 +971,62 @@ public void onMerge(CWindow w, TriggerContext.TriggerMergeContext ctx) { } } + private static class MergingByBucketSizeWindowFn extends WindowFn { + + private final int bucketSize; + + private MergingByBucketSizeWindowFn(int bucketSize) { + this.bucketSize = bucketSize; + } + + @Override + public Collection assignWindows(AssignContext c) throws Exception { + return Collections.singleton(new UniqueWindow()); + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + +// merge windows up to bucket size + Collection windows = c.windows(); + List merges = new ArrayList<>(); + for (UniqueWindow w : windows) { + + merges.add(w); + + if (merges.size() == bucketSize) { // time to merge + c.merge(merges, w); + merges.clear(); + } + + } + + if (merges.size() > 1) { + c.merge(merges, merges.get(merges.size() - 1)); + } + + } + + @Override + public boolean isCompatible(WindowFn other) { + return other instanceof MergingByBucketSizeWindowFn + && this.bucketSize == ((MergingByBucketSizeWindowFn) other).bucketSize; + } + + @Override + public Coder windowCoder() { + return new KryoCoder<>(); + } + + @Override + @Nullable + public WindowMappingFn getDefaultWindowMappingFn() { + return null; + } + } + static final class CWindowing implements MergingWindowing { + private final int size; CWindowing(int size) { @@ -867,6 +1075,7 @@ public int hashCode() { } static class SumState implements State { + private final ValueStorage sum; SumState(StateContext context, Collector collector) { @@ -898,7 +1107,9 @@ public void close() { } } - /** String with invalid hash code implementation returning constant. */ + /** + * String with invalid hash code implementation returning constant. + */ public static class Word { private final String str;