diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java index b69de39bf85a3..bce501048aef9 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceByKey.java @@ -24,13 +24,13 @@ import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; import cz.seznam.euphoria.core.client.functional.ReduceFunction; -import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; @@ -287,7 +287,7 @@ static class CombiningReduceState extends State implements StateSupport.MergeFrom> { - static final class Factory implements StateFactory> { + static final class Factory implements StateFactory> { private final CombinableReduceFunction r; Factory(CombinableReduceFunction r) { @@ -295,8 +295,8 @@ static final class Factory implements StateFactory> { } @Override - public State apply(Context ctx, StorageProvider storageProvider) { - return new CombiningReduceState<>(ctx, storageProvider, r); + public State createState(Context context, StorageProvider storageProvider) { + return new CombiningReduceState<>(context, storageProvider, r); } } @@ -344,22 +344,22 @@ public void mergeFrom(CombiningReduceState other) { } } - private static class NonCombiningReduceState - extends State - implements StateSupport.MergeFrom> { + private static class NonCombiningReduceState + extends State + implements StateSupport.MergeFrom> { - static final class Factory - implements StateFactory> { - private final ReduceFunction r; + static final class Factory + implements StateFactory> { + private final ReduceFunction r; - Factory(ReduceFunction r) { + Factory(ReduceFunction r) { this.r = Objects.requireNonNull(r); } @Override - public NonCombiningReduceState - apply(Context ctx, StorageProvider storageProvider) { - return new NonCombiningReduceState<>(ctx, storageProvider, r); + public NonCombiningReduceState + createState(Context context, StorageProvider storageProvider) { + return new NonCombiningReduceState<>(context, storageProvider, r); } } @@ -367,22 +367,22 @@ static final class Factory private static final ListStorageDescriptor STORAGE_DESC = ListStorageDescriptor.of("values", (Class) Object.class); - private final ReduceFunction reducer; - private final ListStorage reducibleValues; + private final ReduceFunction reducer; + private final ListStorage reducibleValues; NonCombiningReduceState(Context context, StorageProvider storageProvider, - ReduceFunction reducer) { + ReduceFunction reducer) { super(context); this.reducer = Objects.requireNonNull(reducer); @SuppressWarnings("unchecked") - ListStorage ls = storageProvider.getListStorage(STORAGE_DESC); + ListStorage ls = storageProvider.getListStorage(STORAGE_DESC); reducibleValues = ls; } @Override - public void add(VALUE element) { + public void add(IN element) { reducibleValues.add(element); } @@ -398,7 +398,7 @@ public void close() { } @Override - public void mergeFrom(NonCombiningReduceState other) { + public void mergeFrom(NonCombiningReduceState other) { this.reducibleValues.addAll(other.reducibleValues.get()); } } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java index 35c9f4e98a72a..083eaa3e8b749 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java @@ -23,9 +23,9 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.flow.Flow; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; -import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.util.Pair; import javax.annotation.Nullable; @@ -109,7 +109,7 @@ public static class DatasetBuilder3 { } public > DatasetBuilder4< IN, KEY, VALUE, OUT, STATE> stateFactory( - StateFactory stateFactory) { + StateFactory stateFactory) { return new DatasetBuilder4<>( name, input, keyExtractor, valueExtractor, stateFactory); } @@ -120,12 +120,12 @@ public static class DatasetBuilder4< private final Dataset input; private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; - private final StateFactory stateFactory; + private final StateFactory stateFactory; DatasetBuilder4(String name, Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - StateFactory stateFactory) + StateFactory stateFactory) { this.name = Objects.requireNonNull(name); this.input = Objects.requireNonNull(input); @@ -148,14 +148,14 @@ public static class DatasetBuilder5< private final Dataset input; private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; - private final StateFactory stateFactory; + private final StateFactory stateFactory; private final CombinableReduceFunction stateCombiner; DatasetBuilder5(String name, Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - StateFactory stateFactory, + StateFactory stateFactory, CombinableReduceFunction stateCombiner) { // initialize default partitioning according to input super(new DefaultPartitioning<>(input.getNumPartitions())); @@ -199,7 +199,7 @@ public static class DatasetBuilder6< private final Dataset input; private final UnaryFunction keyExtractor; private final UnaryFunction valueExtractor; - private final StateFactory stateFactory; + private final StateFactory stateFactory; private final CombinableReduceFunction stateCombiner; @Nullable private final Windowing windowing; @@ -210,7 +210,7 @@ public static class DatasetBuilder6< Dataset input, UnaryFunction keyExtractor, UnaryFunction valueExtractor, - StateFactory stateFactory, + StateFactory stateFactory, CombinableReduceFunction stateCombiner, @Nullable Windowing windowing, @Nullable ExtractEventTime eventTimeAssigner, @@ -253,7 +253,7 @@ public static OfBuilder named(String name) { return new OfBuilder(name); } - private final StateFactory stateFactory; + private final StateFactory stateFactory; private final UnaryFunction valueExtractor; private final CombinableReduceFunction stateCombiner; @@ -264,7 +264,7 @@ public static OfBuilder named(String name) { UnaryFunction valueExtractor, @Nullable Windowing windowing, @Nullable ExtractEventTime eventTimeAssigner, - StateFactory stateFactory, + StateFactory stateFactory, CombinableReduceFunction stateCombiner, Partitioning partitioning) { @@ -274,7 +274,7 @@ public static OfBuilder named(String name) { this.stateCombiner = stateCombiner; } - public StateFactory getStateFactory() { + public StateFactory getStateFactory() { return stateFactory; } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java index 88a474ddf048d..0514be3b11bcf 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/TopPerKey.java @@ -26,6 +26,7 @@ import cz.seznam.euphoria.core.client.graph.DAG; import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; @@ -294,13 +295,13 @@ public UnaryFunction getScoreExtractor() { MaxScored, W> reduce = new ReduceStateByKey<>(getName() + "::ReduceStateByKey", flow, input, - keyExtractor, - e -> Pair.of(valueFn.apply(e), scoreFn.apply(e)), - windowing, - eventTimeAssigner, - MaxScored::new, - stateCombiner, - partitioning); + keyExtractor, + e -> Pair.of(valueFn.apply(e), scoreFn.apply(e)), + windowing, + eventTimeAssigner, + (StateFactory, Pair, MaxScored>) MaxScored::new, + stateCombiner, + partitioning); MapElements>, Triple> format = diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java index e4c770bbd29f9..51cb626ca5a75 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/state/StateFactory.java @@ -15,14 +15,16 @@ */ package cz.seznam.euphoria.core.client.operator.state; -import cz.seznam.euphoria.core.client.functional.BinaryFunction; import cz.seznam.euphoria.core.client.io.Context; -import cz.seznam.euphoria.core.client.operator.state.StorageProvider; + +import java.io.Serializable; /** * Factory for states. */ -public interface StateFactory - extends BinaryFunction, StorageProvider, STATE> { +@FunctionalInterface +public interface StateFactory> extends Serializable { + + STATE createState(Context context, StorageProvider storageProvider); } diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java index 05c45679dd561..f587e66aad5f3 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/executor/greduce/GroupReducer.java @@ -22,12 +22,12 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.BinaryFunction; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; -import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.Storage; import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; @@ -68,7 +68,7 @@ public interface WindowedElementFactory { WindowedElement create(W window, long timestamp, T element); } - private final StateFactory stateFactory; + private final StateFactory> stateFactory; private final WindowedElementFactory elementFactory; private final CombinableReduceFunction stateCombiner; private final StorageProvider stateStorageProvider; @@ -82,7 +82,7 @@ public interface WindowedElementFactory { final HashMap states = new HashMap<>(); KEY key; - public GroupReducer(StateFactory stateFactory, + public GroupReducer(StateFactory> stateFactory, WindowedElementFactory elementFactory, CombinableReduceFunction stateCombiner, StorageProvider stateStorageProvider, @@ -123,7 +123,7 @@ public void process(WindowedElement> elem) { { State state = states.get(window); if (state == null) { - state = stateFactory.apply( + state = stateFactory.createState( new ElementCollectContext(collector, window), stateStorageProvider); states.put(window, state); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java index 287bd549b4a14..35684c301535a 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.java @@ -128,8 +128,8 @@ static class RSBKReducer implements GroupReduceFunction, BatchElement>, ResultTypeQueryable> { - private final StateFactory stateFactory; - private final CombinableReduceFunction stateCombiner; + private final StateFactory> stateFactory; + private final CombinableReduceFunction> stateCombiner; private final StorageProvider stateStorageProvider; private final Windowing windowing; private final Trigger trigger; @@ -152,7 +152,7 @@ static class RSBKReducer public void reduce(Iterable> values, org.apache.flink.util.Collector> out) { - GroupReducer reducer = new GroupReducer<>( + GroupReducer reducer = new GroupReducer( stateFactory, BatchElement::new, stateCombiner, diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 5ea33ca2d46bb..f414d9f61dbc8 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -73,8 +73,8 @@ public DataStream translate(FlinkOperator operator, ReduceStateByKey origOperator = operator.getOriginalOperator(); - StateFactory stateFactory = origOperator.getStateFactory(); - CombinableReduceFunction stateCombiner = origOperator.getStateCombiner(); + StateFactory> stateFactory = origOperator.getStateFactory(); + CombinableReduceFunction> stateCombiner = origOperator.getStateCombiner(); Windowing windowing = origOperator.getWindowing(); if (windowing == null) { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index a188cddb64f3b..ee0ff77976bd3 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -20,12 +20,12 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; -import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.io.Context; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; @@ -65,8 +65,8 @@ public abstract class AbstractWindowOperator private final Windowing windowing; private final Trigger trigger; - private final StateFactory stateFactory; - private final CombinableReduceFunction stateCombiner; + private final StateFactory> stateFactory; + private final CombinableReduceFunction> stateCombiner; // FIXME Arguable hack that ensures all remaining opened windows @@ -94,8 +94,8 @@ public abstract class AbstractWindowOperator private transient TypeSerializer windowSerializer; public AbstractWindowOperator(Windowing windowing, - StateFactory stateFactory, - CombinableReduceFunction stateCombiner, + StateFactory> stateFactory, + CombinableReduceFunction> stateCombiner, boolean localMode, int descriptorsCacheMaxSize) { this.windowing = Objects.requireNonNull(windowing); @@ -191,7 +191,7 @@ public void processElement(StreamRecord record) List states = new ArrayList<>(); states.add(getWindowState(stateResultWindow)); mergedStateWindows.forEach(sw -> states.add(getWindowState(sw))); - stateCombiner.apply(states); + stateCombiner.apply((Iterable) states); // remove merged window states mergedStateWindows.forEach(sw -> { @@ -322,7 +322,7 @@ private void processTriggerResult(WID window, @SuppressWarnings("unchecked") private State getWindowState(WID window) { storageProvider.setWindow(window); - return stateFactory.apply(outputContext, storageProvider); + return stateFactory.createState(outputContext, storageProvider); } private MergingWindowSet getMergingWindowSet() { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java index d73608f7e9291..e6dded5a58346 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java @@ -31,8 +31,8 @@ public class KeyedMultiWindowedElementWindowOperator public KeyedMultiWindowedElementWindowOperator( Windowing windowing, - StateFactory stateFactory, - CombinableReduceFunction stateCombiner, + StateFactory> stateFactory, + CombinableReduceFunction> stateCombiner, boolean localMode, int descriptorsCacheMaxSize) { super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java index 83c99b39692cb..4512fc7f807c1 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java @@ -38,8 +38,8 @@ public class StreamingElementWindowOperator public StreamingElementWindowOperator( WindowAssigner windowAssigner, Windowing windowing, - StateFactory stateFactory, - CombinableReduceFunction stateCombiner, + StateFactory> stateFactory, + CombinableReduceFunction> stateCombiner, boolean localMode, int descriptorsCacheMaxSize) { super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java index a425284b3b0b4..4b4ce3e2c980e 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/test/java/cz/seznam/euphoria/flink/streaming/RSBKWindowingTest.java @@ -111,8 +111,7 @@ public void testEventWindowing() throws Exception { ReduceStateByKey.of(f.createInput(source)) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StateFactory, AccState>>) AccState::new) + .stateFactory((StateFactory, Pair, AccState>>) AccState::new) .combineStateBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5)), // ~ event time @@ -163,7 +162,7 @@ public void testEventWindowing_attachedWindowing() throws Exception { ReduceStateByKey.of(f.createInput(source)) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StateFactory, AccState>>) AccState::new) + .stateFactory((StateFactory, Pair, AccState>>) AccState::new) .combineStateBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5)), // ~ event time @@ -178,8 +177,7 @@ public void testEventWindowing_attachedWindowing() throws Exception { ReduceStateByKey.of(secondStep) .keyBy(Pair::getFirst) .valueBy(e -> e) - .stateFactory((StateFactory, AccState>>) AccState::new) + .stateFactory((StateFactory, Pair, AccState>>) AccState::new) .combineStateBy(AccState::combine) .windowBy(Time.of(Duration.ofMillis(5)), // ~ event time diff --git a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/ReduceStateByKeyReducer.java b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/ReduceStateByKeyReducer.java index 936cb6a0a7575..4447e9644b9c9 100644 --- a/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/ReduceStateByKeyReducer.java +++ b/sdks/java/extensions/euphoria/euphoria-inmem/src/main/java/cz/seznam/euphoria/inmem/ReduceStateByKeyReducer.java @@ -398,7 +398,7 @@ final class ProcessingState { final Collector stateOutput; final BlockingQueue rawOutput; final TriggerScheduler triggering; - final StateFactory stateFactory; + final StateFactory> stateFactory; final CombinableReduceFunction stateCombiner; final ProcessingStats stats = new ProcessingStats(this); @@ -409,7 +409,7 @@ final class ProcessingState { private ProcessingState( BlockingQueue output, TriggerScheduler triggering, - StateFactory stateFactory, + StateFactory> stateFactory, CombinableReduceFunction stateCombiner, StorageProvider storageProvider) { @@ -484,7 +484,7 @@ State getWindowStateForUpdate(KeyedWindow kw) { State state = wRegistry.getWindowState(kw); if (state == null) { // ~ if no such window yet ... set it up - state = stateFactory.apply( + state = stateFactory.createState( new KeyedElementCollector( stateOutput, kw.window(), kw.key(), processing.triggering::getCurrentTimestamp), diff --git a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java index a5f8223c9d8cc..f0ec1317cf16b 100644 --- a/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java +++ b/sdks/java/extensions/euphoria/euphoria-operator-testkit/src/main/java/cz/seznam/euphoria/operator/test/ReduceStateByKeyTest.java @@ -302,7 +302,7 @@ protected Partitions> getInput() { return ReduceStateByKey.of(input) .keyBy(e -> e.getFirst().charAt(0) - '0') .valueBy(Pair::getFirst) - .stateFactory((StateFactory) CountState::new) + .stateFactory((StateFactory) CountState::new) .combineStateBy(CountState::combine) // FIXME .timedBy(Pair::getSecond) and make the assertion in the validation phase stronger .windowBy(Count.of(3)) @@ -405,7 +405,7 @@ protected Partitions> getInput() { ReduceStateByKey.of(input) .keyBy(e -> e.getFirst().charAt(0) - '0') .valueBy(Pair::getFirst) - .stateFactory((StateFactory>) AccState::new) + .stateFactory((StateFactory>) AccState::new) .combineStateBy(AccState::combine) .windowBy(Time.of(Duration.ofSeconds(5)), (Pair e) -> e.getSecond() * 1_000L) @@ -466,7 +466,7 @@ protected Dataset> getOutput(Dataset e.getFirst().charAt(0) - '0') .valueBy(e -> e.getFirst().substring(2)) - .stateFactory((StateFactory>) AccState::new) + .stateFactory((StateFactory>) AccState::new) .combineStateBy(AccState::combine) .windowBy(TimeSliding.of(Duration.ofSeconds(10), Duration.ofSeconds(5)), (Pair e) -> e.getSecond() * 1_000L) @@ -545,7 +545,7 @@ protected Partitions> getInput() { ReduceStateByKey.of(input) .keyBy(e -> e.getFirst().charAt(0) - '0') .valueBy(Pair::getFirst) - .stateFactory((StateFactory>) AccState::new) + .stateFactory((StateFactory>) AccState::new) .combineStateBy(AccState::combine) .windowBy(Session.of(Duration.ofSeconds(5)), (Pair e) -> e.getSecond() * 1_000L) diff --git a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java index a4457c6fc3231..b4454d897d68a 100644 --- a/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-spark/src/main/java/cz/seznam/euphoria/spark/ReduceStateByKeyTranslator.java @@ -19,11 +19,11 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.dataset.windowing.Windowing; import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction; -import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.functional.UnaryFunction; import cz.seznam.euphoria.core.client.operator.ExtractEventTime; import cz.seznam.euphoria.core.client.operator.ReduceStateByKey; import cz.seznam.euphoria.core.client.operator.state.State; +import cz.seznam.euphoria.core.client.operator.state.StateFactory; import cz.seznam.euphoria.core.client.triggers.Trigger; import cz.seznam.euphoria.core.client.util.Pair; import cz.seznam.euphoria.core.executor.greduce.GroupReducer; @@ -53,8 +53,8 @@ public JavaRDD translate(ReduceStateByKey operator, final JavaRDD input = (JavaRDD) context.getSingleInput(operator); - StateFactory stateFactory = operator.getStateFactory(); - CombinableReduceFunction stateCombiner = operator.getStateCombiner(); + StateFactory> stateFactory = operator.getStateFactory(); + CombinableReduceFunction> stateCombiner = operator.getStateCombiner(); final UnaryFunction keyExtractor = operator.getKeyExtractor(); final UnaryFunction valueExtractor = operator.getValueExtractor(); @@ -155,16 +155,16 @@ private static class StateReducer private final Windowing windowing; private final Trigger trigger; - private final StateFactory stateFactory; - private final CombinableReduceFunction stateCombiner; + private final StateFactory> stateFactory; + private final CombinableReduceFunction> stateCombiner; private final SparkStorageProvider storageProvider; // mapping of [Key -> GroupReducer] private transient Map activeReducers; public StateReducer(Windowing windowing, - StateFactory stateFactory, - CombinableReduceFunction stateCombiner) { + StateFactory> stateFactory, + CombinableReduceFunction> stateCombiner) { this.windowing = windowing; this.trigger = windowing.getTrigger(); this.stateFactory = stateFactory; @@ -195,7 +195,7 @@ public Iterator call(Iterator> iterato GroupReducer reducer = activeReducers.get(kw.key()); if (reducer == null) { - reducer = new GroupReducer<>(stateFactory, + reducer = new GroupReducer(stateFactory, SparkElement::new, stateCombiner, storageProvider,