From ec929931199829d574d705d5d0e8fefd44729157 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Thu, 2 Feb 2017 16:01:44 +0100 Subject: [PATCH 1/2] #16 Replace Dataset#getPartitioning() with Dataset#getNumPartitions() --- .../euphoria/core/client/dataset/Dataset.java | 13 +++--- .../core/client/dataset/Datasets.java | 24 ++++------ .../euphoria/core/client/flow/Flow.java | 9 ++++ .../core/client/operator/CountByKey.java | 2 +- .../core/client/operator/Distinct.java | 2 +- .../euphoria/core/client/operator/Join.java | 5 +-- .../core/client/operator/ReduceByKey.java | 2 +- .../client/operator/ReduceStateByKey.java | 2 +- .../core/client/operator/ReduceWindow.java | 3 +- .../core/client/operator/Repartition.java | 2 +- ...ateAwareWindowWiseSingleInputOperator.java | 10 ----- .../core/client/operator/SumByKey.java | 2 +- .../core/client/operator/TopPerKey.java | 3 +- .../euphoria/core/client/operator/Union.java | 3 +- .../core/client/operator/FlatMapTest.java | 19 ++++++++ .../core/client/operator/ReduceByKeyTest.java | 44 ++++++++++++++++++- 16 files changed, 98 insertions(+), 47 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java index 4374713c7d16c..f632e31d2f517 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Dataset.java @@ -60,12 +60,15 @@ public interface Dataset extends Serializable { Collection> getConsumers(); /** - * Retrieve partitioning for this dataset. - * The dataset might be partitioned by some other type - * (using some extraction function). + * Determines the parallelism of this data set - if known. Typically, + * a data set is split into multiple partitions which can be processed + * in parallel. + * + * @return {@code < 0} if the partition count is unknown, otherwise the + * count of partitions of this dataset (which can potentially + * be processed in parallel) */ - Partitioning getPartitioning(); - + int getNumPartitions(); /** * @return {@code true} if this is a bounded data set, diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java index 2da6f4d30e5ad..f3c660fc2c3ee 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Datasets.java @@ -41,16 +41,13 @@ public class Datasets { public static Dataset createOutputFor( Flow flow, Dataset input, Operator op) { - return new OutputDataset(flow, (Operator) op, input.isBounded()) { + return new OutputDataset(flow, op, input.isBounded()) { @Override - @SuppressWarnings("unchecked") - public Partitioning getPartitioning() { - if (op instanceof PartitioningAware) { - // only partitioning aware operators change the partitioning - PartitioningAware pa = (PartitioningAware) op; - return (Partitioning) pa.getPartitioning(); - } - return input.getPartitioning(); + public int getNumPartitions() { + // only partitioning aware operators can change the partition count + return (op instanceof PartitioningAware) + ? ((PartitioningAware) op).getPartitioning().getNumPartitions() + : input.getNumPartitions(); } }; } @@ -70,13 +67,8 @@ public static Dataset createInputFromSource( return new InputDataset(flow, source, source.isBounded()) { @Override - public Partitioning getPartitioning() { - return new Partitioning() { - @Override - public int getNumPartitions() { - return source.getPartitions().size(); - } - }; + public int getNumPartitions() { + return source.getPartitions().size(); } }; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java index 192a89517d6c5..7646480a08578 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/flow/Flow.java @@ -93,6 +93,15 @@ private Flow(String name, Settings settings) { this.settings = cloneSettings(settings); } + /** + * Creates a new (anonymous) Flow. + * + * @return a new flow with an undefined name, + * i.e. either not named at all or with a system generated name + */ + public static Flow create() { + return create(null); + } /** * Creates a new Flow. diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index 9f232f17e06b5..d542e0737f4d8 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -70,7 +70,7 @@ public static class WindowingBuilder WindowingBuilder(String name, Dataset input, UnaryFunction keyExtractor) { // define default partitioning - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index a70ec91ccaa40..f05c7d50cf9b0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -71,7 +71,7 @@ public static class WindowingBuilder UnaryFunction mapper /* optional */) { // define default partitioning - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index adf27e1758d7d..976af510caeb1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -141,9 +141,8 @@ public static class WindowingBuilder BinaryFunctor joinFunc) { // define default partitioning - super(new DefaultPartitioning<>(Math.max( - left.getPartitioning().getNumPartitions(), - right.getPartitioning().getNumPartitions()))); + super(new DefaultPartitioning<>( + Math.max(left.getNumPartitions(), right.getNumPartitions()))); this.name = Objects.requireNonNull(name); this.left = Objects.requireNonNull(left); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index e3c3befa01e64..76857d12be280 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -144,7 +144,7 @@ public static class DatasetBuilder4 ReduceFunction reducer) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 0e3d70d64ad09..ae74137a5e7b0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -158,7 +158,7 @@ public static class DatasetBuilder5< CombinableReduceFunction stateCombiner) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index e9b95426177c4..579f20c5e4e95 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -175,8 +175,7 @@ public Partitioner getPartitioner() { } @Override public int getNumPartitions() { - return numPartitions > 0 - ? numPartitions : input.getPartitioning().getNumPartitions(); + return numPartitions > 0 ? numPartitions : input.getNumPartitions(); } }); this.reducer = reducer; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java index 159a46ff13793..3c94985a44231 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java @@ -55,7 +55,7 @@ public static class OutputBuilder private final Dataset input; OutputBuilder(String name, Dataset input) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index fa37a9d8e8f21..cac790fb335ff 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -49,16 +49,6 @@ protected StateAwareWindowWiseSingleInputOperator( this.output = createOutput(input); } - protected StateAwareWindowWiseSingleInputOperator( - String name, - Flow flow, - Dataset input, - UnaryFunction extractor, - Windowing windowing /* optional */, - UnaryFunction eventTimeAssigner /* optional */) { - this(name, flow, input, extractor, windowing, eventTimeAssigner, input.getPartitioning()); - } - @Override public Collection> listInputs() { return Collections.singletonList(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index ebd98012f4de2..8d23e2cecbdae 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -76,7 +76,7 @@ public static class ByBuilder2 private UnaryFunction valueExtractor = e -> 1L; ByBuilder2(String name, Dataset input, UnaryFunction keyExtractor) { // initialize default partitioning according to input - super(new DefaultPartitioning<>(input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index 92b7f5bd346d5..15e80c7ba992b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -170,8 +170,7 @@ public static class WindowByBuilder> UnaryFunction valueFn, UnaryFunction scoreFn) { - super(new DefaultPartitioning<>( - input.getPartitioning().getNumPartitions())); + super(new DefaultPartitioning<>(input.getNumPartitions())); this.name = requireNonNull(name); this.input = requireNonNull(input); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java index 4ab627d7be555..58c2c4494d0a5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Union.java @@ -40,8 +40,7 @@ public static class OfBuilder { this.name = name; } - public OutputBuilder of(Dataset left, Dataset right) - { + public OutputBuilder of(Dataset left, Dataset right) { if (right.getFlow() != left.getFlow()) { throw new IllegalArgumentException("Pass inputs from the same flow"); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java index 9bcfd4f5075a4..ea163c8b7d47b 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java @@ -17,6 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.io.Context; import org.junit.Test; @@ -57,4 +58,22 @@ public void testBuild_ImplicitName() { FlatMap map = (FlatMap) flow.operators().iterator().next(); assertEquals("FlatMap", map.getName()); } + + /** + * Verify that the number of partitions of the flat map + * operator's input is preserved in the output. + */ + @Test + public void testOutputNumPartitionsIsUnchanged() { + final int N_PARTITIONS = 78; + + Flow f = Flow.create(); + Dataset input = Util.createMockDataset(f, N_PARTITIONS); + assertEquals(N_PARTITIONS, input.getNumPartitions()); + + Dataset output = FlatMap.of(input) + .using((Object o, Context c) -> c.collect(o)) + .output(); + assertEquals(N_PARTITIONS, output.getNumPartitions()); + } } \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java index 9bc3449f0d0fe..7671cff03c99c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java @@ -20,11 +20,12 @@ import cz.seznam.euphoria.core.client.dataset.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.client.util.Sums; import org.junit.Test; import java.time.Duration; -import java.util.List; import java.util.stream.StreamSupport; import static org.junit.Assert.*; @@ -148,4 +149,45 @@ public void testBuild_Partitioner() { assertTrue(reduce.getPartitioning().getPartitioner() instanceof HashPartitioner); assertEquals(5, reduce.getPartitioning().getNumPartitions()); } + + /** + * Verify that the number of partitions of the reduce-by-key + * operator's input is preserved in the output since no partitioning + * is explicitly specified. + */ + @Test + public void testOutputNumPartitionsIsPreserved() { + final int N_PARTITIONS = 78; + + Flow f = Flow.create(); + Dataset> input = Util.createMockDataset(f, N_PARTITIONS); + assertEquals(N_PARTITIONS, input.getNumPartitions()); + + Dataset> output = + ReduceByKey.of(input) + .keyBy(Pair::getFirst) + .valueBy(Pair::getSecond) + .combineBy(Sums.ofLongs()) + .output(); + assertEquals(N_PARTITIONS, output.getNumPartitions()); + } + + @Test + public void testOutputExplicitNumPartitionsIsRespected() { + final int INPUT_PARTITIONS = 78; + final int OUTPUT_PARTITIONS = 13; + + Flow f = Flow.create(); + Dataset> input = Util.createMockDataset(f, INPUT_PARTITIONS); + assertEquals(INPUT_PARTITIONS, input.getNumPartitions()); + + Dataset> output = + ReduceByKey.of(input) + .keyBy(Pair::getFirst) + .valueBy(Pair::getSecond) + .combineBy(Sums.ofLongs()) + .setNumPartitions(OUTPUT_PARTITIONS) + .output(); + assertEquals(OUTPUT_PARTITIONS, output.getNumPartitions()); + } } \ No newline at end of file From f133030215e73d6ab2c68550dbe8cb421905a560 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Thu, 2 Feb 2017 16:24:12 +0100 Subject: [PATCH 2/2] #16 Move Partitioning related classes to a dedicated package --- .../dataset/{ => partitioning}/DefaultPartitioner.java | 2 +- .../client/dataset/{ => partitioning}/HashPartitioner.java | 2 +- .../client/dataset/{ => partitioning}/HashPartitioning.java | 2 +- .../core/client/dataset/{ => partitioning}/Partitioner.java | 2 +- .../core/client/dataset/{ => partitioning}/Partitioning.java | 4 ++-- .../cz/seznam/euphoria/core/client/operator/CountByKey.java | 2 +- .../euphoria/core/client/operator/DefaultPartitioning.java | 4 ++-- .../cz/seznam/euphoria/core/client/operator/Distinct.java | 2 +- .../java/cz/seznam/euphoria/core/client/operator/Join.java | 2 +- .../euphoria/core/client/operator/PartitioningAware.java | 4 ++-- .../cz/seznam/euphoria/core/client/operator/ReduceByKey.java | 2 +- .../euphoria/core/client/operator/ReduceStateByKey.java | 2 +- .../seznam/euphoria/core/client/operator/ReduceWindow.java | 4 ++-- .../cz/seznam/euphoria/core/client/operator/Repartition.java | 2 +- .../core/client/operator/StateAwareElementWiseOperator.java | 2 +- .../core/client/operator/StateAwareWindowWiseOperator.java | 4 ++-- .../operator/StateAwareWindowWiseSingleInputOperator.java | 2 +- .../cz/seznam/euphoria/core/client/operator/SumByKey.java | 3 +-- .../cz/seznam/euphoria/core/client/operator/TopPerKey.java | 2 +- .../seznam/euphoria/core/client/operator/CountByKeyTest.java | 4 ++-- .../seznam/euphoria/core/client/operator/DistinctTest.java | 4 ++-- .../cz/seznam/euphoria/core/client/operator/JoinTest.java | 4 ++-- .../euphoria/core/client/operator/ReduceByKeyTest.java | 5 ++--- .../euphoria/core/client/operator/ReduceStateByKeyTest.java | 4 ++-- .../euphoria/core/client/operator/RepartitionTest.java | 2 +- .../seznam/euphoria/core/client/operator/SumByKeyTest.java | 5 ++--- .../euphoria/core/client/operator/TopPerKeyKeyTest.java | 4 ++-- .../seznam/euphoria/flink/batch/RepartitionTranslator.java | 2 +- .../seznam/euphoria/flink/functions/PartitionerWrapper.java | 2 +- .../euphoria/flink/streaming/RepartitionTranslator.java | 2 +- .../src/main/java/cz/seznam/euphoria/fluent/Dataset.java | 2 +- .../main/java/cz/seznam/euphoria/inmem/InMemExecutor.java | 2 +- .../test/java/cz/seznam/euphoria/inmem/WindowingTest.java | 2 +- .../java/cz/seznam/euphoria/spark/PartitioningWrapper.java | 4 ++-- .../java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java | 2 +- .../java/cz/seznam/euphoria/spark/RepartitionTranslator.java | 4 ++-- 36 files changed, 50 insertions(+), 53 deletions(-) rename sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/{ => partitioning}/DefaultPartitioner.java (93%) rename sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/{ => partitioning}/HashPartitioner.java (92%) rename sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/{ => partitioning}/HashPartitioning.java (94%) rename sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/{ => partitioning}/Partitioner.java (94%) rename sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/{ => partitioning}/Partitioning.java (95%) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/DefaultPartitioner.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/DefaultPartitioner.java similarity index 93% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/DefaultPartitioner.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/DefaultPartitioner.java index 131e327876e79..2a1c5bcaee985 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/DefaultPartitioner.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/DefaultPartitioner.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.dataset; +package cz.seznam.euphoria.core.client.dataset.partitioning; /** * Default partitioner used in {@link Partitioning}. It is has its own type diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioner.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioner.java similarity index 92% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioner.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioner.java index 10912a678bba7..a8025a0e04b73 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioner.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioner.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.dataset; +package cz.seznam.euphoria.core.client.dataset.partitioning; /** * Partitioner by hash of input. diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioning.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioning.java similarity index 94% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioning.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioning.java index 7f92a3e70b753..8594c1d2ed210 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/HashPartitioning.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/HashPartitioning.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.dataset; +package cz.seznam.euphoria.core.client.dataset.partitioning; /** * Partitioning by hashcode of input. diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioner.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioner.java similarity index 94% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioner.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioner.java index cbc7694a7d746..d35d14099d216 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioner.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioner.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.dataset; +package cz.seznam.euphoria.core.client.dataset.partitioning; import java.io.Serializable; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioning.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioning.java similarity index 95% rename from sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioning.java rename to sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioning.java index 56f6847cb81c4..1ae5200c22b89 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/Partitioning.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/partitioning/Partitioning.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package cz.seznam.euphoria.core.client.dataset; +package cz.seznam.euphoria.core.client.dataset.partitioning; import java.io.Serializable; @@ -23,7 +23,7 @@ * @param the type of elements this partitioning scheme is able to handle */ public interface Partitioning extends Serializable { - + Partitioner DEFAULT_PARTITIONER = new DefaultPartitioner(); /** @return the actual partitioner */ diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java index d542e0737f4d8..64604c78a5294 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/CountByKey.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/DefaultPartitioning.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/DefaultPartitioning.java index b422bab10d1e7..74ad19df86bf4 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/DefaultPartitioning.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/DefaultPartitioning.java @@ -15,8 +15,8 @@ */ package cz.seznam.euphoria.core.client.operator; -import cz.seznam.euphoria.core.client.dataset.Partitioner; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import java.util.Objects; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java index f05c7d50cf9b0..ae2b98877dd4d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Distinct.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.annotation.operator.Recommended; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 976af510caeb1..885b151b18314 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.annotation.operator.Recommended; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java index c44bbcd0b2318..928d64f3506d0 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/PartitioningAware.java @@ -15,8 +15,8 @@ */ package cz.seznam.euphoria.core.client.operator; -import cz.seznam.euphoria.core.client.dataset.Partitioner; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import java.util.Objects; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index 76857d12be280..9520330aac7fb 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -19,7 +19,7 @@ import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index ae74137a5e7b0..674229d96d158 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.annotation.operator.Basic; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index 579f20c5e4e95..2d572c5b106dd 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -18,8 +18,8 @@ import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioner; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java index 3c94985a44231..24c31c69cd40f 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Repartition.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.annotation.operator.Basic; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.flow.Flow; import java.util.Objects; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareElementWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareElementWiseOperator.java index e29a0167a9788..d3c4c1505c668 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareElementWiseOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareElementWiseOperator.java @@ -17,7 +17,7 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java index 74cf446ece3a9..891b20b5ee7e1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseOperator.java @@ -15,8 +15,8 @@ */ package cz.seznam.euphoria.core.client.operator; -import cz.seznam.euphoria.core.client.dataset.Partitioner; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java index cac790fb335ff..1785a70f5845a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/StateAwareWindowWiseSingleInputOperator.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java index 8d23e2cecbdae..698a96370b7f4 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/SumByKey.java @@ -17,9 +17,8 @@ import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; -import cz.seznam.euphoria.core.client.dataset.windowing.Batch; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index 15e80c7ba992b..c0bcbec4b6e58 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -21,7 +21,7 @@ import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.flow.Flow; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/CountByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/CountByKeyTest.java index 3b475601d74d6..404737a1fcd9a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/CountByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/CountByKeyTest.java @@ -16,8 +16,8 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.util.Pair; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/DistinctTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/DistinctTest.java index 657cae7fc6455..deb9f4ec5a408 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/DistinctTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/DistinctTest.java @@ -16,8 +16,8 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import org.junit.Test; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index 57302902706af..22185ded6e692 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -16,8 +16,8 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Context; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java index 7671cff03c99c..8f15a732bbc3c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceByKeyTest.java @@ -16,11 +16,10 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.client.util.Sums; import org.junit.Test; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java index d0a173c395929..5e835b6cf992d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java @@ -16,8 +16,8 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.Context; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/RepartitionTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/RepartitionTest.java index 9470944959ec6..7e40c314baf12 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/RepartitionTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/RepartitionTest.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.core.client.operator; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; import cz.seznam.euphoria.core.client.flow.Flow; import org.junit.Test; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SumByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SumByKeyTest.java index 96bf2eefef542..1da3818934aaa 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SumByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/SumByKeyTest.java @@ -15,10 +15,9 @@ */ package cz.seznam.euphoria.core.client.operator; -import cz.seznam.euphoria.core.client.dataset.windowing.Batch; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.util.Pair; diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java index b3608d1d53cd1..f352fe87f7a9a 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/TopPerKeyKeyTest.java @@ -17,8 +17,8 @@ import com.google.common.collect.Iterables; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.HashPartitioner; -import cz.seznam.euphoria.core.client.dataset.HashPartitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.util.Triple; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java index b400c5c907bd8..9e1c4922d6648 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/RepartitionTranslator.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.flink.batch; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.operator.Repartition; import cz.seznam.euphoria.flink.FlinkOperator; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/PartitionerWrapper.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/PartitionerWrapper.java index b55f42ed89fad..55a601e271643 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/PartitionerWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/functions/PartitionerWrapper.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.flink.functions; -import cz.seznam.euphoria.core.client.dataset.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; public class PartitionerWrapper implements org.apache.flink.api.common.functions.Partitioner diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java index dd48ee83ea603..c96f0ed5ce924 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/RepartitionTranslator.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.flink.streaming; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.operator.Repartition; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; diff --git a/sdks/java/extensions/euphoria/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java b/sdks/java/extensions/euphoria/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java index 23bd20d5f6c58..9402ac8f784e1 100644 --- a/sdks/java/extensions/euphoria/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java +++ b/sdks/java/extensions/euphoria/euphoria-fluent/src/main/java/cz/seznam/euphoria/fluent/Dataset.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.fluent; -import cz.seznam.euphoria.core.client.dataset.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.io.DataSink; diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java index 1755982ae0092..b4290323f9e81 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.inmem; import com.google.common.collect.Iterables; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Batch; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/WindowingTest.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/WindowingTest.java index a3aee2018c120..262186df0d928 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/WindowingTest.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/WindowingTest.java @@ -17,7 +17,7 @@ import com.google.common.collect.Sets; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.dataset.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; import cz.seznam.euphoria.core.client.dataset.windowing.Count; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/PartitioningWrapper.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/PartitioningWrapper.java index 8f7f234ae58c9..4acca26c57813 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/PartitioningWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/PartitioningWrapper.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import org.apache.spark.Partitioner; /** @@ -23,7 +23,7 @@ */ class PartitioningWrapper extends Partitioner { - private final cz.seznam.euphoria.core.client.dataset.Partitioner partitioner; + private final cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner partitioner; private final int numPartitions; public PartitioningWrapper(Partitioning partitioning) { diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java index 4bdb422ce6215..0924d3dd76619 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceByKeyTranslator.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.spark; import com.google.common.base.Preconditions; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/RepartitionTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/RepartitionTranslator.java index fdbb16fd4784e..c348ae3b97708 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/RepartitionTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/RepartitionTranslator.java @@ -15,8 +15,8 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.core.client.dataset.Partitioner; -import cz.seznam.euphoria.core.client.dataset.Partitioning; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioner; +import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.operator.Repartition; import org.apache.spark.api.java.JavaPairRDD;