From d58c05507e63c10fcf007c84bfc2d437a4a68356 Mon Sep 17 00:00:00 2001 From: Adam Horky Date: Tue, 25 Apr 2017 14:30:32 +0200 Subject: [PATCH] #99 Sort operator support * #! - eclipse compilation * #40 - Sort operator with in-memory RSBK sorting * #40 - Spark SortTranslator * #40 - Flink SortTranslator * #40 - RangePartitioning * #40 - PR#99 --- .../partitioning/RangePartitioner.java | 39 ++ .../partitioning/RangePartitioning.java | 58 +++ .../dataset/windowing/GlobalWindowing.java | 3 +- .../dataset/windowing/MergingWindowing.java | 2 +- .../dataset/windowing/TimeInterval.java | 4 +- .../client/dataset/windowing/TimedWindow.java | 2 +- .../core/client/dataset/windowing/Window.java | 4 +- .../euphoria/core/client/operator/Join.java | 2 +- .../core/client/operator/ReduceWindow.java | 4 - .../euphoria/core/client/operator/Sort.java | 345 ++++++++++++++++++ .../core/client/operator/SortTest.java | 157 ++++++++ ...pPerKeyKeyTest.java => TopPerKeyTest.java} | 2 +- .../java/cz/seznam/euphoria/flink/Utils.java | 15 +- .../flink/batch/BatchFlowTranslator.java | 3 + .../flink/batch/FlatMapTranslator.java | 2 +- .../euphoria/flink/batch/SortTranslator.java | 119 ++++++ .../euphoria/flink/batch/UnionTranslator.java | 2 - .../flink/streaming/FlatMapTranslator.java | 2 +- .../streaming/ReduceStateByKeyTranslator.java | 2 +- .../streaming/RBKAttachedWindowingTest.java | 4 +- .../euphoria/inmem/InMemExecutorTest.java | 7 +- .../operator/test/AllOperatorsSuite.java | 1 + .../euphoria/operator/test/IntWindow.java | 2 +- .../operator/test/ReduceByKeyTest.java | 2 +- .../euphoria/operator/test/SortTest.java | 275 ++++++++++++++ .../euphoria/spark/FlatMapTranslator.java | 2 +- .../euphoria/spark/ReduceByKeyTranslator.java | 5 +- .../spark/ReduceStateByKeyTranslator.java | 4 - .../seznam/euphoria/spark/SortTranslator.java | 172 +++++++++ .../euphoria/spark/SparkFlowTranslator.java | 3 + .../euphoria/spark/UnionTranslator.java | 1 - 31 files changed, 1208 insertions(+), 37 deletions(-) create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioner.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioning.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SortTest.java rename sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/{TopPerKeyKeyTest.java => TopPerKeyTest.java} (99%) create mode 100644 sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/SortTranslator.java create mode 100644 sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/SortTest.java create mode 100644 sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SortTranslator.java diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioner.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioner.java new file mode 100644 index 0000000000000..d888e4983f289 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioner.java @@ -0,0 +1,39 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.dataset.partitioning; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; + +class RangePartitioner & Serializable> +implements Partitioner { + + private final List ranges; + + public RangePartitioner(List ranges) { + this.ranges = ranges; + } + + @Override + public int getPartition(T element) { + int search = Collections.binarySearch(ranges, element); + if (search < 0) { + return ((-search) - 1); + } + return search; + } +} \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioning.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioning.java new file mode 100644 index 0000000000000..b31977bc58757 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/RangePartitioning.java @@ -0,0 +1,58 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.dataset.partitioning; + +import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; +import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Comparators; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; + +/** + * Partitioning based on ranges simulating total order. The ranges are expected to be provided + * already sorted. + * @param type of ranges - must be a subtype {@link Comparable} as well as {@link Serializable} + */ +public class RangePartitioning & Serializable> + implements Partitioning { + + private final List ranges; + + public RangePartitioning(List ranges) { + this.ranges = new ArrayList<>(ranges); + Preconditions.checkArgument( + Comparators.isInStrictOrder(ranges, Comparator.naturalOrder()), + "Ranges are expected to be sorted!"); + } + + @SuppressWarnings("unchecked") + public RangePartitioning(T ... ranges) { + this(Arrays.asList(ranges)); + } + + @Override + public Partitioner getPartitioner() { + return new RangePartitioner<>(ranges); + } + + @Override + public int getNumPartitions() { + return ranges.size() + 1; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/GlobalWindowing.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/GlobalWindowing.java index 84b46bd529464..625ab4d94c885 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/GlobalWindowing.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/GlobalWindowing.java @@ -29,8 +29,7 @@ public final class GlobalWindowing implements Windowing { public static final class Window - extends cz.seznam.euphoria.core.client.dataset.windowing.Window - implements Comparable { + extends cz.seznam.euphoria.core.client.dataset.windowing.Window { static final Window INSTANCE = new Window(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/MergingWindowing.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/MergingWindowing.java index 554dc6f70669d..819d24d576df0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/MergingWindowing.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/MergingWindowing.java @@ -19,7 +19,7 @@ import java.util.Collection; -public interface MergingWindowing> +public interface MergingWindowing> extends Windowing { /** diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeInterval.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeInterval.java index c2591c2c1c0f0..da53196d2d47a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeInterval.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeInterval.java @@ -16,8 +16,8 @@ package cz.seznam.euphoria.core.client.dataset.windowing; public final class TimeInterval - extends Window - implements TimedWindow, Comparable { + extends Window + implements TimedWindow { private final long startMillis; private final long endMillis; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimedWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimedWindow.java index 21a78a88e9b9a..9e9d1cbea43e0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimedWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimedWindow.java @@ -19,7 +19,7 @@ * Extension to {@link cz.seznam.euphoria.core.client.dataset.windowing.Window} * defining time based constraints on the implementor. */ -public interface TimedWindow { +public interface TimedWindow { /** * Defines the timestamp/watermark until this window is considered open. diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Window.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Window.java index 639dd763f3aed..35715181a9e46 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Window.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/Window.java @@ -22,10 +22,10 @@ * thereby grouping input elements into chunks * for further processing in small (micro-)batches. *

- * Subclasses should implement {@code equals()} and {@code hashCode()} so that logically + * Subclasses should implement {@code equals()}, {@code hashCode()} and {@code compareTo()} so that logically * same windows are treated the same. */ -public abstract class Window implements Serializable { +public abstract class Window> implements Serializable, Comparable { @Override public abstract int hashCode(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 53c1fbdc570e1..57abd46bec068 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -442,7 +442,7 @@ public BinaryFunctor getJoiner() { OUT, JoinState, W> reduce; name = getName() + "::ReduceStateByKey"; - reduce = new ReduceStateByKey<>( + reduce = new ReduceStateByKey( name, flow, union.output(), diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index ed8dd98a45d4c..6cad458cf9d9f 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -203,8 +203,4 @@ public ReduceFunction getReducer() { return DAG.of(reduceByKey, format); } - - - - } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java new file mode 100644 index 0000000000000..06c39431b3782 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java @@ -0,0 +1,345 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.operator; + +import cz.seznam.euphoria.core.annotation.operator.Derived; +import cz.seznam.euphoria.core.annotation.operator.StateComplexity; +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.RangePartitioning; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.graph.DAG; +import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.operator.state.ListStorage; +import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; +import cz.seznam.euphoria.core.client.operator.state.StorageProvider; +import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; +import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Lists; +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * Sorts the input dataset.

+ * + * The user is supposed to provide a function that extracts a comparable object + * from the input object. The extracted object is than passed to a provided partitioner + * to partition the result.

+ * + * To ensure that all elements from a specific range end up + * in the same partition - i.e. perform total sort ordering - it is recommended + * to use {@link RangePartitioning}, otherwise the user is responsible for his partitioning.

+ * + * If user does not provide custom {@link Partitioning} or {@link Partitioner} and number of input + * partitions differs from 1 (single partition), the program crashes, because runtime does not + * know how to partition the result. Input sampling is not supported for now.

+ * + * Example: + * + *

{@code
+ *  Dataset> input = ...;
+ *  Dataset> sorted =
+ *         Sort.named("SORTED-BY-SCORE")
+ *            .of(input)
+ *            .by(Pair::getSecond)
+ *            .partitioning(new RangePartitioning(0.2, 0.4, 0.6, 0.8)) // numPartitions = 5
+ *            .output();
+ * }
+ * + * The above example sorts the paired input by the second field. The sorted elements can be + * found in 5 partitions of corresponding intervals as follows: + *
    + *
  • -Inf to 0.2
  • + *
  • 0.2 to 0.4
  • + *
  • 0.4 to 0.6
  • + *
  • 0.6 to 0.8
  • + *
  • 0.8 to Inf
  • + *
+ * + */ +@Derived( + state = StateComplexity.LINEAR, + repartitions = 1 +) +public class Sort, W extends Window> + extends StateAwareWindowWiseSingleInputOperator> { + + private static final class Sorted + extends State + implements StateSupport.MergeFrom> { + + @SuppressWarnings("unchecked") + static final ListStorageDescriptor SORT_STATE_DESCR = + ListStorageDescriptor.of("sort", (Class) Object.class); + + final ListStorage curr; + final Comparator cmp; + + @SuppressWarnings("unchecked") + Sorted(Context context, StorageProvider storageProvider, Comparator cmp) { + super(context); + this.curr = (ListStorage) storageProvider.getListStorage(SORT_STATE_DESCR); + this.cmp = cmp; + } + + @Override + public void add(V element) { + curr.add(element); + } + + @Override + public void flush() { + List toSort = Lists.newArrayList(curr.get()); + Collections.sort(toSort, cmp); + toSort.forEach(getContext()::collect); + } + + @Override + public void close() { + curr.clear(); + } + + @Override + public void mergeFrom(Sorted other) { + for (V v : other.curr.get()) { + add(v); + } + } + } + + public static class OfBuilder { + private final String name; + + OfBuilder(String name) { + this.name = name; + } + + public ByBuilder of(Dataset input) { + return new ByBuilder<>(name, input); + } + } + + public static class ByBuilder { + private final String name; + private final Dataset input; + + ByBuilder(String name, Dataset input) { + this.name = requireNonNull(name); + this.input = requireNonNull(input); + } + + public > WindowByBuilder by(UnaryFunction sortByFn) { + return new WindowByBuilder<>(name, input, requireNonNull(sortByFn)); + } + } + + public static class WindowByBuilder> + extends PartitioningBuilder> + implements cz.seznam.euphoria.core.client.operator.OutputBuilder + { + private final String name; + private final Dataset input; + private final UnaryFunction sortByFn; + + WindowByBuilder(String name, + Dataset input, + UnaryFunction sortByFn) + { + super(new DefaultPartitioning<>(input.getNumPartitions())); + + this.name = requireNonNull(name); + this.input = requireNonNull(input); + this.sortByFn = requireNonNull(sortByFn); + } + + public + OutputBuilder + windowBy(Windowing windowing) { + return windowBy(windowing, null); + } + + public + OutputBuilder + windowBy(Windowing windowing, ExtractEventTime eventTimeAssigner) { + return new OutputBuilder<>(name, input, + sortByFn, this, requireNonNull(windowing), eventTimeAssigner); + } + + @Override + public Dataset output() { + return new OutputBuilder<>( + name, input, sortByFn, this, null, null).output(); + } + } + + public static class OutputBuilder< + IN, S extends Comparable, W extends Window> + extends PartitioningBuilder> + implements cz.seznam.euphoria.core.client.operator.OutputBuilder + { + private final String name; + private final Dataset input; + private final UnaryFunction sortByFn; + @Nullable + private final Windowing windowing; + @Nullable + private final ExtractEventTime eventTimeAssigner; + + OutputBuilder(String name, + Dataset input, + UnaryFunction sortByFn, + PartitioningBuilder partitioning, + @Nullable Windowing windowing, + @Nullable ExtractEventTime eventTimeAssigner) { + + super(partitioning); + + this.name = requireNonNull(name); + this.input = requireNonNull(input); + this.sortByFn = requireNonNull(sortByFn); + this.windowing = windowing; + this.eventTimeAssigner = eventTimeAssigner; + } + + @Override + public Dataset output() { + Preconditions.checkArgument(validPartitioning(getPartitioning()), + "Non-single partitioning with default partitioner is not supported on Sort operator. " + + "Set single partition or define custom partitioner, e.g. RangePartitioner."); + Flow flow = input.getFlow(); + Sort top = + new Sort<>(flow, name, input, + sortByFn, getPartitioning(), windowing, eventTimeAssigner); + flow.add(top); + return top.output(); + } + + private static boolean validPartitioning(Partitioning partitioning) { + return !partitioning.hasDefaultPartitioner() || partitioning.getNumPartitions() == 1; + } + } + + public static ByBuilder of(Dataset input) { + return new ByBuilder<>("Sort", input); + } + + public static OfBuilder named(String name) { + return new OfBuilder(name); + } + + // ~ ----------------------------------------------------------------------------- + + private final UnaryFunction sortByFn; + + Sort(Flow flow, + String name, + Dataset input, + UnaryFunction sortByFn, + Partitioning partitioning, + @Nullable Windowing windowing, + @Nullable ExtractEventTime eventTimeAssigner) { + super(name, flow, input, + // Key is actually the number of the final partition - it ensures that all records + // in one partition (and same window) get into the same state in ReduceStateByKey + // where they are later sorted. + // At the same time the key (partition number) is simply used inside partitioner + // to ensure that partitioning and states work together. + new PartitionKeyExtractor<>(sortByFn, partitioning), + windowing, eventTimeAssigner, + new HashPartitioning<>(partitioning.getNumPartitions())); + + this.sortByFn = sortByFn; + } + + public UnaryFunction getSortByExtractor() { + return sortByFn; + } + + @Override + public DAG> getBasicOps() { + Flow flow = getFlow(); + + final StateSupport.MergeFromStateMerger> stateCombiner = + new StateSupport.MergeFromStateMerger<>(); + final SortByComparator comparator = new SortByComparator<>(sortByFn); + ReduceStateByKey, W> reduce = + new ReduceStateByKey<>(getName() + "::ReduceStateByKey", flow, input, + keyExtractor, + e -> e, + windowing, + eventTimeAssigner, + (StateFactory>) + (ctx, provider) -> new Sorted<>(ctx, provider, comparator), + stateCombiner, + partitioning); + + MapElements, IN> format = + new MapElements<>(getName() + "::MapElements", flow, reduce.output(), + Pair::getSecond); + + DAG> dag = DAG.of(reduce); + dag.add(format, reduce); + return dag; + } + + private static class SortByComparator> + implements Comparator, Serializable { + + private final UnaryFunction sortByFn; + + public SortByComparator(UnaryFunction sortByFn) { + this.sortByFn = sortByFn; + } + + @Override + public int compare(V o1, V o2) { + return sortByFn.apply(o1).compareTo(sortByFn.apply(o2)); + } + } + + private static class PartitionKeyExtractor> + implements UnaryFunction { + + private final UnaryFunction sortByFn; + private final Partitioner partitioner; + private final int numPartitions; + + public PartitionKeyExtractor(UnaryFunction sortByFn, Partitioning partitioning) { + this.sortByFn = sortByFn; + this.partitioner = partitioning.getPartitioner(); + this.numPartitions = partitioning.getNumPartitions(); + } + + @Override + public Integer apply(IN what) { + int partitionId = partitioner.getPartition(sortByFn.apply(what)); + return (partitionId & Integer.MAX_VALUE) % numPartitions; + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SortTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SortTest.java new file mode 100644 index 0000000000000..6031175ab32d4 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SortTest.java @@ -0,0 +1,157 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.operator; + +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.windowing.Time; +import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables; +import org.junit.Test; + +import java.time.Duration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class SortTest { + @Test + public void testBuild() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 2); + + Time windowing = Time.of(Duration.ofHours(1)); + Dataset result = Sort.named("Sort1") + .of(dataset) + .by(s -> 1L) + .windowBy(windowing) + .setNumPartitions(1) + .output(); + + assertEquals(flow, result.getFlow()); + assertEquals(1, flow.size()); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertEquals(flow, tpk.getFlow()); + assertEquals("Sort1", tpk.getName()); + assertNotNull(tpk.getKeyExtractor()); + assertNotNull(tpk.getSortByExtractor()); + assertEquals(result, tpk.output()); + assertSame(windowing, tpk.getWindowing()); + assertNull(tpk.getEventTimeAssigner()); + + assertTrue(!tpk.getPartitioning().hasDefaultPartitioner()); + assertTrue(tpk.getPartitioning().getPartitioner() instanceof HashPartitioner); + assertEquals(1, tpk.getPartitioning().getNumPartitions()); + } + + @Test + public void testBuild_ImplicitName() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset result = Sort.of(dataset) + .by(s -> 1L) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertEquals("Sort", tpk.getName()); + } + + @Test + public void testBuild_Windowing() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset result = Sort.of(dataset) + .by(s -> 1L) + .windowBy(Time.of(Duration.ofHours(1)), (s -> 0L)) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertNotNull(tpk.getEventTimeAssigner()); + } + + @Test + public void testBuild_Partitioning() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 2); + + Time windowing = Time.of(Duration.ofHours(1)); + Dataset result = Sort.named("Sort1") + .of(dataset) + .by(s -> 1L) + .setPartitioning(new HashPartitioning<>(1)) + .windowBy(windowing) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertTrue(!tpk.getPartitioning().hasDefaultPartitioner()); + assertTrue(tpk.getPartitioning().getPartitioner() instanceof HashPartitioner); + assertEquals(1, tpk.getPartitioning().getNumPartitions()); + } + + @Test + public void testBuild_Partitioner() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 2); + + Time windowing = Time.of(Duration.ofHours(1)); + Dataset result = Sort.named("Sort1") + .of(dataset) + .by(s -> 1L) + .windowBy(windowing) + .setPartitioner(e -> 0) + .setNumPartitions(5) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertTrue(!tpk.getPartitioning().hasDefaultPartitioner()); + assertTrue(tpk.getPartitioning().getPartitioner() instanceof HashPartitioner); + assertEquals(5, tpk.getPartitioning().getNumPartitions()); + } + + @Test(expected=IllegalArgumentException.class) + public void testBuild_UnsupportedPartitioningImplicit() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 2); + + Dataset result = Sort.of(dataset) + .by(s -> 1L) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertEquals("Sort", tpk.getName()); + } + + @Test(expected=IllegalArgumentException.class) + public void testBuild_UnsupportedPartitioningExplicit() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset result = Sort.of(dataset) + .by(s -> 1L) + .setNumPartitions(2) + .output(); + + Sort tpk = (Sort) Iterables.getOnlyElement(flow.operators()); + assertEquals("Sort", tpk.getName()); + } +} \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyTest.java similarity index 99% rename from sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java rename to sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyTest.java index cb5f78faa719c..b1bc33117abe6 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyTest.java @@ -28,7 +28,7 @@ import static org.junit.Assert.*; -public class TopPerKeyKeyTest { +public class TopPerKeyTest { @Test public void testBuild() { Flow flow = Flow.create("TEST"); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/Utils.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/Utils.java index 54aa424624fdb..fc94f242f8fb6 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/Utils.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/Utils.java @@ -28,17 +28,20 @@ static class QueryableKeySelector implements KeySelector, ResultTypeQueryable { private final KeySelector inner; - private final Class clz; + private final TypeInformation typeInfo; QueryableKeySelector(KeySelector inner) { this(inner, Object.class); } QueryableKeySelector(KeySelector inner, Class clz) { - this.inner = inner; - this.clz = clz; + this(inner, TypeInformation.of(clz)); } + QueryableKeySelector(KeySelector inner, TypeInformation typeInfo) { + this.inner = inner; + this.typeInfo = typeInfo; + } @Override public V getKey(K in) throws Exception { @@ -47,7 +50,7 @@ public V getKey(K in) throws Exception { @Override public TypeInformation getProducedType() { - return TypeInformation.of(clz); + return typeInfo; } } @@ -83,4 +86,8 @@ public static KeySelector wrapQueryable( return new QueryableKeySelector<>(inner, clz); } + public static KeySelector wrapQueryable( + KeySelector inner, TypeInformation typeInfo) { + return new QueryableKeySelector<>(inner, typeInfo); + } } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java index ce7aada28ee8a..275f65d55c02a 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java @@ -26,6 +26,7 @@ import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.Repartition; +import cz.seznam.euphoria.core.client.operator.Sort; import cz.seznam.euphoria.core.client.operator.Union; import cz.seznam.euphoria.core.executor.FlowUnfolder; import cz.seznam.euphoria.core.util.Settings; @@ -103,6 +104,8 @@ public BatchFlowTranslator(Settings settings, ExecutionEnvironment env, // derived operators Translation.set(translations, ReduceByKey.class, new ReduceByKeyTranslator(), ReduceByKeyTranslator::wantTranslate); + Translation.set(translations, Sort.class, new SortTranslator(), + SortTranslator::wantTranslate); } @SuppressWarnings("unchecked") diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java index 75d6d5306c8a9..c20742116b9b5 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java @@ -29,7 +29,7 @@ public DataSet translate(FlinkOperator operator, DataSet input = context.getSingleInputStream(operator); UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); return input - .flatMap(new BatchUnaryFunctorWrapper<>(mapper)) + .flatMap(new BatchUnaryFunctorWrapper(mapper)) .returns((Class) BatchElement.class) .setParallelism(operator.getParallelism()) .name(operator.getName()); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/SortTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/SortTranslator.java new file mode 100644 index 0000000000000..f2051763cce0b --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/SortTranslator.java @@ -0,0 +1,119 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.batch; + +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.ExtractEventTime; +import cz.seznam.euphoria.core.client.operator.Sort; +import cz.seznam.euphoria.flink.FlinkOperator; +import cz.seznam.euphoria.flink.Utils; +import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Preconditions; + +/** + * Translator of {@code Union} operator. + */ +class SortTranslator implements BatchOperatorTranslator { + + static boolean wantTranslate(Sort operator) { + return (operator.getWindowing() == null + || (!(operator.getWindowing() instanceof MergingWindowing) + && !operator.getWindowing().getTrigger().isStateful())); + } + + @Override + @SuppressWarnings("unchecked") + public DataSet translate( + FlinkOperator operator, + BatchExecutorContext context) { + + int inputParallelism = Iterables.getOnlyElement(context.getInputOperators(operator)).getParallelism(); + DataSet input = Iterables.getOnlyElement(context.getInputStreams(operator)); + + Sort origOperator = operator.getOriginalOperator(); + + final Windowing windowing = + origOperator.getWindowing() == null + ? AttachedWindowing.INSTANCE + : origOperator.getWindowing(); + + // extracts partitionId + final UnaryFunction udfKey = origOperator.getKeyExtractor(); + final UnaryFunction udfSort = origOperator.getSortByExtractor(); + + // ~ extract key/value + timestamp from input elements and assign windows + ExtractEventTime timeAssigner = origOperator.getEventTimeAssigner(); + + DataSet wAssigned = + input.flatMap((i, c) -> { + BatchElement wel = (BatchElement) i; + // assign timestamp if timeAssigner defined + if (timeAssigner != null) { + wel.setTimestamp(timeAssigner.extractTimestamp(wel.getElement())); + } + Iterable assigned = windowing.assignWindowsToElement(wel); + for (Window wid : assigned) { + Object el = wel.getElement(); + c.collect(new BatchElement<>(wid, wel.getTimestamp(), el)); + } + }) + .returns(BatchElement.class) + .name(operator.getName() + "::map-input") + .setParallelism(inputParallelism); + + // ~ repartition and sort partitions + DataSet sorted = wAssigned + .partitionCustom(new SortPartitionerWrapper<>( + origOperator.getPartitioning().getPartitioner()), + Utils.wrapQueryable(we -> udfKey.apply(we.getElement()), Integer.class)) + .setParallelism(operator.getParallelism()) + .sortPartition(Utils.wrapQueryable( + (BatchElement we) -> Tuple2.of(we.getWindow(), udfSort.apply(we.getElement())), + (new TypeHint>() {}).getTypeInfo()), + Order.ASCENDING) + .name(operator.getName() + "::sort"); + + return sorted; + } + + public class SortPartitionerWrapper + implements org.apache.flink.api.common.functions.Partitioner { + + private final Partitioner partitioner; + + public SortPartitionerWrapper(Partitioner partitioner) { + this.partitioner = partitioner; + } + + @Override + public int partition(T elem, int numPartitions) { + int ret = partitioner.getPartition(elem); + // already presume that the partitioner returns the number in the correct range + Preconditions.checkArgument( + ret >= 0 && ret < numPartitions, + "Unexpected partition number " + ret + " with number of partitions " + numPartitions); + return ret; + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/UnionTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/UnionTranslator.java index bd32453649187..850311aa4356a 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/UnionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/UnionTranslator.java @@ -40,6 +40,4 @@ public DataSet translate( Optional reduce = inputs.stream().reduce((l, r) -> l.union(r)); return reduce.get(); } - - } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java index f82c4e3666021..f87bd67945eb6 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java @@ -30,7 +30,7 @@ public DataStream translate(FlinkOperator operator, DataStream input = context.getSingleInputStream(operator); UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); return input - .flatMap(new StreamingUnaryFunctorWrapper<>(mapper)) + .flatMap(new StreamingUnaryFunctorWrapper(mapper)) .returns((Class) StreamingElement.class) .name(operator.getName()) .setParallelism(operator.getParallelism()); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 01f8f737d9ec7..782f45613ccc9 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -113,7 +113,7 @@ public DataStream translate(FlinkOperator operator, .setParallelism(input.getParallelism()); reduced = (DataStream) windowed.keyBy(new KeyedMultiWindowedElementKeyExtractor()) .transform(operator.getName(), TypeInformation.of(StreamingElement.class), - new KeyedMultiWindowedElementWindowOperator<>( + new KeyedMultiWindowedElementWindowOperator( windowing, stateFactory, stateCombiner, context.isLocalMode(), descriptorsCacheMaxSize)) .setParallelism(operator.getParallelism()); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKAttachedWindowingTest.java b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKAttachedWindowingTest.java index d84d24cc6fe55..4004f1fff3a21 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKAttachedWindowingTest.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RBKAttachedWindowingTest.java @@ -85,7 +85,7 @@ public void testAttachedWindow() throws Exception { reduced = ReduceByKey .of(uniq) .keyBy(e -> "") - .valueBy(new ToHashMap<>(Pair::getFirst, Pair::getSecond)) + .valueBy(new ToHashMap>(Pair::getFirst, Pair::getSecond)) .combineBy(new MergeHashMaps<>()) .setNumPartitions(1) .output(); @@ -319,7 +319,7 @@ public void testDistinctReduction() throws Exception { Dataset>> reduced = ReduceByKey.of(counted) .keyBy(e -> e.getFirst().getStartMillis()) - .valueBy(new ToHashMap<>(Triple::getSecond, Triple::getThird)) + .valueBy(new ToHashMap>(Triple::getSecond, Triple::getThird)) .combineBy(new MergeHashMaps<>()) // ~ partition by the input window start time .setNumPartitions(2) diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java index f15d887c4e72d..fe1202f398792 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java @@ -308,7 +308,7 @@ public void close() { } } // ~ end of SortState - static class SizedCountWindow extends Window { + static class SizedCountWindow extends Window { final int size; int get() { @@ -337,6 +337,11 @@ public boolean equals(Object o) { public int hashCode() { return size; } + + @Override + public int compareTo(SizedCountWindow o) { + return Integer.compare(size, o.size); + } } // ~ end of SizedCountWindow static class SizedCountWindowing diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java index e2055220a3011..53c5f1dbdd62e 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/AllOperatorsSuite.java @@ -37,6 +37,7 @@ RepartitionTest.class, SumByKeyTest.class, TopPerKeyTest.class, + SortTest.class, UnionTest.class, WindowingTest.class, }) diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/IntWindow.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/IntWindow.java index 7e1c8b4732493..d37ee0f32a329 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/IntWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/IntWindow.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; -class IntWindow extends Window implements Comparable { +class IntWindow extends Window { private int val; IntWindow(int val) { diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java index 6ee9a1888dbbc..dbd7d73cccf1d 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java @@ -286,7 +286,7 @@ protected Dataset> getOutput(Dataset input) { // ---------------------------------------------------------------------------- // ~ every instance is unique: this allows us to exercise merging - static final class CWindow extends Window implements Comparable { + static final class CWindow extends Window { static int _idCounter = 0; static final Object _idCounterMutex = new Object(); diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/SortTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/SortTest.java new file mode 100644 index 0000000000000..5d3122ece5468 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/SortTest.java @@ -0,0 +1,275 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.operator.test; + +import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.dataset.partitioning.RangePartitioning; +import cz.seznam.euphoria.core.client.dataset.windowing.Time; +import cz.seznam.euphoria.core.client.operator.Sort; +import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; +import cz.seznam.euphoria.operator.test.junit.Processing; +import org.junit.Assert; +import org.junit.Test; + +import java.io.Serializable; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; + +@Processing(Processing.Type.ALL) +public class SortTest extends AbstractOperatorTest { + + static final class Item implements Serializable { + private final long time; + private final String key; + private final int score; + Item(String key, int score) { + this(0L, key, score); + } + Item(long time, String key, int score) { + this.time = time; + this.key = key; + this.score = score; + } + long getTime() { return time; } + String getKey() { return key; } + int getScore() { return score; } + @Override + public boolean equals(Object o) { + if (o instanceof Item) { + Item item = (Item) o; + return time == item.time + && score == item.score + && Objects.equals(key, item.key); + } + return false; + } + @Override + public int hashCode() { return Objects.hash(time, key, score); } + @Override + public String toString() { + return "Item [" + time + ", " + key + ", " + score + "]"; + } + } + + @Test + public void testOnSinglePartition() { + execute(new AbstractTestCase() { + @Override + protected Dataset + getOutput(Dataset input) { + Dataset sorted = Sort.of(input) + .by(Item::getScore) + .setNumPartitions(1) + .output(); + return sorted; + } + + @Override + public void validate(Partitions partitions) { + Assert.assertEquals(1, partitions.size()); + List items = partitions.get(0); + assertEquals(9, items.size()); + + List scores = items.stream() + .map(Item::getKey) + .collect(Collectors.toList()); + + assertEquals(Arrays.asList( + "1-three", + "one-ZZZ-1", + "one-ZZZ-2", + "one-3", + "two", + "2-three", + "3-three", + "one-XXX-100", + "one-999"), scores); + } + + @Override + protected Partitions getInput() { + return Partitions + .add( + new Item("one-ZZZ-1", 1), + new Item("one-ZZZ-2", 2), + new Item("one-3", 3), + new Item("one-999", 999), + new Item("two", 10), + new Item("1-three", 0), + new Item("2-three", 11)) + .add( + new Item("one-XXX-100", 100), + new Item("3-three", 21)) + .build(); + } + + @Override + public int getNumOutputPartitions() { + return 1; + } + }); + } + + @Test + public void testOnCustomPartitioner() { + execute(new AbstractTestCase() { + @Override + protected Dataset + getOutput(Dataset input) { + Dataset sorted = Sort.of(input) + .by(Item::getScore) + .setNumPartitions(2) + .setPartitioner(i -> i < 20 ? 0 : 1) + .output(); + return sorted; + } + + @Override + public void validate(Partitions partitions) { + Assert.assertEquals(2, partitions.size()); + List lowItems = partitions.get(0); + assertEquals(6, lowItems.size()); + List lowScores = lowItems.stream() + .map(Item::getKey) + .collect(Collectors.toList()); + assertEquals(Arrays.asList( + "1-three", + "one-ZZZ-1", + "one-ZZZ-2", + "one-3", + "two", + "2-three"), lowScores); + + List highItems = partitions.get(1); + assertEquals(3, highItems.size()); + List highScores = highItems.stream() + .map(Item::getKey) + .collect(Collectors.toList()); + assertEquals(Arrays.asList( + "3-three", + "one-XXX-100", + "one-999"), highScores); + } + + @Override + protected Partitions getInput() { + return Partitions + .add( + new Item("one-ZZZ-1", 1), + new Item("one-ZZZ-2", 2), + new Item("one-3", 3), + new Item("one-999", 999), + new Item("two", 10), + new Item("1-three", 0), + new Item("2-three", 11)) + .add( + new Item("one-XXX-100", 100), + new Item("3-three", 21)) + .build(); + } + + @Override + public int getNumOutputPartitions() { + return 2; + } + }); + } + + @Test + public void testOnWindowingWithCustomPartitioner() { + execute(new AbstractTestCase() { + @Override + protected Dataset + getOutput(Dataset input) { + Dataset sorted = Sort.of(input) + .by(Item::getScore) + .windowBy(Time.of(Duration.ofSeconds(2)), Item::getTime) + .setPartitioning(new RangePartitioning<>(10, 20)) + .output(); + return sorted; + } + + @Override + public void validate(Partitions partitions) { + Assert.assertEquals(3, partitions.size()); + + // (0-10> + List lowItems = partitions.get(0); + assertEquals(6, lowItems.size()); + assertEquals(Arrays.asList("two"), between(lowItems, 0, 2000)); + assertEquals(Arrays.asList("1-three", "one-ZZZ-2", "one-3"), between(lowItems, 2000, 4000)); + assertEquals(Arrays.asList("one-ZZZ-1", "2-three"), between(lowItems, 4000, 6000)); + + // (10-20> + List midItems = partitions.get(1); + assertEquals(1, midItems.size()); + assertEquals(Arrays.asList(), between(midItems, 0, 2000)); + assertEquals(Arrays.asList("4-four"), between(midItems, 2000, 4000)); + assertEquals(Arrays.asList(), between(midItems, 4000, 6000)); + + // (20-MAX> + List highItems = partitions.get(2); + assertEquals(3, highItems.size()); + assertEquals(Arrays.asList("one-XXX-100", "one-999"), between(highItems, 0, 2000)); + assertEquals(Arrays.asList(), between(highItems, 2000, 4000)); + assertEquals(Arrays.asList("3-three"), between(highItems, 4000, 6000)); + } + + @Override + protected Partitions getInput() { + return Partitions + .add( + new Item(4000, "one-ZZZ-1", 1), + new Item(3000, "one-ZZZ-2", 2), + new Item(2000, "one-3", 3), + new Item(1000, "one-999", 999), + new Item(0000, "two", 8), + new Item(3000, "1-three", 0), + new Item(4000, "2-three", 9)) + .add( + new Item(1000, "one-XXX-100", 100), + new Item(5000, "3-three", 21), + new Item(3000, "4-four", 11)) + .build(); + } + + @Override + public int getNumOutputPartitions() { + return 3; + } + }); + } + + // take sequential sublist of items between lo inclusive and hi exclusive + private static List between(List items, long lo, long hi) { + List ret = new ArrayList<>(); + for (Item item: items) { + if (item.getTime() >= lo && item.getTime() < hi) { + ret.add(item.getKey()); + // already passed the interval + } else if (!ret.isEmpty()) { + break; + } + } + return ret; + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FlatMapTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FlatMapTranslator.java index d47dc2c7051c6..c68296aa62689 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FlatMapTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FlatMapTranslator.java @@ -30,6 +30,6 @@ public JavaRDD translate(FlatMap operator, final JavaRDD input = context.getSingleInput(operator); final UnaryFunctor mapper = operator.getFunctor(); - return input.flatMap(new UnaryFunctorWrapper<>((UnaryFunctor) mapper)); + return input.flatMap(new UnaryFunctorWrapper((UnaryFunctor) mapper)); } } diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java index cb7d3747e5cdd..16edc2b39ef94 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java @@ -15,7 +15,6 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; @@ -25,18 +24,18 @@ import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; +import javax.annotation.Nullable; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import scala.Tuple2; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Set; class ReduceByKeyTranslator implements SparkOperatorTranslator { diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java index c23c47433b29a..79b6bac48278b 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java @@ -18,7 +18,6 @@ import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; @@ -150,7 +149,6 @@ public Iterator> call(SparkElement wel) throws Excep } } - private static class StateReducer implements FlatMapFunction>, SparkElement> { @@ -225,7 +223,5 @@ private void flushStates() { } activeReducers.clear(); } - - } } diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SortTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SortTranslator.java new file mode 100644 index 0000000000000..a5e1af287859e --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SortTranslator.java @@ -0,0 +1,172 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.spark; + +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; +import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; +import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; +import cz.seznam.euphoria.core.client.dataset.windowing.Window; +import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.operator.ExtractEventTime; +import cz.seznam.euphoria.core.client.operator.Sort; +import javax.annotation.Nullable; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFlatMapFunction; +import scala.Tuple2; +import scala.Tuple3; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; + +class SortTranslator implements SparkOperatorTranslator { + + static boolean wantTranslate(Sort operator) { + return (operator.getWindowing() == null + || (!(operator.getWindowing() instanceof MergingWindowing) + && !operator.getWindowing().getTrigger().isStateful())); + } + + @Override + @SuppressWarnings("unchecked") + public JavaRDD translate(Sort operator, + SparkExecutorContext context) { + + final JavaRDD input = (JavaRDD) context.getSingleInput(operator); + + final UnaryFunction keyExtractor = operator.getKeyExtractor(); + final UnaryFunction sortByFn = operator.getSortByExtractor(); + final ExtractEventTime eventTimeAssigner = operator.getEventTimeAssigner(); + final Windowing windowing = operator.getWindowing() == null + ? AttachedWindowing.INSTANCE + : operator.getWindowing(); + + // ~ extract key/value + timestamp from input elements and assign windows + JavaPairRDD, TimestampedElement> tuples = input.flatMapToPair( + new CompositeKeyExtractor(keyExtractor, sortByFn, windowing, eventTimeAssigner)); + + Partitioner partitioner = new PartitioningWrapper(operator.getPartitioning().getNumPartitions()); + Comparator comparator = new TripleComparator(); + + JavaPairRDD, TimestampedElement> sorted = + tuples.repartitionAndSortWithinPartitions(partitioner, comparator); + + return sorted.map(t -> { + Tuple3 kw = t._1(); + TimestampedElement el = t._2(); + + // ~ extract timestamp from element rather than from KeyedWindow + // because in KeyedWindow there is the original timestamp from + // pre-reduce age + long timestamp = el.getTimestamp(); + + return new SparkElement<>(kw._2(), timestamp, el.getElement()); + }); + } + + /** + * Extracts {@link Window} from {@link SparkElement} and assigns timestamp + * according to (optional) eventTimeAssigner. + * The result composite key tuple consists of [partitionId, window, sortByKey]. + */ + private static class CompositeKeyExtractor + implements PairFlatMapFunction, TimestampedElement> { + + private final UnaryFunction keyExtractor; + private final UnaryFunction sortByFn; + private final Windowing windowing; + @Nullable + private final ExtractEventTime eventTimeAssigner; + + public CompositeKeyExtractor(UnaryFunction keyExtractor, + UnaryFunction sortByFn, + Windowing windowing, + @Nullable ExtractEventTime eventTimeAssigner) { + this.keyExtractor = keyExtractor; + this.sortByFn = sortByFn; + this.windowing = windowing; + this.eventTimeAssigner = eventTimeAssigner; + } + + @Override + @SuppressWarnings("unchecked") + public Iterator, TimestampedElement>> call(SparkElement wel) + throws Exception { + if (eventTimeAssigner != null) { + wel.setTimestamp(eventTimeAssigner.extractTimestamp(wel.getElement())); + } + + Iterable windows = windowing.assignWindowsToElement(wel); + List, TimestampedElement>> out = new ArrayList<>(); + for (Window wid : windows) { + Object el = wel.getElement(); + long stamp = (wid instanceof TimedWindow) + ? ((TimedWindow) wid).maxTimestamp() + : wel.getTimestamp(); + out.add(new Tuple2<>( + new Tuple3(keyExtractor.apply(el), wid, sortByFn.apply(el)), + new TimestampedElement(stamp, el))); + } + return out.iterator(); + } + } + + /** + * Adapter between Euphoria {@link Partitioning} and Spark {@link Partitioner}. Takes already computed + * partitionId from the triple. + */ + private static class PartitioningWrapper extends Partitioner { + + private final int numPartitions; + + public PartitioningWrapper(int numPartitions) { + this.numPartitions = numPartitions; + } + + @Override + public int numPartitions() { + return numPartitions; + } + + @Override + @SuppressWarnings("unchecked") + public int getPartition(Object el) { + Tuple3 t = (Tuple3) el; + return t._1(); + } + } + + private static class TripleComparator implements Comparator>, Serializable { + + @SuppressWarnings("unchecked") + @Override + public int compare(Tuple3 o1, Tuple3 o2) { + int result = o1._1().compareTo(o2._1()); + if (result == 0) { + result = o1._2().compareTo(o2._2()); + } + if (result == 0) { + result = o1._3().compareTo(o2._3()); + } + return result; + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java index d2536a1c3fc2a..b9aecf2e29acb 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkFlowTranslator.java @@ -25,6 +25,7 @@ import cz.seznam.euphoria.core.client.operator.ReduceByKey; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.Repartition; +import cz.seznam.euphoria.core.client.operator.Sort; import cz.seznam.euphoria.core.client.operator.Union; import cz.seznam.euphoria.core.executor.FlowUnfolder; import cz.seznam.euphoria.hadoop.output.DataSinkOutputFormat; @@ -94,6 +95,8 @@ public SparkFlowTranslator(JavaSparkContext sparkEnv) { // derived operators Translation.set(translations, ReduceByKey.class, new ReduceByKeyTranslator(), ReduceByKeyTranslator::wantTranslate); + Translation.set(translations, Sort.class, new SortTranslator(), + SortTranslator::wantTranslate); } @SuppressWarnings("unchecked") diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnionTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnionTranslator.java index f40d3316affc8..13bb73c3f03a3 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnionTranslator.java @@ -20,7 +20,6 @@ import java.util.List; - class UnionTranslator implements SparkOperatorTranslator { @Override