From 9c389672887d3f03310b4115e8a627b90b4e6bfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1ra=20Van=C4=9Bk?= Date: Tue, 23 May 2017 18:08:11 +0200 Subject: [PATCH] #31 [euphoria-core] Implementation of accumulator API + integration with Flink executor --- .../common/trends/EuphoriaTrends.java | 6 +- .../euphoria/common/trends/Util.java | 6 +- .../core/client/accumulators/Accumulator.java | 26 +++ .../accumulators/AccumulatorProvider.java | 62 +++++++ .../core/client/accumulators/Counter.java | 33 ++++ .../core/client/accumulators/Histogram.java | 35 ++++ .../core/client/accumulators/Timer.java | 41 ++++ .../accumulators/VoidAccumulatorProvider.java | 125 +++++++++++++ .../client/dataset/windowing/TimeSliding.java | 4 +- .../core/client/functional/BinaryFunctor.java | 4 +- .../client/functional/UnaryFunctionEnv.java | 41 ++++ .../core/client/functional/UnaryFunctor.java | 11 +- .../core/client/io/AbstractCollector.java | 48 +++++ .../euphoria/core/client/io/Collector.java | 32 ++++ .../euphoria/core/client/io/Context.java | 23 +-- .../euphoria/core/client/io/Environment.java | 71 +++++++ .../core/client/operator/FlatMap.java | 3 +- .../euphoria/core/client/operator/Join.java | 14 +- .../core/client/operator/MapElements.java | 76 +++++++- .../core/client/operator/ReduceByKey.java | 16 +- .../core/client/operator/ReduceWindow.java | 6 +- .../euphoria/core/client/operator/Sort.java | 4 +- .../core/client/operator/TopPerKey.java | 6 +- .../core/client/operator/state/State.java | 6 +- .../client/operator/state/StateFactory.java | 4 +- .../euphoria/core/executor/Executor.java | 11 ++ .../core/executor/greduce/GroupReducer.java | 37 +++- .../executor/util/SingleValueContext.java | 29 ++- .../core/client/operator/FlatMapTest.java | 33 +++- .../core/client/operator/JoinTest.java | 46 ++++- .../core/client/operator/MapElementsTest.java | 31 +++- .../client/operator/ReduceStateByKeyTest.java | 6 +- .../core/executor/FlowUnfolderTest.java | 6 +- 
.../examples/wordcount/AccessLogCount.java | 4 +- .../examples/wordcount/SimpleWordCount.java | 4 +- .../euphoria/flink/ExecutorContext.java | 20 +- .../seznam/euphoria/flink/FlinkExecutor.java | 38 +++- .../euphoria/flink/TestFlinkExecutor.java | 9 +- .../accumulators/FlinkAccumulatorFactory.java | 59 ++++++ .../accumulators/FlinkNativeAccumulators.java | 175 ++++++++++++++++++ .../flink/batch/BatchExecutorContext.java | 9 +- .../flink/batch/BatchFlowTranslator.java | 21 ++- .../flink/batch/BatchUnaryFunctorWrapper.java | 35 ++-- .../flink/batch/FlatMapTranslator.java | 8 +- .../batch/ReduceStateByKeyTranslator.java | 25 ++- .../flink/streaming/FlatMapTranslator.java | 10 +- .../streaming/ReduceStateByKeyTranslator.java | 11 +- .../streaming/StreamingExecutorContext.java | 6 +- .../streaming/StreamingFlowTranslator.java | 10 +- .../StreamingUnaryFunctorWrapper.java | 35 ++-- .../windowing/AbstractWindowOperator.java | 25 ++- ...yedMultiWindowedElementWindowOperator.java | 35 ++-- .../StreamingElementWindowOperator.java | 9 +- .../flink/streaming/RSBKWindowingTest.java | 12 +- .../flink/testkit/FlinkExecutorProvider.java | 4 + .../cz/seznam/euphoria/fluent/FluentTest.java | 4 +- .../euphoria/hadoop/ExerciseHadoopIO.java | 4 +- .../seznam/euphoria/inmem/InMemExecutor.java | 6 + .../inmem/WindowedElementCollector.java | 9 +- .../euphoria/inmem/BasicOperatorTest.java | 10 +- .../euphoria/inmem/InMemExecutorTest.java | 10 +- .../euphoria/operator/test/FlatMapTest.java | 4 +- .../euphoria/operator/test/JoinTest.java | 10 +- .../operator/test/MapElementsTest.java | 3 +- .../operator/test/ReduceByKeyTest.java | 8 +- .../operator/test/ReduceStateByKeyTest.java | 14 +- .../euphoria/operator/test/WatermarkTest.java | 4 +- .../euphoria/operator/test/WindowingTest.java | 12 +- ...ionContext.java => FunctionCollector.java} | 4 +- ...Async.java => FunctionCollectorAsync.java} | 6 +- ...textMem.java => FunctionCollectorMem.java} | 6 +- .../spark/ReduceStateByKeyTranslator.java | 
5 +- .../seznam/euphoria/spark/SparkExecutor.java | 6 + .../euphoria/spark/UnaryFunctorWrapper.java | 4 +- 74 files changed, 1352 insertions(+), 243 deletions(-) create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Accumulator.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/AccumulatorProvider.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Counter.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Histogram.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Timer.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/VoidAccumulatorProvider.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctionEnv.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/AbstractCollector.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Collector.java create mode 100644 sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Environment.java create mode 100644 sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkAccumulatorFactory.java create mode 100644 sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkNativeAccumulators.java rename sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/{FunctionContext.java => FunctionCollector.java} (88%) rename 
sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/{FunctionContextAsync.java => FunctionCollectorAsync.java} (94%) rename sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/{FunctionContextMem.java => FunctionCollectorMem.java} (87%) diff --git a/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/EuphoriaTrends.java b/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/EuphoriaTrends.java index b30cdd9c9ac6d..335df5c4f2ca6 100644 --- a/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/EuphoriaTrends.java +++ b/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/EuphoriaTrends.java @@ -23,7 +23,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.TimeSliding; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.StdoutSink; import cz.seznam.euphoria.core.client.io.VoidSink; import cz.seznam.euphoria.core.client.operator.FlatMap; @@ -148,7 +148,7 @@ public void execute() throws Exception { .by(Pair::getFirst, Pair::getFirst) .using((Pair left, Pair right, - Context context) -> { + Collector context) -> { double score = rank( longIntervalMillis, left.getSecond(), shortItervalMillis, right.getSecond(), @@ -170,7 +170,7 @@ public void execute() throws Exception { .output(); FlatMap.of(output) - .using((Triple, Double> e, Context c) -> { + .using((Triple, Double> e, Collector c) -> { Date now = new Date(); Date stamp = new Date(TimeSliding.getLabel(c).getEndMillis()); 
c.collect(now + ": " + stamp + ", " + e.getSecond().getFirst() + ", " + e.getSecond().getSecond()); diff --git a/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/Util.java b/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/Util.java index 1708c824a55c3..7d0af3b7c36fe 100644 --- a/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/Util.java +++ b/sdks/java/extensions/euphoria/benchmarks/euphoria/euphoria-common/src/main/java/cz/seznam/euphoria/benchmarks/euphoria/common/trends/Util.java @@ -20,7 +20,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.io.ListDataSource; import cz.seznam.euphoria.core.client.operator.FlatMap; @@ -46,7 +46,7 @@ static Dataset> getInput(boolean test, URI uri, Flow flow) .using(new UnaryFunctor, Pair>() { private final SearchEventsParser parser = new SearchEventsParser(); @Override - public void apply(Pair pair, Context> context) { + public void apply(Pair pair, Collector> context) { try { SearchEventsParser.Query q = parser.parse(pair.getSecond()); if (q != null && q.query != null && !q.query.isEmpty()) { @@ -69,7 +69,7 @@ public void apply(Pair pair, Context> context .using(new UnaryFunctor>() { SearchEventsParser parser = new SearchEventsParser(); @Override - public void apply(String line, Context> context) { + public void apply(String line, Collector> context) { try { SearchEventsParser.Query q = parser.parse(line); if (q != null && q.query != null && !q.query.isEmpty()) { diff 
--git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Accumulator.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Accumulator.java new file mode 100644 index 0000000000000..1822cb33feb35 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Accumulator.java @@ -0,0 +1,26 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +/** + * Accumulators collect values from user functions. + * Accumulators allow user to calculate statistics during the flow execution. + *

+ * Accumulators are inspired by the Hadoop/MapReduce counters. + */ +public interface Accumulator { + +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/AccumulatorProvider.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/AccumulatorProvider.java new file mode 100644 index 0000000000000..76ee463adbfcf --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/AccumulatorProvider.java @@ -0,0 +1,62 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +import cz.seznam.euphoria.core.util.Settings; + +import java.io.Serializable; + +/** + * Provides access to an accumulator backend service. It is intended to be + * implemented by third party to support different type of services. + */ +public interface AccumulatorProvider { + + /** + * Get an existing instance of a counter or create a new one. + * + * @param name Unique name of the counter. + * @return Instance of a counter. + */ + Counter getCounter(String name); + + /** + * Get an existing instance of a histogram or create a new one. + * + * @param name Unique name of the histogram. + * @return Instance of a histogram. 
+ */ + Histogram getHistogram(String name); + + /** + * Get an existing instance of a timer or create a new one. + * + * @param name Unique name of the timer. + * @return Instance of a timer. + */ + Timer getTimer(String name); + + /** + * Creates a new instance of {@link AccumulatorProvider} + * initialized by given settings. + *

+ * It is required this factory is thread-safe. + */ + @FunctionalInterface + interface Factory extends Serializable { + AccumulatorProvider create(Settings settings); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Counter.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Counter.java new file mode 100644 index 0000000000000..23e2eddf3c659 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Counter.java @@ -0,0 +1,33 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +/** + * Counter is a type of accumulator making a sum from integral numbers. + */ +public interface Counter extends Accumulator { + + /** + * Increment counter by given value. + * @param value Value to be added to the counter. + */ + void increment(long value); + + /** + * Increment counter by one. 
+ */ + void increment(); +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Histogram.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Histogram.java new file mode 100644 index 0000000000000..7f07ecc758fea --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Histogram.java @@ -0,0 +1,35 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +/** + * Histogram is a type of accumulator recording a distribution of different values. + */ +public interface Histogram extends Accumulator { + + /** + * Add specified value. + * @param value Value to be added. + */ + void add(long value); + + /** + * Add specified value multiple times. + * @param value Value to be added. + * @param times Number of occurrences to add. 
+ */ + void add(long value, long times); +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Timer.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Timer.java new file mode 100644 index 0000000000000..9fa320aae3623 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/Timer.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * Timer provides convenience API very similar to {@link Histogram} + * but extended by time unit support. + */ +public interface Timer extends Accumulator { + + /** + * Add specific duration. + * @param duration Duration to be added. + */ + void add(Duration duration); + + /** + * Add specific duration with given time unit + * @param duration Duration to be added. + * @param unit Time unit. 
+ */ + default void add(long duration, TimeUnit unit) { + add(Duration.ofNanos(unit.toNanos(duration))); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/VoidAccumulatorProvider.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/VoidAccumulatorProvider.java new file mode 100644 index 0000000000000..269b9336a8416 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/accumulators/VoidAccumulatorProvider.java @@ -0,0 +1,125 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.accumulators; + +import cz.seznam.euphoria.core.util.Settings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +/** + * Placeholder implementation of {@link AccumulatorProvider} that + * may be used in executors as a default. 
+ */ +public class VoidAccumulatorProvider implements AccumulatorProvider { + + private static final Logger LOG = LoggerFactory.getLogger(VoidAccumulatorProvider.class); + + private final Map accumulators = new HashMap<>(); + + private VoidAccumulatorProvider() {} + + @Override + public Counter getCounter(String name) { + return getAccumulator(name, VoidCounter.class); + } + + @Override + public Histogram getHistogram(String name) { + return getAccumulator(name, VoidHistogram.class); + } + + @Override + public Timer getTimer(String name) { + return getAccumulator(name, VoidTimer.class); + } + + private synchronized ACC getAccumulator(String name, Class clz) { + try { + ACC acc = clz.getConstructor().newInstance(); + if (accumulators.putIfAbsent(name, acc) == null) { + LOG.warn("Using accumulators with VoidAccumulatorProvider will have no effect"); + } + return acc; + } catch (Exception e) { + throw new RuntimeException("Exception during accumulator initialization: " + clz, e); + } + } + + public static Factory getFactory() { + return Factory.get(); + } + + // ------------------------ + + public static class Factory implements AccumulatorProvider.Factory { + + private static final Factory INSTANCE = new Factory(); + + private static final AccumulatorProvider PROVIDER = + new VoidAccumulatorProvider(); + + private Factory() {} + + @Override + public AccumulatorProvider create(Settings settings) { + return PROVIDER; + } + + public static Factory get() { + return INSTANCE; + } + } + + // ------------------------ + + public static class VoidCounter implements Counter { + + @Override + public void increment(long value) { + // NOOP + } + + @Override + public void increment() { + // NOOP + } + } + + public static class VoidHistogram implements Histogram { + + @Override + public void add(long value) { + // NOOP + } + + @Override + public void add(long value, long times) { + // NOOP + } + } + + public static class VoidTimer implements Timer { + + @Override + public void add(Duration 
duration) { + // NOOP + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeSliding.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeSliding.java index 97adc4ff645f7..623bc2886be09 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeSliding.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/dataset/windowing/TimeSliding.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.core.client.dataset.windowing; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.triggers.TimeTrigger; import cz.seznam.euphoria.core.client.triggers.Trigger; import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; @@ -44,7 +44,7 @@ public static TimeSliding of(Duration duration, Duration step) { * @throws ClassCastException if the context is not part of a * time-sliding execution */ - public static TimeInterval getLabel(Context context) { + public static TimeInterval getLabel(Collector context) { return (TimeInterval) context.getWindow(); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/BinaryFunctor.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/BinaryFunctor.java index d8248da943b1f..332ea77b886c7 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/BinaryFunctor.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/BinaryFunctor.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.core.client.functional; -import cz.seznam.euphoria.core.client.io.Context; +import 
cz.seznam.euphoria.core.client.io.Collector; import java.io.Serializable; /** @@ -24,6 +24,6 @@ @FunctionalInterface public interface BinaryFunctor extends Serializable { - void apply(LEFT left, RIGHT right, Context context); + void apply(LEFT left, RIGHT right, Collector context); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctionEnv.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctionEnv.java new file mode 100644 index 0000000000000..95e9a0dc2a29a --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctionEnv.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.functional; + +import cz.seznam.euphoria.core.client.io.Context; + +import java.io.Serializable; + +/** + * Function of single argument with access to Euphoria environment via context. + * + * @param the type of the element processed + * @param the type of the result applying element to the function + */ +@FunctionalInterface +public interface UnaryFunctionEnv extends Serializable { + + /** + * Applies function to given element. + * + * @param what The element applied to the function + * @param context Provides access to the environment. 
+ * + * @return the result of the function application + */ + OUT apply(IN what, Context context); + +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctor.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctor.java index 6ddc0616876be..f0631ed563f83 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctor.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/functional/UnaryFunctor.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.core.client.functional; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import java.io.Serializable; /** @@ -26,6 +26,11 @@ @FunctionalInterface public interface UnaryFunctor extends Serializable { - void apply(IN elem, Context context); - + /** + * Applies function to given element. + * + * @param elem Input element. + * @param collector Collector to emit results. + */ + void apply(IN elem, Collector collector); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/AbstractCollector.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/AbstractCollector.java new file mode 100644 index 0000000000000..9094e5859fa1f --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/AbstractCollector.java @@ -0,0 +1,48 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.io; + +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.client.accumulators.Counter; +import cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; + +/** + * Abstract implementation of a collector supporting access to accumulators. + */ +public abstract class AbstractCollector implements Collector { + + private final AccumulatorProvider accumulators; + + public AbstractCollector(AccumulatorProvider accumulators) { + this.accumulators = accumulators; + } + + @Override + public Counter getCounter(String name) { + return accumulators.getCounter(name); + } + + @Override + public Histogram getHistogram(String name) { + return accumulators.getHistogram(name); + } + + @Override + public Timer getTimer(String name) { + return accumulators.getTimer(name); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Collector.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Collector.java new file mode 100644 index 0000000000000..33faf3d23040b --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Collector.java @@ -0,0 +1,32 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.io; + +/** + * Extends {@link Environment} with write capability. Used in + * user defined functors. + * + * @param the type of elements collected through this context + */ +public interface Collector extends Environment { + + /** + * Collects the given element to the output of this context. + * + * @param elem the element to collect + */ + void collect(T elem); +} \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Context.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Context.java index fff8d249bd444..fa45774e53b6c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Context.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Context.java @@ -16,27 +16,8 @@ package cz.seznam.euphoria.core.client.io; /** - * A collector of elements. Used in functors. - * - * @param the type of elements collected through this context + * Used in user defined functions to access environment methods. */ -public interface Context { - - /** - * Collects the given element to the output of this context. - * - * @param elem the element to collect - */ - void collect(T elem); - - /** - * Retrieves the window - if any - underlying the current - * execution of this context. 
- * - * @return {@code null} if this context is not executed within a - * windowing strategy, otherwise the current window of - * this context - */ - Object getWindow(); +public interface Context extends Environment { } \ No newline at end of file diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Environment.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Environment.java new file mode 100644 index 0000000000000..e939a3c2a3786 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/io/Environment.java @@ -0,0 +1,71 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.core.client.io; + +import cz.seznam.euphoria.core.client.accumulators.Counter; +import cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; + +/** + * Defines basic methods available in user defined functions. + */ +public interface Environment { + + /** + * Retrieves the window - if any - underlying the current + * execution of this context. 
+ * + * @return {@code null} if this context is not executed within a + * windowing strategy, otherwise the current window of + * this context + */ + Object getWindow(); + + // ---------------- Accumulator related methods ------------ + + // FIXME remove default implementation + // it's just temporary to make the whole project compilable + + /** + * Get an existing instance of a counter or create a new one. + * + * @param name Unique name of the counter. + * @return Instance of a counter. + */ + default Counter getCounter(String name) { + throw new IllegalStateException("Accumulators not supported yet."); + } + + /** + * Get an existing instance of a histogram or create a new one. + * + * @param name Unique name of the histogram. + * @return Instance of a histogram. + */ + default Histogram getHistogram(String name) { + throw new IllegalStateException("Accumulators not supported yet."); + } + + /** + * Get an existing instance of a timer or create a new one. + * + * @param name Unique name of the timer. + * @return Instance of a timer. 
+ */ + default Timer getTimer(String name) { + throw new IllegalStateException("Accumulators not supported yet."); + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java index 993f443b8c60d..43956afe55174 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/FlatMap.java @@ -20,6 +20,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; +import cz.seznam.euphoria.core.client.io.Collector; import javax.annotation.Nullable; import java.util.Objects; @@ -52,7 +53,7 @@ * * The above example tries to parse incoming strings as integers, silently * skipping those which cannot be successfully converted. While - * {@link cz.seznam.euphoria.core.client.io.Context#collect(Object)} has + * {@link Collector#collect(Object)} has * been used only once here, a {@link FlatMap} operator is free * to invoke it multiple times or not at all to generate that many elements * to the output dataset. 
diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java index 84b23493865cc..cf11b9dee3303 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Join.java @@ -26,7 +26,7 @@ import cz.seznam.euphoria.core.client.functional.BinaryFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -301,7 +301,7 @@ public void close() { } void flushUnjoinedElems( - Context context, Iterable lefts, Iterable rights) { + Collector context, Iterable lefts, Iterable rights) { boolean leftEmpty = !lefts.iterator().hasNext(); boolean rightEmpty = !rights.iterator().hasNext(); // if just a one collection is empty @@ -344,7 +344,7 @@ public void add(Either elem) { } @Override - public void flush(Context context) { + public void flush(Collector context) { Iterable lefts = leftElements.get(); Iterable rights = rightElements.get(); for (LEFT l : lefts) { @@ -390,10 +390,10 @@ private class EarlyEmittingJoinState extends AbstractJoinState implements State, OUT>, StateSupport.MergeFrom { - private final Context context; + private final Collector context; @SuppressWarnings("unchecked") - public EarlyEmittingJoinState(StorageProvider storageProvider, Context context) { + public EarlyEmittingJoinState(StorageProvider storageProvider, Collector context) { super(storageProvider); this.context 
= Objects.requireNonNull(context); } @@ -424,7 +424,7 @@ private void emitJoinedElements(Either elem, ListStorage others) { } @Override - public void flush(Context context) { + public void flush(Collector context) { // ~ no-op; we do all the work already on the fly // and flush any "pending" state _only_ when closing // this state @@ -500,7 +500,7 @@ public BinaryFunctor getJoiner() { keyExtractor, e -> e, getWindowing(), - (StorageProvider storages, Context ctx) -> + (StorageProvider storages, Collector ctx) -> ctx == null ? new StableJoinState(storages) : new EarlyEmittingJoinState(storages, ctx), diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java index 9f22be58876c7..a89b37756d6e4 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/MapElements.java @@ -17,10 +17,16 @@ import cz.seznam.euphoria.core.annotation.operator.Derived; import cz.seznam.euphoria.core.annotation.operator.StateComplexity; +import cz.seznam.euphoria.core.client.accumulators.Counter; +import cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; +import cz.seznam.euphoria.core.client.functional.UnaryFunctionEnv; import cz.seznam.euphoria.core.client.graph.DAG; +import cz.seznam.euphoria.core.client.io.Collector; +import cz.seznam.euphoria.core.client.io.Context; import java.util.Objects; @@ -59,13 +65,26 @@ public static class UsingBuilder { /** * The mapping function that takes input element and 
outputs the OUT type element. - * + * If you want to use accumulators, use {@link #using(UnaryFunctionEnv)} instead. + * * @param type of output elements * @param mapper the mapping function * @return the next builder to complete the setup of the * {@link MapElements} operator */ public OutputBuilder using(UnaryFunction mapper) { + return new OutputBuilder<>(name, input, ((el, ctx) -> mapper.apply(el))); + } + + /** + * The mapping function that takes input element and outputs the OUT type element. + * + * @param type of output elements + * @param mapper the mapping function + * @return the next builder to complete the setup of the + * {@link MapElements} operator + */ + public OutputBuilder using(UnaryFunctionEnv mapper) { return new OutputBuilder<>(name, input, mapper); } } @@ -73,9 +92,9 @@ public OutputBuilder using(UnaryFunction mapper) { public static class OutputBuilder implements Builders.Output { private final String name; private final Dataset input; - private final UnaryFunction mapper; + private final UnaryFunctionEnv mapper; - OutputBuilder(String name, Dataset input, UnaryFunction mapper) { + OutputBuilder(String name, Dataset input, UnaryFunctionEnv mapper) { this.name = name; this.input = input; this.mapper = mapper; @@ -119,9 +138,19 @@ public static OfBuilder named(String name) { return new OfBuilder(name); } - final UnaryFunction mapper; + final UnaryFunctionEnv mapper; + + MapElements(String name, + Flow flow, + Dataset input, + UnaryFunction mapper) { + this(name, flow, input, (el, ctx) -> mapper.apply(el)); + } - MapElements(String name, Flow flow, Dataset input, UnaryFunction mapper) { + MapElements(String name, + Flow flow, + Dataset input, + UnaryFunctionEnv mapper) { super(name, flow, input); this.mapper = mapper; } @@ -136,6 +165,41 @@ public static OfBuilder named(String name) { return DAG.of( // do not use the client API here, because it modifies the Flow! 
new FlatMap(getName(), getFlow(), input, - (i, c) -> c.collect(mapper.apply(i)), null)); + (i, c) -> c.collect(mapper.apply(i, new CollectorAdapter(c))), null)); + } + + public UnaryFunctionEnv getMapper() { + return mapper; + } + + /** + * Adapts Collector to be used as Context in UnaryFunctionEnv. + */ + private static class CollectorAdapter implements Context { + private final Collector collector; + + public CollectorAdapter(Collector collector) { + this.collector = collector; + } + + @Override + public Object getWindow() { + return collector.getWindow(); + } + + @Override + public Counter getCounter(String name) { + return collector.getCounter(name); + } + + @Override + public Histogram getHistogram(String name) { + return collector.getHistogram(name); + } + + @Override + public Timer getTimer(String name) { + return collector.getTimer(name); + } } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index d60e20a9d7b29..b1efa64138f06 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -27,7 +27,7 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -116,7 +116,7 @@ interface ReduceBy { */ default DatasetBuilder4 reduceBy( ReduceFunction reducer) 
{ - return reduceBy((Iterable in, Context ctx) -> { + return reduceBy((Iterable in, Collector ctx) -> { ctx.collect(reducer.apply(in)); }); } @@ -344,7 +344,7 @@ public static OfBuilder named(String name) { @Nullable Windowing windowing, ReduceFunctor reducer, Partitioning partitioning) { - + super(name, flow, input, keyExtractor, windowing, partitioning); this.reducer = reducer; this.valueExtractor = valueExtractor; @@ -389,7 +389,7 @@ public boolean isCombinable() { } @Override - public void apply(Iterable elem, Context context) { + public void apply(Iterable elem, Collector context) { context.collect(reducer1.apply(elem)); } }; @@ -408,7 +408,7 @@ static final class Factory implements StateFactory> { @Override public State createState( - StorageProvider storageProvider, Context context) { + StorageProvider storageProvider, Collector context) { return new CombiningReduceState<>(storageProvider, r); } } @@ -442,7 +442,7 @@ public void add(E element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { context.collect(storage.get()); } @@ -470,7 +470,7 @@ static final class Factory @Override public NonCombiningReduceState - createState(StorageProvider storageProvider, Context context) { + createState(StorageProvider storageProvider, Collector context) { return new NonCombiningReduceState<>(storageProvider, r); } } @@ -497,7 +497,7 @@ public void add(IN element) { } @Override - public void flush(Context ctx) { + public void flush(Collector ctx) { reducer.apply(reducibleValues.get(), ctx); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java index fcfeab4605aa8..249a07c92dfc1 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java +++ 
b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceWindow.java @@ -28,7 +28,7 @@ import cz.seznam.euphoria.core.client.functional.ReduceFunctor; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.util.Pair; import javax.annotation.Nullable; @@ -101,7 +101,7 @@ public ReduceBuilder( } public OutputBuilder reduceBy( ReduceFunction reducer) { - return reduceBy((Iterable in, Context ctx) -> { + return reduceBy((Iterable in, Collector ctx) -> { ctx.collect(reducer.apply(in)); }); } @@ -135,7 +135,7 @@ public OutputBuilder( ReduceFunction reducer) { this( name, input, valueExtractor, - (Iterable in, Context ctx) -> { + (Iterable in, Collector ctx) -> { ctx.collect(reducer.apply(in)); }); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java index f55a670d0fd51..eecc5f31eac5e 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/Sort.java @@ -27,7 +27,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; @@ -115,7 +115,7 @@ public void add(V element) { } @Override 
- public void flush(Context ctx) { + public void flush(Collector ctx) { List toSort = Lists.newArrayList(curr.get()); Collections.sort(toSort, cmp); toSort.forEach(ctx::collect); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index f59d13fd65ff8..db87cf15dc231 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -24,7 +24,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; @@ -88,7 +88,7 @@ public void add(Pair element) { } @Override - public void flush(Context> context) { + public void flush(Collector> context) { Pair c = curr.get(); if (c.getFirst() != null) { context.collect(c); @@ -327,7 +327,7 @@ public UnaryFunction getScoreExtractor() { keyExtractor, e -> Pair.of(valueFn.apply(e), scoreFn.apply(e)), windowing, - (StorageProvider storageProvider, Context> ctx) -> new MaxScored<>(storageProvider), + (StorageProvider storageProvider, Collector> ctx) -> new MaxScored<>(storageProvider), stateCombiner, partitioning); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/State.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/State.java index 
9849e4e463286..0b6f86afe056f 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/State.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/State.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.core.client.operator.state; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; /** * A state for stateful operations. @@ -36,10 +36,10 @@ public interface State { * @param context the context to utilize for emitting output elements; * never {@code null} */ - void flush(Context context); + void flush(Collector context); /** - * Closes this state. Invoked after {@link #flush(Context)} and before + * Closes this state. Invoked after {@link #flush(Collector)} and before * this state gets disposed to allow clean-up of temporary state storage. */ void close(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java index 070f37b923562..9cf59f138a610 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.core.client.operator.state; import cz.seznam.euphoria.core.annotation.stability.Experimental; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import javax.annotation.Nullable; import java.io.Serializable; @@ -38,6 +38,6 @@ public interface StateFactory> extends Ser */ STATE createState(StorageProvider storageProvider, @Experimental("https://github.com/seznam/euphoria/issues/118") - @Nullable 
Context context); + @Nullable Collector context); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/Executor.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/Executor.java index 9e51112c710de..9836094d4fcfa 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/Executor.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/Executor.java @@ -15,6 +15,8 @@ */ package cz.seznam.euphoria.core.executor; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.Operator; @@ -75,4 +77,13 @@ class Result {} return (Set) Sets.newHashSet( FlatMap.class, Repartition.class, ReduceStateByKey.class, Union.class); } + + /** + * Set accumulator provider that will be used to collect metrics and counters. + * When no provider is set a default instance of {@link VoidAccumulatorProvider} + * will be used. + * + * @param factory Factory to create an instance of accumulator provider. 
+ */ + void setAccumulatorProvider(AccumulatorProvider.Factory factory); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java index ccd659fc90494..a3d0ca54f08fe 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java @@ -15,13 +15,16 @@ */ package cz.seznam.euphoria.core.executor.greduce; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.client.accumulators.Counter; +import cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.BinaryFunction; -import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; @@ -85,6 +88,7 @@ public interface WindowedElementFactory { private final Collector>> collector; private final Windowing windowing; private final Trigger trigger; + private final AccumulatorProvider accumulators; // ~ temporary store for trigger states final TriggerStorage triggerStorage; @@ -99,14 +103,16 @@ public GroupReducer(StateFactory> stateFactory, Windowing 
windowing, Trigger trigger, Collector>> collector, + AccumulatorProvider accumulators, boolean allowEarlyEmitting) { this.stateFactory = Objects.requireNonNull(stateFactory); this.elementFactory = Objects.requireNonNull(elementFactory); this.stateCombiner = Objects.requireNonNull(stateCombiner); this.stateStorageProvider = Objects.requireNonNull(stateStorageProvider); + this.collector = Objects.requireNonNull(collector); this.windowing = Objects.requireNonNull(windowing); this.trigger = Objects.requireNonNull(trigger); - this.collector = Objects.requireNonNull(collector); + this.accumulators = Objects.requireNonNull(accumulators); this.allowEarlyEmitting = allowEarlyEmitting; this.triggerStorage = new TriggerStorage(stateStorageProvider); @@ -146,8 +152,8 @@ public void process(WindowedElement> elem) { @SuppressWarnings("unchecked") private State getStateForUpdate(WID window) { return states.computeIfAbsent(window, w -> { - ElementCollectContext col = allowEarlyEmitting - ? new ElementCollectContext(collector, w) + ElementCollector col = allowEarlyEmitting + ? 
new ElementCollector(collector, w) : null; return stateFactory.createState(stateStorageProvider, col); }); @@ -263,7 +269,7 @@ private void processTriggerResult( // ~ close the window State state = states.remove(window); if (state != null) { - state.flush(new ElementCollectContext(collector, window)); + state.flush(new ElementCollector(collector, window)); state.close(); } // ~ clean up trigger states @@ -271,11 +277,11 @@ private void processTriggerResult( } } - class ElementCollectContext implements Context { + class ElementCollector implements cz.seznam.euphoria.core.client.io.Collector { final Collector>> out; WID window; - ElementCollectContext(Collector>> out, WID window) { + ElementCollector(Collector>> out, WID window) { this.out = out; this.window = window; } @@ -293,6 +299,23 @@ public void collect(T elem) { public Object getWindow() { return window; } + + @Override + public Counter getCounter(String name) { + return accumulators.getCounter(name); + } + + @Override + public Histogram getHistogram(String name) { + return accumulators.getHistogram(name); + } + + @Override + public Timer getTimer(String name) { + return accumulators.getTimer(name); + } + + } class ElementTriggerContext implements TriggerContext { diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/SingleValueContext.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/SingleValueContext.java index 4798e4c1823df..1212c9aeb0149 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/SingleValueContext.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/util/SingleValueContext.java @@ -16,7 +16,10 @@ package cz.seznam.euphoria.core.executor.util; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.accumulators.Counter; +import 
cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; +import cz.seznam.euphoria.core.client.io.Collector; /** * A {@code Context} that holds only single value. @@ -25,7 +28,7 @@ * This context will free the value as soon as {@code getAndResetValue()} * is called. */ -public class SingleValueContext implements Context { +public class SingleValueContext implements Collector { T value; @@ -47,6 +50,26 @@ public Object getWindow() throws UnsupportedOperationException { "The window is unknown in this context"); } + @Override + public Counter getCounter(String name) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); + } + + @Override + public Histogram getHistogram(String name) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); + + } + + @Override + public Timer getTimer(String name) { + throw new UnsupportedOperationException( + "Accumulators not supported in this context"); + + } + /** * Retrieve and reset the stored value to null. 
* @return the stored value @@ -57,5 +80,5 @@ public T getAndResetValue() { return ret; } -}; +} diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java index 97ce1af64a2de..4dcde8783679d 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/FlatMapTest.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import org.junit.Test; import java.math.BigDecimal; @@ -35,7 +35,7 @@ public void testBuild() { Dataset mapped = FlatMap.named("FlatMap1") .of(dataset) - .using((String s, Context c) -> c.collect(s)) + .using((String s, Collector c) -> c.collect(s)) .output(); assertEquals(flow, mapped.getFlow()); @@ -56,7 +56,7 @@ public void testBuild_EventTimeExtractor() { Dataset mapped = FlatMap.named("FlatMap2") .of(dataset) - .using((String s, Context c) -> c.collect(null)) + .using((String s, Collector c) -> c.collect(null)) .eventTimeBy(Long::parseLong) // ~ consuming the original input elements .output(); @@ -71,13 +71,36 @@ public void testBuild_EventTimeExtractor() { assertNotNull(map.getEventTimeExtractor()); } + @Test + public void testBuild_WithCounters() { + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset mapped = FlatMap.named("FlatMap1") + .of(dataset) + .using((String s, Collector c) -> { + c.getCounter("my-counter").increment(); + c.collect(s); + }) + .output(); + + assertEquals(flow, mapped.getFlow()); + assertEquals(1, flow.size()); + + FlatMap map = (FlatMap) 
flow.operators().iterator().next(); + assertEquals(flow, map.getFlow()); + assertEquals("FlatMap1", map.getName()); + assertNotNull(map.getFunctor()); + assertEquals(mapped, map.output()); + } + @Test public void testBuild_ImplicitName() { Flow flow = Flow.create("TEST"); Dataset dataset = Util.createMockDataset(flow, 1); Dataset mapped = FlatMap.of(dataset) - .using((String s, Context c) -> c.collect(s)) + .using((String s, Collector c) -> c.collect(s)) .output(); FlatMap map = (FlatMap) flow.operators().iterator().next(); @@ -97,7 +120,7 @@ public void testOutputNumPartitionsIsUnchanged() { assertEquals(N_PARTITIONS, input.getNumPartitions()); Dataset output = FlatMap.of(input) - .using((Object o, Context c) -> c.collect(o)) + .using((Object o, Collector c) -> c.collect(o)) .output(); assertEquals(N_PARTITIONS, output.getNumPartitions()); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java index d71489dd09ac3..454e5f2ef2fa9 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/JoinTest.java @@ -20,7 +20,7 @@ import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.util.Pair; import org.junit.Test; @@ -40,7 +40,39 @@ public void testBuild() { .of(left, right) .by(String::length, String::length) //TODO It's sad the Collector type must be explicitly stated :-( - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, 
String r, Collector c) -> c.collect(l + r)) + .output(); + + assertEquals(flow, joined.getFlow()); + assertEquals(1, flow.size()); + + Join join = (Join) flow.operators().iterator().next(); + assertEquals(flow, join.getFlow()); + assertEquals("Join1", join.getName()); + assertNotNull(join.leftKeyExtractor); + assertNotNull(join.rightKeyExtractor); + assertEquals(joined, join.output()); + assertNull(join.getWindowing()); + assertFalse(join.outer); + + // default partitioning used + assertTrue(join.getPartitioning().hasDefaultPartitioner()); + assertEquals(3, join.getPartitioning().getNumPartitions()); + } + + @Test + public void testBuild_WithCounters() { + Flow flow = Flow.create("TEST"); + Dataset left = Util.createMockDataset(flow, 2); + Dataset right = Util.createMockDataset(flow, 3); + + Dataset> joined = Join.named("Join1") + .of(left, right) + .by(String::length, String::length) + .using((String l, String r, Collector c) -> { + c.getCounter("my-counter").increment(); + c.collect(l + r); + }) .output(); assertEquals(flow, joined.getFlow()); @@ -68,7 +100,7 @@ public void testBuild_ImplicitName() { Dataset> joined = Join.of(left, right) .by(String::length, String::length) - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, String r, Collector c) -> c.collect(l + r)) .output(); Join join = (Join) flow.operators().iterator().next(); @@ -84,7 +116,7 @@ public void testBuild_OuterJoin() { Dataset> joined = Join.named("Join1") .of(left, right) .by(String::length, String::length) - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, String r, Collector c) -> c.collect(l + r)) .outer() .output(); @@ -101,7 +133,7 @@ public void testBuild_Windowing() { Dataset> joined = Join.named("Join1") .of(left, right) .by(String::length, String::length) - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, String r, Collector c) -> c.collect(l + r)) .windowBy(Time.of(Duration.ofHours(1))) 
.output(); @@ -118,7 +150,7 @@ public void testBuild_Partitioning() { Dataset> joined = Join.named("Join1") .of(left, right) .by(String::length, String::length) - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, String r, Collector c) -> c.collect(l + r)) .setPartitioning(new HashPartitioning<>(1)) .windowBy(Time.of(Duration.ofHours(1))) .output(); @@ -139,7 +171,7 @@ public void testBuild_Partitioner() { Dataset> joined = Join.named("Join1") .of(left, right) .by(String::length, String::length) - .using((String l, String r, Context c) -> c.collect(l + r)) + .using((String l, String r, Collector c) -> c.collect(l + r)) .windowBy(Time.of(Duration.ofHours(1))) .setPartitioner(new HashPartitioner<>()) .setNumPartitions(5) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java index 7373357da75bb..b768c28a3e3e5 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/MapElementsTest.java @@ -17,9 +17,11 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; +import cz.seznam.euphoria.core.client.io.Context; import org.junit.Test; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class MapElementsTest { @@ -39,7 +41,32 @@ public void testBuild() { MapElements map = (MapElements) flow.operators().iterator().next(); assertEquals(flow, map.getFlow()); assertEquals("Map1", map.getName()); - assertNotNull(map.mapper); + assertNotNull(map.getMapper()); + assertEquals(mapped, map.output()); + } + + @Test + public void testBuild_WithCounters() 
{ + Flow flow = Flow.create("TEST"); + Dataset dataset = Util.createMockDataset(flow, 1); + + Dataset mapped = MapElements.named("Map1") + .of(dataset) + .using((String input, Context context) -> { + // use simple counter + context.getCounter("my-counter").increment(); + + return input.toLowerCase(); + }) + .output(); + + assertEquals(flow, mapped.getFlow()); + assertEquals(1, flow.size()); + + MapElements map = (MapElements) flow.operators().iterator().next(); + assertEquals(flow, map.getFlow()); + assertEquals("Map1", map.getName()); + assertNotNull(map.getMapper()); assertEquals(mapped, map.output()); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java index 9a347d42d3e2b..7c0be5305351e 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKeyTest.java @@ -21,7 +21,7 @@ import cz.seznam.euphoria.core.client.dataset.partitioning.HashPartitioning; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; @@ -150,7 +150,7 @@ public void testBuild_Partitioner() { private static class WordCountState implements State { private final ValueStorage sum; - protected WordCountState(StorageProvider storageProvider, Context ctx) { + protected WordCountState(StorageProvider storageProvider, Collector ctx) { sum = 
storageProvider.getValueStorage( ValueStorageDescriptor.of("sum", Long.class, 0L)); } @@ -161,7 +161,7 @@ public void add(Long element) { } @Override - public void flush(Context ctx) { + public void flush(Collector ctx) { ctx.collect(sum.get()); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java index 6254d7ae947bb..e13ef520d1986 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/test/java/cz/seznam/euphoria/core/executor/FlowUnfolderTest.java @@ -20,7 +20,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.graph.Node; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSink; import cz.seznam.euphoria.core.client.io.MockStreamDataSourceFactory; import cz.seznam.euphoria.core.client.io.StdoutSink; @@ -78,7 +78,7 @@ public void before() throws Exception { Dataset> output = Join.of(mapped, reduced) .by(e -> e, Pair::getFirst) - .using((Object l, Pair r, Context c) -> c.collect(r.getSecond())) + .using((Object l, Pair r, Collector c) -> c.collect(r.getSecond())) .windowBy(Time.of(Duration.ofSeconds(1))) .output(); @@ -161,7 +161,7 @@ public void testMultipleOutputsToSameSink() throws Exception { Dataset> output = Join.of(mapped, reduced) .by(e -> e, Pair::getFirst) - .using((Object l, Pair r, Context c) -> { + .using((Object l, Pair r, Collector c) -> { c.collect(r.getSecond()); }) .windowBy(Time.of(Duration.ofSeconds(1))) diff --git a/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/AccessLogCount.java 
b/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/AccessLogCount.java index 3ccb16c27ca4e..8ce85783f793f 100644 --- a/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/AccessLogCount.java +++ b/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/AccessLogCount.java @@ -19,7 +19,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.io.StdoutSink; @@ -195,7 +195,7 @@ public static void main(String[] args) throws Exception { // parameter as demonstrated below. FlatMap.named("FORMAT-OUTPUT") .of(aggregated) - .using(((Pair elem, Context context) -> { + .using(((Pair elem, Collector context) -> { Date d = new Date(((TimeInterval) context.getWindow()).getStartMillis()); SimpleDateFormat sdf = new SimpleDateFormat("dd/MMM/yyyy", Locale.ENGLISH); diff --git a/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/SimpleWordCount.java b/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/SimpleWordCount.java index f4ffa9adb134f..235ea1a2fffc2 100644 --- a/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/SimpleWordCount.java +++ b/sdks/java/extensions/euphoria/euphoria-examples/src/main/java/cz/seznam/euphoria/examples/wordcount/SimpleWordCount.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import 
cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.client.io.DataSource; import cz.seznam.euphoria.core.client.operator.FlatMap; @@ -161,7 +161,7 @@ private static Flow buildFlow(DataSource input, DataSink output, // string into individual words and emit each individually instead. Dataset words = FlatMap.named("TOKENIZER") .of(lines) - .using((String line, Context c) -> + .using((String line, Collector c) -> SPLIT_RE.splitAsStream(line).forEachOrdered(c::collect)) .output(); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutorContext.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutorContext.java index 454b0e27b3e88..da49b293364ae 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutorContext.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/ExecutorContext.java @@ -19,6 +19,8 @@ import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.graph.Node; import cz.seznam.euphoria.core.client.operator.SingleInputOperator; +import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables; import org.apache.flink.streaming.api.datastream.DataStream; @@ -40,13 +42,21 @@ */ public abstract class ExecutorContext { + private final FlinkAccumulatorFactory accumulatorFactory; + private final Settings settings; + private final E env; private final DAG> dag; private final Map, D> outputs; - public ExecutorContext(E env, DAG> dag) { + public ExecutorContext(E env, + DAG> dag, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { this.env = env; this.dag = dag; + this.accumulatorFactory = 
accumulatorFactory; + this.settings = settings; this.outputs = new IdentityHashMap<>(); } @@ -54,6 +64,14 @@ public E getExecutionEnvironment() { return this.env; } + public FlinkAccumulatorFactory getAccumulatorFactory() { + return accumulatorFactory; + } + + public Settings getSettings() { + return settings; + } + /** * Retrieves list of Flink {@link DataStream} inputs of given operator * diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java index 984070b3a5749..4b9593f62a093 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/FlinkExecutor.java @@ -15,10 +15,13 @@ */ package cz.seznam.euphoria.flink; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSink; import cz.seznam.euphoria.core.executor.Executor; import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.batch.BatchFlowTranslator; import cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator; import org.apache.flink.api.common.ExecutionConfig; @@ -51,6 +54,8 @@ public class FlinkExecutor implements Executor { private Duration autoWatermarkInterval = Duration.ofMillis(200); private Duration allowedLateness = Duration.ofMillis(0); private Duration latencyTracking = Duration.ofSeconds(2); + private FlinkAccumulatorFactory accumulatorFactory = + new FlinkAccumulatorFactory.Adapter(VoidAccumulatorProvider.getFactory()); private final Set> registeredClasses = new HashSet<>(); @Nullable private Duration 
checkpointInterval; @@ -90,7 +95,24 @@ public void shutdown() { LOG.info("Shutting down flink executor."); submitExecutor.shutdownNow(); } - + + @Override + public void setAccumulatorProvider(AccumulatorProvider.Factory factory) { + this.accumulatorFactory = new FlinkAccumulatorFactory.Adapter( + Objects.requireNonNull(factory)); + } + + /** + * Set accumulator provider that will be used to collect metrics and counters. + * When no provider is set a default instance of {@link VoidAccumulatorProvider} + * will be used. + * + * @param factory Factory to create an instance of accumulator provider. + */ + public void setAccumulatorProvider(FlinkAccumulatorFactory factory) { + this.accumulatorFactory = Objects.requireNonNull(factory); + } + private Executor.Result execute(Flow flow) { try { ExecutionEnvironment.Mode mode = ExecutionEnvironment.determineMode(flow); @@ -116,9 +138,9 @@ private Executor.Result execute(Flow flow) { FlowTranslator translator; if (mode == ExecutionEnvironment.Mode.BATCH) { - translator = createBatchTranslator(settings, environment); + translator = createBatchTranslator(settings, environment, accumulatorFactory); } else { - translator = createStreamTranslator(settings, environment, allowedLateness, autoWatermarkInterval); + translator = createStreamTranslator(settings, environment, accumulatorFactory, allowedLateness, autoWatermarkInterval); } List> sinks = translator.translateInto(flow); @@ -178,16 +200,20 @@ private Executor.Result execute(Flow flow) { } } - protected FlowTranslator createBatchTranslator(Settings settings, ExecutionEnvironment environment) { - return new BatchFlowTranslator(settings, environment.getBatchEnv()); + protected FlowTranslator createBatchTranslator(Settings settings, + ExecutionEnvironment environment, + FlinkAccumulatorFactory accumulatorFactory) { + return new BatchFlowTranslator(settings, environment.getBatchEnv(), accumulatorFactory); } protected FlowTranslator createStreamTranslator(Settings settings, 
ExecutionEnvironment environment, + FlinkAccumulatorFactory accumulatorFactory, Duration allowedLateness, Duration autoWatermarkInterval) { return new StreamingFlowTranslator( - settings, environment.getStreamEnv(), allowedLateness, autoWatermarkInterval); + settings, environment.getStreamEnv(), accumulatorFactory, + allowedLateness, autoWatermarkInterval); } /** diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/TestFlinkExecutor.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/TestFlinkExecutor.java index 601ab3f44d420..24d9752d8c58d 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/TestFlinkExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/TestFlinkExecutor.java @@ -16,6 +16,7 @@ package cz.seznam.euphoria.flink; import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.batch.BatchFlowTranslator; import cz.seznam.euphoria.flink.batch.BatchFlowTranslator.SplitAssignerFactory; @@ -37,10 +38,12 @@ public TestFlinkExecutor(SplitAssignerFactory splitAssignerFactory) { super(true); this.splitAssignerFactory = splitAssignerFactory; } - + @Override protected FlowTranslator createBatchTranslator(Settings settings, - ExecutionEnvironment environment) { - return new BatchFlowTranslator(settings, environment.getBatchEnv(), splitAssignerFactory); + ExecutionEnvironment environment, + FlinkAccumulatorFactory accumulatorFactory) { + return new BatchFlowTranslator(settings, environment.getBatchEnv(), + accumulatorFactory, splitAssignerFactory); } } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkAccumulatorFactory.java 
b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkAccumulatorFactory.java new file mode 100644 index 0000000000000..079954ee82805 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkAccumulatorFactory.java @@ -0,0 +1,59 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package cz.seznam.euphoria.flink.accumulators; + +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.util.Settings; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.io.Serializable; + +/** + * Flink specific implementation of {@link AccumulatorProvider.Factory} + * accepting Flink runtime context as an argument. + *

+ * It is required this factory is thread-safe. + */ +public interface FlinkAccumulatorFactory extends Serializable { + + /** + * Creates a new instance of {@link AccumulatorProvider} + * initialized by given settings. + * + * @param settings Euphoria settings. + * @param context Flink runtime context. + * @return Instance of accumulator provider. + */ + AccumulatorProvider create(Settings settings, RuntimeContext context); + + /** + * Adapts generic euphoria accumulator factory to Flink specific + * usage. + */ + class Adapter implements FlinkAccumulatorFactory { + + private final AccumulatorProvider.Factory factory; + + public Adapter(AccumulatorProvider.Factory factory) { + this.factory = factory; + } + + @Override + public AccumulatorProvider create(Settings settings, RuntimeContext context) { + return factory.create(settings); + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkNativeAccumulators.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkNativeAccumulators.java new file mode 100644 index 0000000000000..c831d6c826313 --- /dev/null +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/accumulators/FlinkNativeAccumulators.java @@ -0,0 +1,175 @@ +/** + * Copyright 2016-2017 Seznam.cz, a.s. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package cz.seznam.euphoria.flink.accumulators; + +import cz.seznam.euphoria.core.client.accumulators.Accumulator; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; +import cz.seznam.euphoria.core.client.accumulators.Counter; +import cz.seznam.euphoria.core.client.accumulators.Histogram; +import cz.seznam.euphoria.core.client.accumulators.Timer; +import cz.seznam.euphoria.core.util.Settings; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; + +import java.time.Duration; +import java.util.TreeMap; + +/** + * Integrates Flink native accumulator API. + */ +public class FlinkNativeAccumulators implements AccumulatorProvider { + + private final RuntimeContext context; + + private FlinkNativeAccumulators(RuntimeContext context) { + this.context = context; + } + + @Override + public Counter getCounter(String name) { + System.out.println("Getting counter: " + name); + return getByNameAndClass(name, FlinkCounter.class); + } + + @Override + public Histogram getHistogram(String name) { + return getByNameAndClass(name, FlinkHistogram.class); + } + + @Override + public Timer getTimer(String name) { + return getByNameAndClass(name, FlinkTimer.class); + } + + @SuppressWarnings("unchecked") + private ACC getByNameAndClass(String name, + Class clz) { + Accumulator acc = (Accumulator) context.getAllAccumulators().get(name); + + if (acc == null) { + try { + System.out.println("Creating counter: " + name); + // register a new instance + acc = clz.getConstructor().newInstance(); + context.addAccumulator(name, (org.apache.flink.api.common.accumulators.Accumulator) acc); + } catch (Exception e) { + throw new RuntimeException("Exception during accumulator initialization: " + clz, e); + } + } + + if (!clz.isAssignableFrom(acc.getClass())) { + throw new IllegalStateException( + "Accumulator named '" + name + "' is type of " + acc.getClass().getSimpleName()); + } + + return (ACC) acc; + } + 
+ public static Factory getFactory() { + return Factory.get(); + } + + // ------------------------------ + + public static class Factory implements FlinkAccumulatorFactory { + + private static final Factory INSTANCE = new Factory(); + + private Factory() {} + + @Override + public AccumulatorProvider create(Settings settings, RuntimeContext context) { + return new FlinkNativeAccumulators(context); + } + + public static Factory get() { + return INSTANCE; + } + } + + // ------------------------------ + + // Each class implements both Flink Accumulator and Euphoria Accumulator + // so it can be used in both contexts without creating multiple instances. + + public static class FlinkCounter extends LongCounter implements Counter { + + @Override + public void increment(long value) { + super.add(value); + } + + @Override + public void increment() { + super.add(1L); + } + + // It is necessary to override clone method to instantiate this + // extended class and not the parent type. + @Override + @SuppressWarnings("CloneDoesntCallSuperClone") + public LongCounter clone() { + FlinkCounter result = new FlinkCounter(); + result.merge(this); + return result; + } + } + + public static class FlinkHistogram extends org.apache.flink.api.common.accumulators.Histogram + implements Histogram { + + @Override + public void add(long value) { + super.add(Math.toIntExact(value)); + } + + @Override + public void add(long value, long times) { + for (int i = 0; i < times; i++) { + this.add(value); + } + } + + // It is necessary to override clone method to instantiate this + // extended class and not the parent type. 
+ @Override + @SuppressWarnings("CloneDoesntCallSuperClone") + public org.apache.flink.api.common.accumulators.Accumulator> clone() { + FlinkHistogram result = new FlinkHistogram(); + result.merge(this); + return result; + } + } + + public static class FlinkTimer extends org.apache.flink.api.common.accumulators.Histogram + implements Timer { + + @Override + public void add(Duration duration) { + super.add(Math.toIntExact(duration.toMillis())); + } + + // It is necessary to override clone method to instantiate this + // extended class and not the parent type. + @Override + @SuppressWarnings("CloneDoesntCallSuperClone") + public org.apache.flink.api.common.accumulators.Accumulator> clone() { + FlinkTimer result = new FlinkTimer(); + result.merge(this); + return result; + } + } +} diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchExecutorContext.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchExecutorContext.java index 07766be87c9d2..450fc209c83bb 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchExecutorContext.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchExecutorContext.java @@ -16,15 +16,20 @@ package cz.seznam.euphoria.flink.batch; import cz.seznam.euphoria.core.client.graph.DAG; +import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.ExecutorContext; import cz.seznam.euphoria.flink.FlinkOperator; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; public class BatchExecutorContext extends ExecutorContext> { - public BatchExecutorContext(ExecutionEnvironment env, DAG> dag) { - super(env, dag); + public BatchExecutorContext(ExecutionEnvironment env, + DAG> dag, + FlinkAccumulatorFactory 
accumulatorFactory, + Settings settings) { + super(env, dag, accumulatorFactory, settings); } } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java index 275f65d55c02a..79976eaf3fe92 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchFlowTranslator.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.flink.batch; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryPredicate; @@ -84,21 +85,30 @@ private Translation( } private final Map translations = new IdentityHashMap<>(); + + private final Settings settings; private final ExecutionEnvironment env; + private final FlinkAccumulatorFactory accumulatorFactory; - public BatchFlowTranslator(Settings settings, ExecutionEnvironment env) { - this(settings, env, DEFAULT_SPLIT_ASSIGNER_FACTORY); + public BatchFlowTranslator(Settings settings, + ExecutionEnvironment env, + FlinkAccumulatorFactory accumulatorFactory) { + this(settings, env, accumulatorFactory, DEFAULT_SPLIT_ASSIGNER_FACTORY); } - public BatchFlowTranslator(Settings settings, ExecutionEnvironment env, + public BatchFlowTranslator(Settings settings, + ExecutionEnvironment env, + FlinkAccumulatorFactory accumulatorFactory, SplitAssignerFactory splitAssignerFactory) { + this.settings = Objects.requireNonNull(settings); this.env = Objects.requireNonNull(env); + this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory); // basic operators Translation.set(translations, 
FlowUnfolder.InputOperator.class, new InputTranslator(splitAssignerFactory)); Translation.set(translations, FlatMap.class, new FlatMapTranslator()); Translation.set(translations, Repartition.class, new RepartitionTranslator()); - Translation.set(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator(settings, env)); + Translation.set(translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator()); Translation.set(translations, Union.class, new UnionTranslator()); // derived operators @@ -129,7 +139,8 @@ public List> translateInto(Flow flow) { // transform flow to acyclic graph of supported operators DAG>> dag = flowToDag(flow); - BatchExecutorContext executorContext = new BatchExecutorContext(env, (DAG) dag); + BatchExecutorContext executorContext = new BatchExecutorContext(env, (DAG) dag, + accumulatorFactory, settings); // translate each operator to proper Flink transformation dag.traverse().map(Node::get).forEach(op -> { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java index 139318b24cc9d..b829dcc73eeef 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/BatchUnaryFunctorWrapper.java @@ -15,38 +15,51 @@ */ package cz.seznam.euphoria.flink.batch; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; -import org.apache.flink.api.common.functions.FlatMapFunction; +import cz.seznam.euphoria.core.client.io.AbstractCollector; +import cz.seznam.euphoria.core.util.Settings; 
+import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; +import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.util.Collector; import java.util.Objects; public class BatchUnaryFunctorWrapper - implements FlatMapFunction, - BatchElement>, - ResultTypeQueryable> { + extends RichFlatMapFunction, + BatchElement> + implements ResultTypeQueryable> { private final UnaryFunctor f; - public BatchUnaryFunctorWrapper(UnaryFunctor f) { + private final FlinkAccumulatorFactory accumulatorFactory; + private final Settings settings; + + public BatchUnaryFunctorWrapper(UnaryFunctor f, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { this.f = Objects.requireNonNull(f); + this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory); + this.settings = Objects.requireNonNull(settings); } @Override public void flatMap(BatchElement value, - Collector> out) - throws Exception - { - f.apply(value.getElement(), new Context() { + org.apache.flink.util.Collector> out) + throws Exception { + + AccumulatorProvider accumulators = + accumulatorFactory.create(settings, getRuntimeContext()); + + f.apply(value.getElement(), new AbstractCollector(accumulators) { @Override public void collect(OUT elem) { out.collect(new BatchElement<>( value.getWindow(), value.getTimestamp(), elem)); } + @Override public Object getWindow() { return value.getWindow(); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java index d5f2b93048142..d33c73f5c110d 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java +++ 
b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/FlatMapTranslator.java @@ -18,7 +18,9 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.FlatMap; +import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.FlinkOperator; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import org.apache.flink.api.java.DataSet; class FlatMapTranslator implements BatchOperatorTranslator { @@ -27,6 +29,10 @@ class FlatMapTranslator implements BatchOperatorTranslator { @SuppressWarnings("unchecked") public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { + + Settings settings = context.getSettings(); + FlinkAccumulatorFactory accumulatorFactory = context.getAccumulatorFactory(); + DataSet input = context.getSingleInputStream(operator); UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); ExtractEventTime timeAssigner = operator.getOriginalOperator().getEventTimeExtractor(); @@ -40,7 +46,7 @@ public DataSet translate(FlinkOperator operator, } return input - .flatMap(new BatchUnaryFunctorWrapper(mapper)) + .flatMap(new BatchUnaryFunctorWrapper(mapper, accumulatorFactory, settings)) .returns((Class) BatchElement.class) .setParallelism(operator.getParallelism()) .name(operator.getName()); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java index ab653f164bf85..568e8bf4bb036 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java @@ -29,9 
+29,10 @@ import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.Utils; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.functions.PartitionerWrapper; import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables; -import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.operators.Order; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; @@ -44,9 +45,9 @@ public class ReduceStateByKeyTranslator implements BatchOperatorTranslator { - final StorageProvider stateStorageProvider; + private StorageProvider stateStorageProvider; - public ReduceStateByKeyTranslator(Settings settings, ExecutionEnvironment env) { + private void loadConfig(Settings settings, ExecutionEnvironment env) { int maxMemoryElements = settings.getInt(CFG_LIST_STORAGE_MAX_MEMORY_ELEMS_KEY, CFG_LIST_STORAGE_MAX_MEMORY_ELEMS_DEFAULT); this.stateStorageProvider = new BatchStateStorageProvider(maxMemoryElements, env); } @@ -55,6 +56,7 @@ public ReduceStateByKeyTranslator(Settings settings, ExecutionEnvironment env) { @SuppressWarnings("unchecked") public DataSet translate(FlinkOperator operator, BatchExecutorContext context) { + loadConfig(context.getSettings(), context.getExecutionEnvironment()); int inputParallelism = Iterables.getOnlyElement(context.getInputOperators(operator)).getParallelism(); DataSet input = Iterables.getOnlyElement(context.getInputStreams(operator)); @@ -101,7 +103,8 @@ public DataSet translate(FlinkOperator operator, (KeySelector, Long>) BatchElement::getTimestamp, Long.class), Order.ASCENDING) - .reduceGroup(new RSBKReducer(origOperator, stateStorageProvider, windowing)) + .reduceGroup(new RSBKReducer(origOperator, stateStorageProvider, windowing, + context.getAccumulatorFactory(), 
context.getSettings())) .setParallelism(operator.getParallelism()) .name(operator.getName() + "::reduce"); @@ -121,13 +124,16 @@ public DataSet translate(FlinkOperator operator, } static class RSBKReducer - implements GroupReduceFunction, BatchElement>, - ResultTypeQueryable> { + extends RichGroupReduceFunction, BatchElement> + implements ResultTypeQueryable> { + private final StateFactory> stateFactory; private final StateMerger> stateCombiner; private final StorageProvider stateStorageProvider; private final Windowing windowing; private final Trigger trigger; + private final FlinkAccumulatorFactory accumulatorFactory; + private final Settings settings; // mapping of [Key -> GroupReducer] private transient Map activeReducers; @@ -136,13 +142,17 @@ static class RSBKReducer RSBKReducer( ReduceStateByKey operator, StorageProvider stateStorageProvider, - Windowing windowing) { + Windowing windowing, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { this.stateFactory = operator.getStateFactory(); this.stateCombiner = operator.getStateMerger(); this.stateStorageProvider = stateStorageProvider; this.windowing = windowing; this.trigger = windowing.getTrigger(); + this.accumulatorFactory = accumulatorFactory; + this.settings = settings; } @Override @@ -164,6 +174,7 @@ public void reduce(Iterable> values, windowing, trigger, elem -> out.collect((BatchElement) elem), + accumulatorFactory.create(settings, getRuntimeContext()), false); activeReducers.put(key, reducer); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java index e83f72be205e5..eebeb32b585df 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java +++ 
b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/FlatMapTranslator.java @@ -18,7 +18,9 @@ import cz.seznam.euphoria.core.client.functional.UnaryFunctor; import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.FlatMap; +import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.FlinkOperator; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import org.apache.flink.streaming.api.datastream.DataStream; class FlatMapTranslator implements StreamingOperatorTranslator { @@ -26,8 +28,10 @@ class FlatMapTranslator implements StreamingOperatorTranslator { @Override @SuppressWarnings("unchecked") public DataStream translate(FlinkOperator operator, - StreamingExecutorContext context) - { + StreamingExecutorContext context) { + Settings settings = context.getSettings(); + FlinkAccumulatorFactory accumulatorFactory = context.getAccumulatorFactory(); + DataStream input = context.getSingleInputStream(operator); UnaryFunctor mapper = operator.getOriginalOperator().getFunctor(); ExtractEventTime evtTimeFn = operator.getOriginalOperator().getEventTimeExtractor(); @@ -37,7 +41,7 @@ public DataStream translate(FlinkOperator operator, .returns((Class) StreamingElement.class); } return input - .flatMap(new StreamingUnaryFunctorWrapper(mapper)) + .flatMap(new StreamingUnaryFunctorWrapper(mapper, accumulatorFactory, settings)) .returns((Class) StreamingElement.class) .name(operator.getName()) .setParallelism(operator.getParallelism()); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 20e82abef5e51..fc64bbca3f96e 100644 --- 
a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -57,7 +57,7 @@ class ReduceStateByKeyTranslator implements StreamingOperatorTranslator translate(FlinkOperator operator, StreamingExecutorContext context) { + loadSettings(context.getSettings()); + DataStream input = Iterables.getOnlyElement(context.getInputStreams(operator)); @@ -97,7 +99,8 @@ public DataStream translate(FlinkOperator operator, new StreamingElementWindowOperator( elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode(), descriptorsCacheMaxSize, - allowEarlyEmitting)) + allowEarlyEmitting, + context.getAccumulatorFactory(), context.getSettings())) .setParallelism(operator.getParallelism()); } else { // assign windows @@ -113,7 +116,9 @@ public DataStream translate(FlinkOperator operator, new KeyedMultiWindowedElementWindowOperator( windowing, stateFactory, stateCombiner, context.isLocalMode(), descriptorsCacheMaxSize, - allowEarlyEmitting)) + allowEarlyEmitting, + context.getAccumulatorFactory(), + context.getSettings())) .setParallelism(operator.getParallelism()); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java index 89b429b7ec9e6..a7fa679a2a7f5 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingExecutorContext.java @@ -16,8 +16,10 @@ package cz.seznam.euphoria.flink.streaming; import cz.seznam.euphoria.core.client.graph.DAG; +import 
cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.ExecutorContext; import cz.seznam.euphoria.flink.FlinkOperator; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -34,9 +36,11 @@ public class StreamingExecutorContext public StreamingExecutorContext(StreamExecutionEnvironment env, DAG> dag, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings, Duration allowedLateness, boolean localMode) { - super(env, dag); + super(env, dag, accumulatorFactory, settings); this.allowedLateness = allowedLateness; this.localMode = localMode; } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java index e37f1a9e707e5..d913a158a5e4f 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.java @@ -28,6 +28,7 @@ import cz.seznam.euphoria.core.util.Settings; import cz.seznam.euphoria.flink.FlinkOperator; import cz.seznam.euphoria.flink.FlowTranslator; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.streaming.io.DataSinkWrapper; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; @@ -51,22 +52,27 @@ public class StreamingFlowTranslator extends FlowTranslator { // ~ ------------------------------------------------------------------------------ + private final Settings settings; private final StreamExecutionEnvironment env; + private final FlinkAccumulatorFactory 
accumulatorFactory; private final Duration allowedLateness; private final Duration autoWatermarkInterval; public StreamingFlowTranslator(Settings settings, StreamExecutionEnvironment env, + FlinkAccumulatorFactory accumulatorFactory, Duration allowedLateness, Duration autoWatermarkInterval) { + this.settings = settings; this.env = Objects.requireNonNull(env); + this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory); this.allowedLateness = Objects.requireNonNull(allowedLateness); this.autoWatermarkInterval = Objects.requireNonNull(autoWatermarkInterval); translators.put(FlowUnfolder.InputOperator.class, new InputTranslator()); translators.put(FlatMap.class, new FlatMapTranslator()); translators.put(Repartition.class, new RepartitionTranslator()); - translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator(settings)); + translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator()); translators.put(Union.class, new UnionTranslator()); } @@ -91,6 +97,8 @@ public List> translateInto(Flow flow) { StreamingExecutorContext executorContext = new StreamingExecutorContext(env, (DAG) dag, + accumulatorFactory, + settings, allowedLateness, env instanceof LocalStreamEnvironment); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java index 8d7331ad0d9ab..80d1bb030cdda 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/StreamingUnaryFunctorWrapper.java @@ -15,37 +15,50 @@ */ package cz.seznam.euphoria.flink.streaming; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import 
cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; -import org.apache.flink.api.common.functions.FlatMapFunction; +import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.core.client.io.AbstractCollector; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; +import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.util.Collector; import java.util.Objects; public class StreamingUnaryFunctorWrapper - implements FlatMapFunction, - StreamingElement>, - ResultTypeQueryable> { + extends RichFlatMapFunction, + StreamingElement> + implements ResultTypeQueryable> { private final UnaryFunctor f; - public StreamingUnaryFunctorWrapper(UnaryFunctor f) { + private final FlinkAccumulatorFactory accumulatorFactory; + private final Settings settings; + + public StreamingUnaryFunctorWrapper(UnaryFunctor f, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { this.f = Objects.requireNonNull(f); + this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory); + this.settings = Objects.requireNonNull(settings); } @Override public void flatMap(StreamingElement element, - Collector> out) - throws Exception - { - f.apply(element.getElement(), new Context() { + org.apache.flink.util.Collector> out) + throws Exception { + + AccumulatorProvider accumulators = + accumulatorFactory.create(settings, getRuntimeContext()); + + f.apply(element.getElement(), new AbstractCollector(accumulators) { @Override public void collect(OUT elem) { out.collect(new StreamingElement<>(element.getWindow(), elem)); } + @Override public Object getWindow() { return element.getWindow(); diff --git 
a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index e2b60925ee148..73f94c5ce1554 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -15,11 +15,12 @@ */ package cz.seznam.euphoria.flink.streaming.windowing; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.AbstractCollector; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; @@ -32,6 +33,8 @@ import cz.seznam.euphoria.core.client.triggers.Trigger; import cz.seznam.euphoria.core.client.triggers.TriggerContext; import cz.seznam.euphoria.core.client.util.Pair; +import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.storage.Descriptors; import cz.seznam.euphoria.flink.streaming.StreamingElement; import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Lists; @@ -83,13 +86,16 @@ public abstract class AbstractWindowOperator // see {@link WindowedStorageProvider} private final int descriptorsCacheMaxSize; + private final 
FlinkAccumulatorFactory accumulatorFactory; + private final Settings settings; + private transient InternalTimerService timerService; // tracks existing windows to flush them in case of end of stream is reached private transient InternalTimerService endOfStreamTimerService; private transient TriggerContextAdapter triggerContext; - private transient OutputContext outputContext; + private transient OutputCollector outputContext; private transient WindowedStorageProvider storageProvider; private transient ListStateDescriptor> mergingWindowsDescriptor; @@ -101,7 +107,9 @@ public AbstractWindowOperator(Windowing windowing, StateMerger> stateCombiner, boolean localMode, int descriptorsCacheMaxSize, - boolean allowEarlyEmitting) { + boolean allowEarlyEmitting, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { this.windowing = Objects.requireNonNull(windowing); this.trigger = windowing.getTrigger(); this.stateFactory = Objects.requireNonNull(stateFactory); @@ -109,6 +117,8 @@ public AbstractWindowOperator(Windowing windowing, this.localMode = localMode; this.descriptorsCacheMaxSize = descriptorsCacheMaxSize; this.allowEarlyEmitting = allowEarlyEmitting; + this.accumulatorFactory = Objects.requireNonNull(accumulatorFactory); + this.settings = Objects.requireNonNull(settings); } @Override @@ -125,7 +135,8 @@ public void open() throws Exception { this.endOfStreamTimerService = getInternalTimerService("end-of-stream-timers", windowSerializer, this); this.triggerContext = new TriggerContextAdapter(); - this.outputContext = new OutputContext(); + this.outputContext = new OutputCollector( + accumulatorFactory.create(settings, getRuntimeContext())); this.storageProvider = new WindowedStorageProvider<>( getKeyedStateBackend(), windowSerializer, descriptorsCacheMaxSize); @@ -428,13 +439,17 @@ private void setWindow(WID window) { } } - private class OutputContext implements Context { + private class OutputCollector extends AbstractCollector { private Object key; 
private Window window; private final StreamRecord reuse = new StreamRecord<>(null); + public OutputCollector(AccumulatorProvider accumulators) { + super(accumulators); + } + @Override @SuppressWarnings("unchecked") public void collect(Object elem) { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java index f9a6eab869d55..996da601068b8 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java @@ -20,6 +20,8 @@ import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StateMerger; +import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; /** @@ -29,20 +31,23 @@ public class KeyedMultiWindowedElementWindowOperator extends AbstractWindowOperator, KEY, WID> { - public KeyedMultiWindowedElementWindowOperator( - Windowing windowing, - StateFactory> stateFactory, - StateMerger> stateCombiner, - boolean localMode, - int descriptorsCacheMaxSize, - boolean allowEarlyEmitting) { - super(windowing, stateFactory, stateCombiner, localMode, - descriptorsCacheMaxSize, allowEarlyEmitting); - } + public KeyedMultiWindowedElementWindowOperator( + Windowing windowing, + StateFactory> stateFactory, + StateMerger> stateCombiner, + boolean localMode, + int descriptorsCacheMaxSize, + boolean allowEarlyEmitting, + 
FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { + super(windowing, stateFactory, stateCombiner, localMode, + descriptorsCacheMaxSize, allowEarlyEmitting, + accumulatorFactory, settings); + } - @Override - protected KeyedMultiWindowedElement - recordValue(StreamRecord> record) { - return record.getValue(); - } + @Override + protected KeyedMultiWindowedElement + recordValue(StreamRecord> record) { + return record.getValue(); + } } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java index 302755f567759..08208fde0b052 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java @@ -20,6 +20,8 @@ import cz.seznam.euphoria.core.client.operator.state.State; import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StateMerger; +import cz.seznam.euphoria.core.util.Settings; +import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory; import cz.seznam.euphoria.flink.streaming.StreamingElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -42,9 +44,12 @@ public StreamingElementWindowOperator( StateMerger> stateCombiner, boolean localMode, int descriptorsCacheMaxSize, - boolean allowEarlyEmitting) { + boolean allowEarlyEmitting, + FlinkAccumulatorFactory accumulatorFactory, + Settings settings) { super(windowing, stateFactory, stateCombiner, localMode, - descriptorsCacheMaxSize, allowEarlyEmitting); + descriptorsCacheMaxSize, allowEarlyEmitting, + accumulatorFactory, settings); 
this.windowAssigner = Objects.requireNonNull(windowAssigner); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java index b125c62c747c2..ec2e11494e0f3 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java @@ -20,12 +20,10 @@ import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSink; import cz.seznam.euphoria.core.client.io.ListDataSource; -import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.FlatMap; -import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; @@ -60,7 +58,7 @@ public void add(VALUE element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { for (VALUE value : reducableValues.get()) { context.collect(value); } @@ -106,7 +104,7 @@ public void testEventWindowing() throws Exception { ReduceStateByKey.of(f.createInput(source, Pair::getSecond)) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StorageProvider storages, Context> ctx) -> new AccState<>(storages)) + .stateFactory((StorageProvider storages, Collector> ctx) -> new AccState<>(storages)) 
.mergeStatesBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5))) .setNumPartitions(1) @@ -155,7 +153,7 @@ public void testEventWindowing_attachedWindowing() throws Exception { ReduceStateByKey.of(f.createInput(source, Pair::getSecond)) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StorageProvider storages, Context> ctx) -> new AccState<>(storages)) + .stateFactory((StorageProvider storages, Collector> ctx) -> new AccState<>(storages)) .mergeStatesBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5))) .setNumPartitions(1) @@ -172,7 +170,7 @@ public void testEventWindowing_attachedWindowing() throws Exception { ReduceStateByKey.of(secondStep) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StorageProvider storages, Context> ctx) -> new AccState<>(storages)) + .stateFactory((StorageProvider storages, Collector> ctx) -> new AccState<>(storages)) .mergeStatesBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5))) .setNumPartitions(1) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/testkit/FlinkExecutorProvider.java b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/testkit/FlinkExecutorProvider.java index 14ab17cca1851..059d852f8b9bb 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/testkit/FlinkExecutorProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/testkit/FlinkExecutorProvider.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.executor.Executor; import cz.seznam.euphoria.flink.FlinkExecutor; import cz.seznam.euphoria.flink.TestFlinkExecutor; +import cz.seznam.euphoria.flink.accumulators.FlinkNativeAccumulators; import cz.seznam.euphoria.operator.test.junit.ExecutorEnvironment; import cz.seznam.euphoria.operator.test.junit.ExecutorProvider; import org.apache.commons.io.FileUtils; @@ -32,6 +33,9 @@ default ExecutorEnvironment 
newExecutorEnvironment() throws Exception { RocksDBStateBackend backend = new RocksDBStateBackend("file://" + path); FlinkExecutor executor = new TestFlinkExecutor(ModuloInputSplitAssigner::new) .setStateBackend(backend); + // TODO should use some local test implementation of counters + // that is queryable in assertions + executor.setAccumulatorProvider(FlinkNativeAccumulators.getFactory()); return new ExecutorEnvironment() { @Override public Executor getExecutor() { diff --git a/sdks/java/extensions/euphoria/euphoria-fluent/src/test/java/cz/seznam/euphoria/fluent/FluentTest.java b/sdks/java/extensions/euphoria/euphoria-fluent/src/test/java/cz/seznam/euphoria/fluent/FluentTest.java index cbc2091a981c8..40769b6fad154 100644 --- a/sdks/java/extensions/euphoria/euphoria-fluent/src/test/java/cz/seznam/euphoria/fluent/FluentTest.java +++ b/sdks/java/extensions/euphoria/euphoria-fluent/src/test/java/cz/seznam/euphoria/fluent/FluentTest.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Count; import cz.seznam.euphoria.core.client.functional.ReduceFunction; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSink; import cz.seznam.euphoria.core.client.io.ListDataSource; import cz.seznam.euphoria.core.client.operator.ReduceByKey; @@ -50,7 +50,7 @@ public void testBasics() throws Exception { .windowBy(Count.of(3))) // ~ strip the needless key and flatten out the elements thereby // creating multiple elements in the output belonging to the same window - .flatMap((Pair> e, Context c) -> + .flatMap((Pair> e, Collector c) -> e.getSecond().stream().forEachOrdered(c::collect)) // ~ now spread the elements (belonging to the same window) over // multiple partitions diff --git a/sdks/java/extensions/euphoria/euphoria-hadoop/src/test/java/cz/seznam/euphoria/hadoop/ExerciseHadoopIO.java 
b/sdks/java/extensions/euphoria/euphoria-hadoop/src/test/java/cz/seznam/euphoria/hadoop/ExerciseHadoopIO.java index d0e375a0836bd..08896747500a6 100644 --- a/sdks/java/extensions/euphoria/euphoria-hadoop/src/test/java/cz/seznam/euphoria/hadoop/ExerciseHadoopIO.java +++ b/sdks/java/extensions/euphoria/euphoria-hadoop/src/test/java/cz/seznam/euphoria/hadoop/ExerciseHadoopIO.java @@ -17,7 +17,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.StdoutSink; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.ReduceByKey; @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception { Dataset lines = flow.createInput(inputUri); Dataset> tuples = FlatMap.of(lines) - .using((String line, Context> out) -> + .using((String line, Collector> out) -> SPLIT_RE.splitAsStream(line) .map(String::trim) .filter(s -> !s.isEmpty()) diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java index 02543b156fe05..75adaa459d972 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/InMemExecutor.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.inmem; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.partitioning.Partitioning; import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; @@ -335,6 +336,11 @@ public void shutdown() { executor.shutdownNow(); } + 
@Override + public void setAccumulatorProvider(AccumulatorProvider.Factory factory) { + // TODO accumulators + } + private Executor.Result execute(Flow flow) { // transform the given flow to DAG of basic operators DAG> dag = FlowUnfolder.unfold(flow, Executor.getBasicOps()); diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java index 3c45ac2bfd475..5569bd03c19aa 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/WindowedElementCollector.java @@ -17,18 +17,19 @@ import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import java.util.Objects; import java.util.function.Supplier; -class WindowedElementCollector implements Context { - private final Collector wrap; +class WindowedElementCollector implements Collector { + private final cz.seznam.euphoria.inmem.Collector wrap; private final Supplier stampSupplier; protected Window window; - WindowedElementCollector(Collector wrap, Supplier stampSupplier) { + WindowedElementCollector(cz.seznam.euphoria.inmem.Collector wrap, + Supplier stampSupplier) { this.wrap = Objects.requireNonNull(wrap); this.stampSupplier = stampSupplier; } diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/BasicOperatorTest.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/BasicOperatorTest.java index 82d29487f281c..c5d9fe2b553ee 100644 --- 
a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/BasicOperatorTest.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/BasicOperatorTest.java @@ -22,7 +22,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSink; import cz.seznam.euphoria.core.client.io.ListDataSource; import cz.seznam.euphoria.core.client.operator.AssignEventTime; @@ -64,7 +64,7 @@ private static UnaryFunctor> toWordCountPair() { } private static UnaryFunctor toWords(UnaryFunction f) { - return (String s, Context c) -> { + return (String s, Collector c) -> { for (String part : s.split(" ")) { c.collect(f.apply(part)); } @@ -143,7 +143,7 @@ public void testWordCountStreamWithWindowLabel() throws Exception { ListDataSink.get(1); FlatMap.of(streamOutput) - .using((Pair p, Context> c) -> { + .using((Pair p, Collector> c) -> { // ~ just access the windows testifying their accessibility c.collect(Triple.of((TimeInterval) c.getWindow(), p.getFirst(), p.getSecond())); }) @@ -198,7 +198,7 @@ public void testWordCountStreamEarlyTriggered() throws Exception { // expand it to words Dataset> words = FlatMap.of(lines) - .using((Pair p, Context> out) -> { + .using((Pair p, Collector> out) -> { for (String word : p.getFirst().split(" ")) { out.collect(Triple.of(word, 1L, p.getSecond())); } @@ -263,7 +263,7 @@ public void testWordCountStreamEarlyTriggeredInSession() throws Exception { // expand it to words Dataset> words = FlatMap.of(lines) - .using((Pair p, Context> out) -> { + .using((Pair p, Collector> out) -> { for (String word : p.getFirst().split(" ")) { out.collect(Triple.of(word, 1L, p.getSecond())); } diff --git 
a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java index 6de035c1f1980..f01d30d70a8c8 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/test/java/cz/seznam/euphoria/inmem/InMemExecutorTest.java @@ -24,7 +24,7 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.io.ListDataSink; import cz.seznam.euphoria.core.client.io.ListDataSource; import cz.seznam.euphoria.core.client.operator.AssignEventTime; @@ -241,7 +241,7 @@ public void simpleFlatMapTest() throws InterruptedException, ExecutionException // repeat each element N N count Dataset output = FlatMap.of(ints) - .using((Integer e, Context c) -> { + .using((Integer e, Collector c) -> { for (int i = 0; i < e; i++) { c.collect(e); } @@ -273,7 +273,7 @@ public static class SortState implements State { final ListStorage data; - SortState(StorageProvider storageProvider, Context c) { + SortState(StorageProvider storageProvider, Collector c) { data = storageProvider.getListStorage( ListStorageDescriptor.of("data", Integer.class)); } @@ -285,7 +285,7 @@ public void add(Integer element) { @Override @SuppressWarnings("unchecked") - public void flush(Context context) { + public void flush(Collector context) { List toSort = Lists.newArrayList(data.get()); Collections.sort(toSort); for (Integer i : toSort) { @@ -682,7 +682,7 @@ public void testWithWatermarkAndEventTimeMixed() throws Exception { // and store it as the original input, process it further in // the same way as in 
`testWithWatermarkAndEventTime' input = FlatMap.of(reduced) - .using((Set grp, Context c) -> { + .using((Set grp, Collector c) -> { for (Integer i : grp) { c.collect(i); } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/FlatMapTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/FlatMapTest.java index 433c831260f2a..7bbcdae715275 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/FlatMapTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/FlatMapTest.java @@ -16,7 +16,7 @@ package cz.seznam.euphoria.operator.test; import cz.seznam.euphoria.core.client.dataset.Dataset; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; @@ -40,7 +40,7 @@ public void testExplodeOnTwoPartitions() { @Override protected Dataset getOutput(Dataset input) { return FlatMap.of(input) - .using((Integer e, Context c) -> { + .using((Integer e, Collector c) -> { for (int i = 1; i <= e; i++) { c.collect(i); } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java index 8649c5668f538..af66b14d524f1 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/JoinTest.java @@ -21,7 +21,7 @@ import 
cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.AssignEventTime; import cz.seznam.euphoria.core.client.operator.Join; import cz.seznam.euphoria.core.client.operator.MapElements; @@ -74,7 +74,7 @@ protected Dataset> getOutput( Dataset left, Dataset right) { return Join.of(left, right) .by(e -> e, e -> (int) (e % 10)) - .using((Integer l, Long r, Context c) -> c.collect(l + "+" + r)) + .using((Integer l, Long r, Collector c) -> c.collect(l + "+" + r)) .setPartitioner(e -> e % 2) .outer() .output(); @@ -126,7 +126,7 @@ protected Dataset> getOutput( Dataset left, Dataset right) { return Join.of(left, right) .by(e -> e, e -> (int) (e % 10)) - .using((Integer l, Long r, Context c) -> { + .using((Integer l, Long r, Collector c) -> { c.collect(l + "+" + r); }) .setPartitioner(e -> e % 2) @@ -204,7 +204,7 @@ protected Dataset> getOutput( Dataset left, Dataset right) { return Join.of(left, right) .by(e -> e, e -> (int) (e % 10)) - .using((Integer l, Long r, Context c) -> { + .using((Integer l, Long r, Collector c) -> { c.collect(l + "+" + r); }) .setNumPartitions(2) @@ -281,7 +281,7 @@ public int getNumOutputPartitions() { Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") - .using((Pair l, Pair r, Context> c) -> + .using((Pair l, Pair r, Collector> c) -> c.collect(Triple.of((TimeInterval) c.getWindow(), l.getFirst(), r.getFirst()))) .windowBy(Session.of(Duration.ofMillis(10))) .setNumPartitions(1) diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/MapElementsTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/MapElementsTest.java index 
cf3dcb834a785..ee5d845651bb0 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/MapElementsTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/MapElementsTest.java @@ -16,6 +16,7 @@ package cz.seznam.euphoria.operator.test; import cz.seznam.euphoria.core.client.dataset.Dataset; +import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.MapElements; import cz.seznam.euphoria.operator.test.junit.AbstractOperatorTest; import cz.seznam.euphoria.operator.test.junit.Processing; @@ -39,7 +40,7 @@ public void testOnTwoPartitions() { @Override protected Dataset getOutput(Dataset input) { return MapElements.of(input) - .using(String::valueOf) + .using((UnaryFunction) String::valueOf) .output(); } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java index 5735e5b1b52bc..fcc1dc7d45586 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceByKeyTest.java @@ -27,7 +27,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.ReduceFunction; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.AssignEventTime; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.ReduceByKey; @@ -141,7 +141,7 @@ protected Partitions getInput() { 
return ReduceByKey.of(input) .keyBy(e -> e % 2) .valueBy(e -> e) - .reduceBy((Iterable in, Context ctx) -> { + .reduceBy((Iterable in, Collector ctx) -> { // start with seed based on window int sum = 0; for (Integer i : in) { @@ -598,7 +598,7 @@ public void validate(Partitions>> static class SumState implements State { private final ValueStorage sum; - SumState(StorageProvider storageProvider, Context context) { + SumState(StorageProvider storageProvider, Collector context) { sum = storageProvider.getValueStorage( ValueStorageDescriptor.of("sum-state", Integer.class, 0)); } @@ -609,7 +609,7 @@ public void add(Integer element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { context.collect(sum.get()); } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java index 1838ffa11a5cf..4c17299f3da93 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java @@ -25,7 +25,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.UnaryFunctor; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.AssignEventTime; import cz.seznam.euphoria.core.client.operator.FlatMap; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; @@ -70,7 +70,7 @@ static class SortState implements State { final ListStorage data; - SortState(StorageProvider storageProvider, Context c) { + 
SortState(StorageProvider storageProvider, Collector c) { this.data = storageProvider.getListStorage( ListStorageDescriptor.of("data", Integer.class)); } @@ -81,7 +81,7 @@ public void add(Integer element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { List list = Lists.newArrayList(data.get()); Collections.sort(list); for (Integer i : list) { @@ -223,7 +223,7 @@ List flatten(List> l) { private static class CountState implements State { final ValueStorage count; - CountState(StorageProvider storageProvider, Context context) { + CountState(StorageProvider storageProvider, Collector context) { this.count = storageProvider.getValueStorage( ValueStorageDescriptor.of("count-state", Long.class, 0L)); } @@ -235,7 +235,7 @@ public void add(IN element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { context.collect(count.get()); } @@ -319,7 +319,7 @@ public void validate(Partitions> partitions) { private static class AccState implements State { final ListStorage vals; @SuppressWarnings("unchecked") - AccState(StorageProvider storageProvider, Context context) { + AccState(StorageProvider storageProvider, Collector context) { vals = storageProvider.getListStorage( ListStorageDescriptor.of("vals", (Class) Object.class)); } @@ -330,7 +330,7 @@ public void add(VALUE element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { for (VALUE value : vals.get()) { context.collect(value); } diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WatermarkTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WatermarkTest.java index bf630dc20d4bb..6cb870f70de6f 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WatermarkTest.java +++ 
b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WatermarkTest.java @@ -18,7 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.dataset.windowing.Time; import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.AssignEventTime; import cz.seznam.euphoria.core.client.operator.Join; import cz.seznam.euphoria.core.client.operator.MapElements; @@ -68,7 +68,7 @@ public int getNumOutputPartitions() { Dataset>> joined = Join.of(left, right) .by(p -> "", p -> "") - .using((Pair l, Pair r, Context> c) -> + .using((Pair l, Pair r, Collector> c) -> c.collect(Triple.of((TimeInterval) c.getWindow(), l.getFirst(), r.getFirst()))) .windowBy(Time.of(Duration.ofMillis(10))) .setNumPartitions(1) diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WindowingTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WindowingTest.java index e9e5306c27ff5..7d15350ed7d9d 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WindowingTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/WindowingTest.java @@ -22,7 +22,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval; import cz.seznam.euphoria.core.client.dataset.windowing.WindowedElement; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.core.client.operator.AssignEventTime; import cz.seznam.euphoria.core.client.operator.Distinct; import 
cz.seznam.euphoria.core.client.operator.FlatMap; @@ -86,7 +86,7 @@ protected Dataset> getOutput(Dataset p, Context> ctx) -> { + .using((Pair p, Collector> ctx) -> { long windowEnd = ((TimeInterval) ctx.getWindow()).getEndMillis(); ctx.collect(Triple.of(Instant.ofEpochMilli(windowEnd), p.getFirst(), p.getSecond())); }) @@ -166,7 +166,7 @@ protected Dataset> getOutput(Dataset p, Context> ctx) -> { + .using((Pair p, Collector> ctx) -> { long windowEnd = ((TimeInterval) ctx.getWindow()).getEndMillis(); ctx.collect(Triple.of(Instant.ofEpochMilli(windowEnd), p.getFirst(), p.getSecond())); }) @@ -220,7 +220,7 @@ private static class DistinctState implements State { private final ValueStorage storage; - DistinctState(StorageProvider storageProvider, Context context) { + DistinctState(StorageProvider storageProvider, Collector context) { this.storage = storageProvider.getValueStorage( ValueStorageDescriptor.of("element", Object.class, null)); } @@ -231,7 +231,7 @@ public void add(Object element) { } @Override - public void flush(Context context) { + public void flush(Collector context) { context.collect(storage.get()); } @@ -402,7 +402,7 @@ private void assertTrState(TimeInterval window, TriggerContext ctx) { // extract window timestamp return FlatMap.of(pairs) - .using((Pair in, Context> out) -> { + .using((Pair in, Collector> out) -> { long windowBegin = ((TimeInterval) out.getWindow()).getStartMillis(); long windowEnd = ((TimeInterval) out.getWindow()).getEndMillis(); out.collect(Triple.of( diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContext.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollector.java similarity index 88% rename from sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContext.java rename to sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollector.java index 
5ab434f2d0747..011a8248cda0a 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContext.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollector.java @@ -15,9 +15,9 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; -abstract class FunctionContext implements Context { +abstract class FunctionCollector implements Collector { protected KeyedWindow window; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextAsync.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorAsync.java similarity index 94% rename from sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextAsync.java rename to sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorAsync.java index 5c12f979f8a1d..f94e269b51cd9 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextAsync.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorAsync.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import cz.seznam.euphoria.shaded.guava.com.google.common.collect.AbstractIterator; import cz.seznam.euphoria.shaded.guava.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -26,10 +26,10 @@ import java.util.concurrent.TransferQueue; /** - * Implementation of {@link Context} using asynchronous {@link TransferQueue} + * Implementation of {@link Collector} using asynchronous {@link TransferQueue} * as a workaround to lower the memory footprint */ -class FunctionContextAsync extends 
FunctionContext { +class FunctionCollectorAsync extends FunctionCollector { private TransferQueue queue = new LinkedTransferQueue<>(); private boolean consumed = false; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextMem.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorMem.java similarity index 87% rename from sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextMem.java rename to sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorMem.java index 30a7fba09caae..14f0ec6fdef8a 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionContextMem.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/FunctionCollectorMem.java @@ -15,7 +15,7 @@ */ package cz.seznam.euphoria.spark; -import cz.seznam.euphoria.core.client.io.Context; +import cz.seznam.euphoria.core.client.io.Collector; import java.io.Serializable; import java.util.ArrayList; @@ -23,10 +23,10 @@ import java.util.List; /** - * Implementation of {@link Context} that holds all the + * Implementation of {@link Collector} that holds all the * data in memory. 
*/ -class FunctionContextMem implements Context, Serializable { +class FunctionCollectorMem implements Collector, Serializable { private final List elements = new ArrayList<>(1); private Object window; diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java index 411a41dda8832..b14c22565ae3c 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.spark; +import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; @@ -179,7 +180,7 @@ public StateReducer(Windowing windowing, @SuppressWarnings("unchecked") public Iterator call(Iterator> iterator) { activeReducers = new HashMap<>(); - FunctionContextAsync>> context = new FunctionContextAsync<>(); + FunctionCollectorAsync>> context = new FunctionCollectorAsync<>(); // reduce states in separate thread context.runAsynchronously(() -> { @@ -206,6 +207,8 @@ public Iterator call(Iterator> iterato windowing, trigger, el -> context.collect((SparkElement) el), + // TODO accumulators + VoidAccumulatorProvider.getFactory().create(null), false); activeReducers.put(kw.key(), reducer); diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkExecutor.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkExecutor.java index f0af9b9af6130..a3d211a2b89c7 100644 --- 
a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkExecutor.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/SparkExecutor.java @@ -15,6 +15,7 @@ */ package cz.seznam.euphoria.spark; +import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider; import cz.seznam.euphoria.core.client.dataset.Dataset; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.io.DataSink; @@ -60,6 +61,11 @@ public void shutdown() { submitExecutor.shutdownNow(); } + @Override + public void setAccumulatorProvider(AccumulatorProvider.Factory factory) { + // TODO accumulators + } + private Result execute(Flow flow) { if (!isBoundedInput(flow)) { throw new UnsupportedOperationException("Spark executor doesn't support unbounded input"); diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnaryFunctorWrapper.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnaryFunctorWrapper.java index d229b075d6e7e..255be3292db2c 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnaryFunctorWrapper.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/UnaryFunctorWrapper.java @@ -26,12 +26,12 @@ class UnaryFunctorWrapper implements FlatMapFunction, SparkElement> { - private final FunctionContextMem context; + private final FunctionCollectorMem context; private final UnaryFunctor functor; public UnaryFunctorWrapper(UnaryFunctor functor) { this.functor = Objects.requireNonNull(functor); - this.context = new FunctionContextMem<>(); + this.context = new FunctionCollectorMem<>(); } @Override