From ec929931199829d574d705d5d0e8fefd44729157 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Thu, 2 Feb 2017 16:01:44 +0100 Subject: [PATCH] #16 Replace Dataset#getPartitioning() with Dataset#getNumPartitions() --- .../euphoria/core/client/dataset/Dataset.java | 13 +++--- .../core/client/dataset/Datasets.java | 24 ++++------ .../euphoria/core/client/flow/Flow.java | 9 ++++ .../core/client/operator/CountByKey.java | 2 +- .../core/client/operator/Distinct.java | 2 +- .../euphoria/core/client/operator/Join.java | 5 +-- .../core/client/operator/ReduceByKey.java | 2 +- .../client/operator/ReduceStateByKey.java | 2 +- .../core/client/operator/ReduceWindow.java | 3 +- .../core/client/operator/Repartition.java | 2 +- ...ateAwareWindowWiseSingleInputOperator.java | 10 ----- .../core/client/operator/SumByKey.java | 2 +- .../core/client/operator/TopPerKey.java | 3 +- .../euphoria/core/client/operator/Union.java | 3 +- .../core/client/operator/FlatMapTest.java | 19 ++++++++ .../core/client/operator/ReduceByKeyTest.java | 44 ++++++++++++++++++- 16 files changed, 98 insertions(+), 47 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index 4374713c7d16c..f632e31d2f517 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -60,12 +60,15 @@ public interface Dataset extends Serializable { Collection> getConsumers(); /** - * Retrieve partitioning for this dataset. - * The dataset might be partitioned by some other type - * (using some extraction function). + * Determines the parallelism of this data set - if known. Typically, + * a data set is split into multiple partitions which can be processed + * in parallel. + * + * @return {@code < 0} if the partition count is unknown, otherwise the + * count of partitions of this dataset (which can potentially + * be processed in parallel) */ - Partitioning getPartitioning(); - + int getNumPartitions(); /** * @return {@code true} if this is a bounded data set, diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java index 2da6f4d30e5ad..f3c660fc2c3ee 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java @@ -41,16 +41,13 @@ public class Datasets { public static Dataset createOutputFor( Flow flow, Dataset input, Operator op) { - return new OutputDataset(flow, (Operator) op, input.isBounded()) { + return new OutputDataset(flow, op, input.isBounded()) { @Override - @SuppressWarnings("unchecked") - public Partitioning getPartitioning() { - if (op instanceof PartitioningAware) { - // only partitioning aware operators change the partitioning - PartitioningAware pa = (PartitioningAware) op; - return (Partitioning) pa.getPartitioning(); - } - return input.getPartitioning(); + public int getNumPartitions() { + // only partitioning aware operators can change the partition count + return (op instanceof PartitioningAware) + ? ((PartitioningAware) op).getPartitioning().getNumPartitions() + : input.getNumPartitions(); } }; } @@ -70,13 +67,8 @@ public static Dataset createInputFromSource( return new InputDataset(flow, source, source.isBounded()) { @Override - public Partitioning getPartitioning() { - return new Partitioning() { - @Override - public int getNumPartitions() { - return source.getPartitions().size(); - } - }; + public int getNumPartitions() { + return source.getPartitions().size(); } }; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java index 192a89517d6c5..7646480a08578 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java @@ -93,6 +93,15 @@ private Flow(String name, Settings settings) { this.settings = cloneSettings(settings); } + /** + * Creates a new (anonymous) Flow. + * + * @return a new flow with an undefined name, + * i.e. either not named at all or with a system generated name + */ + public static Flow create() { + return create(null); + } /** * Creates a new Flow. diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index 9f232f17e06b5..d542e0737f4d8 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -70,7 +70,7 @@ public static class WindowingBuilder WindowingBuilder(String name, Dataset input, UnaryFunction keyExtractor) { // define default partitioning - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index a70ec91ccaa40..f05c7d50cf9b0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -71,7 +71,7 @@ public static class WindowingBuilder UnaryFunction mapper /* optional */) { // define default partitioning - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index adf27e1758d7d..976af510caeb1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -141,9 +141,8 @@ public static class WindowingBuilder BinaryFunctor joinFunc) { // define default partitioning - super(new DefaultPartitioning<>(Math.max( - left.getPartitioning().getNumPartitions(), - right.getPartitioning().getNumPartitions()))); + super(new DefaultPartitioning<>( + Math.max(left.getNumPartitions(), right.getNumPartitions()))); this.name = Objects.requireNonNull(name); this.left = Objects.requireNonNull(left); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index e3c3befa01e64..76857d12be280 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -144,7 +144,7 @@ public static class DatasetBuilder4 ReduceFunction reducer) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 0e3d70d64ad09..ae74137a5e7b0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -158,7 +158,7 @@ public static class DatasetBuilder5< CombinableReduceFunction stateCombiner) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index e9b95426177c4..579f20c5e4e95 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -175,8 +175,7 @@ public Partitioner getPartitioner() { } @Override public int getNumPartitions() { - return numPartitions > 0 - ? numPartitions : input.getPartitioning().getNumPartitions(); + return numPartitions > 0 ? numPartitions : input.getNumPartitions(); } }); this.reducer = reducer; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java index 159a46ff13793..3c94985a44231 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java @@ -55,7 +55,7 @@ public static class OutputBuilder private final Dataset input; OutputBuilder(String name, Dataset input) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index fa37a9d8e8f21..cac790fb335ff 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -49,16 +49,6 @@ protected StateAwareWindowWiseSingleInputOperator( this.output = createOutput(input); } - protected StateAwareWindowWiseSingleInputOperator( - String name, - Flow flow, - Dataset input, - UnaryFunction extractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */) { - this(name, flow, input, extractor, windowing, eventTimeAssigner, input.getPartitioning()); - } - @Override public Collection> listInputs() { return Collections.singletonList(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index ebd98012f4de2..8d23e2cecbdae 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -76,7 +76,7 @@ public static class ByBuilder2 private UnaryFunction valueExtractor = e -> 1L; ByBuilder2(String name, Dataset input, UnaryFunction keyExtractor) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index 92b7f5bd346d5..15e80c7ba992b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -170,8 +170,7 @@ public static class WindowByBuilder> UnaryFunction valueFn, UnaryFunction scoreFn) { - super(new DefaultPartitioning<>( - input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = requireNonNull(name); this.input = requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java index 4ab627d7be555..58c2c4494d0a5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java @@ -40,8 +40,7 @@ public static class OfBuilder { this.name = name; } - public OutputBuilder of(Dataset left, Dataset right) - { + public OutputBuilder of(Dataset left, Dataset right) { if (right.getFlow() != left.getFlow()) { throw new IllegalArgumentException("Pass inputs from the same flow"); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java index 9bcfd4f5075a4..ea163c8b7d47b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java @@ -17,6 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.io.Context; import org.junit.Test; @@ -57,4 +58,22 @@ public void testBuild_ImplicitName() { FlatMap map = (FlatMap) flow.operators().iterator().next(); assertEquals("FlatMap", map.getName()); } + + /** + * Verify that the number of partitions of the flat map + * operator's input is preserved in the output. + */ + @Test + public void testOutputNumPartitionsIsUnchanged() { + final int N_PARTITIONS = 78; + + Flow f = Flow.create(); + Dataset input = Util.createMockDataset(f, N_PARTITIONS); + assertEquals(N_PARTITIONS, input.getNumPartitions()); + + Dataset output = FlatMap.of(input) + .using((Object o, Context c) -> c.collect(o)) + .output(); + assertEquals(N_PARTITIONS, output.getNumPartitions()); + } } \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java index 9bc3449f0d0fe..7671cff03c99c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java @@ -20,11 +20,12 @@ import cz.seznam.euphoria.core.client.dataset.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.client.util.Sums; import org.junit.Test; import java.time.Duration; -import java.util.List; import java.util.stream.StreamSupport; import static org.junit.Assert.*; @@ -148,4 +149,45 @@ public void testBuild_Partitioner() { assertTrue(reduce.getPartitioning().getPartitioner() instanceof HashPartitioner); assertEquals(5, reduce.getPartitioning().getNumPartitions()); } + + /** + * Verify that the number of partitions of the reduce-by-key + * operator's input is preserved in the output since no partitioning + * is explicitly specified. + */ + @Test + public void testOutputNumPartitionsIsPreserved() { + final int N_PARTITIONS = 78; + + Flow f = Flow.create(); + Dataset> input = Util.createMockDataset(f, N_PARTITIONS); + assertEquals(N_PARTITIONS, input.getNumPartitions()); + + Dataset> output = + ReduceByKey.of(input) + .keyBy(Pair::getFirst) + .valueBy(Pair::getSecond) + .combineBy(Sums.ofLongs()) + .output(); + assertEquals(N_PARTITIONS, output.getNumPartitions()); + } + + @Test + public void testOutputExplicitNumPartitionsIsRespected() { + final int INPUT_PARTITIONS = 78; + final int OUTPUT_PARTITIONS = 13; + + Flow f = Flow.create(); + Dataset> input = Util.createMockDataset(f, INPUT_PARTITIONS); + assertEquals(INPUT_PARTITIONS, input.getNumPartitions()); + + Dataset> output = + ReduceByKey.of(input) + .keyBy(Pair::getFirst) + .valueBy(Pair::getSecond) + .combineBy(Sums.ofLongs()) + .setNumPartitions(OUTPUT_PARTITIONS) + .output(); + assertEquals(OUTPUT_PARTITIONS, output.getNumPartitions()); + } } \ No newline at end of file