From fdfe9375bd29e39a5efa3b9d469dbd0e2383b866 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Fri, 31 Mar 2017 10:27:43 +0200 Subject: [PATCH 1/6] #! Make storage descriptors static --- .../euphoria/core/client/triggers/CountTrigger.java | 10 +++++----- .../core/client/triggers/PeriodicTimeTrigger.java | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java index a32637c8af9cd..fa318c4f6b487 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java @@ -24,7 +24,7 @@ */ public class CountTrigger implements Trigger { - private final ValueStorageDescriptor countDesc = + private static final ValueStorageDescriptor COUNT_DESCR = ValueStorageDescriptor.of("count", Long.class, 0L, (x, y) -> x + y ); private final long maxCount; @@ -35,7 +35,7 @@ public CountTrigger(long maxCount) { @Override public TriggerResult onElement(long time, W window, TriggerContext ctx) { - ValueStorage count = ctx.getValueStorage(countDesc); + ValueStorage count = ctx.getValueStorage(COUNT_DESCR); count.set(count.get() + 1L); @@ -53,13 +53,13 @@ public TriggerResult onTimer(long time, W window, TriggerContext ctx) { @Override public void onClear(W window, TriggerContext ctx) { - ctx.getValueStorage(countDesc).clear(); + ctx.getValueStorage(COUNT_DESCR).clear(); } @Override public TriggerResult onMerge(W window, TriggerContext.TriggerMergeContext ctx) { - ctx.mergeStoredState(countDesc); - ValueStorage count = ctx.getValueStorage(countDesc); + ctx.mergeStoredState(COUNT_DESCR); + ValueStorage count = ctx.getValueStorage(COUNT_DESCR); if (count.get() >= maxCount) { count.clear(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java index 91718088e5e89..397b38e79fd0c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java @@ -26,7 +26,7 @@ public class PeriodicTimeTrigger implements Trigger { /** Next fire stamp (when merging the lowest timestamp is taken) */ - private final ValueStorageDescriptor fireTimeDescriptor = + private static final ValueStorageDescriptor FIRE_TIME_DESCR = ValueStorageDescriptor.of("fire-time", Long.class, Long.MAX_VALUE, Math::min); private final long interval; @@ -37,7 +37,7 @@ public PeriodicTimeTrigger(long interval) { @Override public TriggerResult onElement(long time, TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); if (fireStamp.get() == Long.MAX_VALUE) { // register first timer aligned with window start @@ -53,7 +53,7 @@ public TriggerResult onElement(long time, TimeInterval window, TriggerContext ct @Override public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); if (fireStamp.get() == time) { long nextTimestamp = time + interval; @@ -70,16 +70,16 @@ public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx) @Override public void onClear(TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); ctx.deleteTimer(fireStamp.get(), window); fireStamp.clear(); } @Override public TriggerResult onMerge(TimeInterval window, TriggerContext.TriggerMergeContext ctx) { - ctx.mergeStoredState(fireTimeDescriptor); + ctx.mergeStoredState(FIRE_TIME_DESCR); // register timer according to merged state - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); ctx.registerTimer(fireStamp.get(), window); return TriggerResult.NOOP; From 17af93bbd34abb00ec76db2f935b366f3e03b9f2 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Mon, 3 Apr 2017 08:45:47 +0200 Subject: [PATCH 2/6] #! [euphoria-flink] Do not cache states --- .../windowing/AbstractWindowOperator.java | 54 +++++++------------ .../windowing/WindowedStorageProvider.java | 28 +++++++--- 2 files changed, 40 insertions(+), 42 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index f67ba5ea584b3..e8767be612aaf 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -15,9 +15,7 @@ */ package cz.seznam.euphoria.flink.streaming.windowing; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; -import com.google.common.collect.Table; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; @@ -92,9 +90,6 @@ public abstract class AbstractWindowOperator private transient TypeSerializer windowSerializer; - // cached window states by key+window - private transient Table windowStates; - public AbstractWindowOperator(Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, @@ -124,8 +119,6 @@ public void open() throws Exception { this.storageProvider = new WindowedStorageProvider<>( getKeyedStateBackend(), windowSerializer); - this.windowStates = HashBasedTable.create(); - if (windowing instanceof MergingWindowing) { TupleSerializer> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[]{windowSerializer, windowSerializer}); @@ -197,7 +190,6 @@ public void processElement(StreamRecord record) // remove merged window states mergedStateWindows.forEach(sw -> { getWindowState(sw).close(); - removeState(sw); }); }); @@ -230,7 +222,8 @@ public void processElement(StreamRecord record) endOfStreamTimerService.registerEventTimeTimer(window, Long.MAX_VALUE); } - getWindowState(window).add(element.getValue()); + State windowState = getWindowState(window); + windowState.add(element.getValue()); // process trigger Trigger.TriggerResult triggerResult = trigger.onElement( @@ -238,7 +231,15 @@ public void processElement(StreamRecord record) window, triggerContext); - processTriggerResult(window, triggerResult, null); + // inlining: processTriggerResult(window, triggerResult, null); + if (triggerResult.isFlush()) { + windowState.flush(); + } + if (triggerResult.isPurge()) { + windowState.close(); + trigger.onClear(window, triggerContext); + } +// processTriggerResult(window, triggerResult, null); } } } @@ -295,41 +296,30 @@ private void processTriggerResult(WID window, @Nullable MergingWindowSet mergingWindowSet) { if (tr.isFlush() || tr.isPurge()) { - WID stateWindow = window; State windowState; if (windowing instanceof MergingWindowing) { Objects.requireNonNull(mergingWindowSet); - stateWindow = mergingWindowSet.getStateWindow(window); - windowState = getWindowState(stateWindow); + windowState = getWindowState(mergingWindowSet.getStateWindow(window)); } else { windowState = getWindowState(window); } - if (tr.isFlush() || tr.isPurge()) { - if (tr.isFlush()) { - windowState.flush(); - } + if (tr.isFlush()) { + windowState.flush(); + } - if (tr.isPurge()) { - windowState.close(); - trigger.onClear(window, triggerContext); - removeWindow(window, mergingWindowSet); - removeState(stateWindow); - } + if (tr.isPurge()) { + windowState.close(); + trigger.onClear(window, triggerContext); + removeWindow(window, mergingWindowSet); } } } @SuppressWarnings("unchecked") private State getWindowState(WID window) { - State windowState = windowStates.get(getCurrentKey(), window); - if (windowState == null) { - windowState = stateFactory.apply(outputContext, storageProvider); - windowStates.put((KEY) getCurrentKey(), window, windowState); - } - - return windowState; + return stateFactory.apply(outputContext, storageProvider); } private MergingWindowSet getMergingWindowSet() { @@ -357,10 +347,6 @@ private void removeWindow(WID window, @Nullable MergingWindowSet windowSet) } } - private void removeState(WID stateWindow) { - windowStates.remove(getCurrentKey(), stateWindow); - } - // ------------------- private class TriggerContextAdapter implements TriggerContext, TriggerContext.TriggerMergeContext { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java index a1f61cea8b108..cbd9a5ee6ef22 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; @@ -26,12 +27,17 @@ import cz.seznam.euphoria.flink.storage.FlinkReducingValueStorage; import cz.seznam.euphoria.flink.storage.FlinkValueStorage; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; +import java.util.IdentityHashMap; + /** * Storage provider using flink's state API. All states are namespaced by window. */ @@ -41,6 +47,8 @@ class WindowedStorageProvider implements StorageProvider { private final TypeSerializer windowSerializer; private Window window; + private final IdentityHashMap + storageToStateDescriptors = new IdentityHashMap<>(); public WindowedStorageProvider(KeyedStateBackend keyedStateBackend, TypeSerializer windowSerializer) { @@ -57,18 +65,20 @@ public void setWindow(Window window) { public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { try { if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { - ReducingStateDescriptor flinkDescriptor = Descriptors.from( - (ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor); + ReducingStateDescriptor flinkDescriptor = + (ReducingStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) k)); return new FlinkReducingValueStorage<>((ReducingState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, flinkDescriptor), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), descriptor.getDefaultValue(), window); } else { + ValueStateDescriptor flinkDescriptor = + (ValueStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ValueStorageDescriptor) k)); return new FlinkValueStorage<>((ValueState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, Descriptors.from(descriptor)), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); } } catch (Exception e) { @@ -80,9 +90,11 @@ public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) @SuppressWarnings("unchecked") public ListStorage getListStorage(ListStorageDescriptor descriptor) { try { + ListStateDescriptor flinkDescriptor = + (ListStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ListStorageDescriptor) k)); return new FlinkListStorage<>((ListState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, Descriptors.from(descriptor)), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); } catch (Exception e) { throw new RuntimeException(e); From 2dd9b879e25bf874608ab238bf28032c08f1f7ac Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Mon, 3 Apr 2017 14:14:57 +0200 Subject: [PATCH 3/6] #67 [euphoria-flink] Avoid descriptors cache infinitely growing --- .../streaming/ReduceStateByKeyTranslator.java | 20 ++++++++--- .../windowing/AbstractWindowOperator.java | 9 +++-- ...yedMultiWindowedElementWindowOperator.java | 5 +-- .../StreamingElementWindowOperator.java | 5 +-- .../windowing/WindowedStorageProvider.java | 33 +++++++++++++++++-- 5 files changed, 59 insertions(+), 13 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 498c638e78008..0b96aef34fd2a 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -47,14 +47,20 @@ class ReduceStateByKeyTranslator implements StreamingOperatorTranslator { - static String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle"; - static boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false; + static final String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle"; + static final boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false; - private boolean valueOfAfterShuffle = true; + static final String CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY = "euphoria.flink.streaming.descriptors.cache.max.size"; + static final int CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT = 1000; + + private boolean valueOfAfterShuffle; + private int descriptorsCacheMaxSize; public ReduceStateByKeyTranslator(Settings settings) { this.valueOfAfterShuffle = settings.getBoolean(CFG_VALUE_OF_AFTER_SHUFFLE_KEY, CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT); + this.descriptorsCacheMaxSize = + settings.getInt(CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY, CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT); } @Override @@ -91,7 +97,9 @@ public DataStream translate(FlinkOperator operator, if (valueOfAfterShuffle) { reduced = input.keyBy(new UnaryFunctionKeyExtractor(keyExtractor)) .transform(operator.getName(), TypeInformation.of(StreamingElement.class), - new StreamingElementWindowOperator(elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode())) + new StreamingElementWindowOperator( + elMapper, windowing, stateFactory, stateCombiner, + context.isLocalMode(), descriptorsCacheMaxSize)) .setParallelism(operator.getParallelism()); } else { // assign windows @@ -104,7 +112,9 @@ public DataStream translate(FlinkOperator operator, .setParallelism(input.getParallelism()); reduced = (DataStream) windowed.keyBy(new KeyedMultiWindowedElementKeyExtractor()) .transform(operator.getName(), TypeInformation.of(StreamingElement.class), - new KeyedMultiWindowedElementWindowOperator<>(windowing, stateFactory, stateCombiner, context.isLocalMode())) + new KeyedMultiWindowedElementWindowOperator<>( + windowing, stateFactory, stateCombiner, + context.isLocalMode(), descriptorsCacheMaxSize)) .setParallelism(operator.getParallelism()); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index e8767be612aaf..dbc8692c9c5ce 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -77,6 +77,9 @@ public abstract class AbstractWindowOperator /** True when executor is running in local test (mode) */ private final boolean localMode; + // see {@link WindowedStorageProvider} + private final int descriptorsCacheMaxSize; + private transient InternalTimerService timerService; // tracks existing windows to flush them in case of end of stream is reached @@ -93,12 +96,14 @@ public abstract class AbstractWindowOperator public AbstractWindowOperator(Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { + boolean localMode, + int descriptorsCacheMaxSize) { this.windowing = Objects.requireNonNull(windowing); this.trigger = windowing.getTrigger(); this.stateFactory = Objects.requireNonNull(stateFactory); this.stateCombiner = Objects.requireNonNull(stateCombiner); this.localMode = localMode; + this.descriptorsCacheMaxSize = descriptorsCacheMaxSize; } @Override @@ -117,7 +122,7 @@ public void open() throws Exception { this.triggerContext = new TriggerContextAdapter(); this.outputContext = new OutputContext(); this.storageProvider = new WindowedStorageProvider<>( - getKeyedStateBackend(), windowSerializer); + getKeyedStateBackend(), windowSerializer, descriptorsCacheMaxSize); if (windowing instanceof MergingWindowing) { TupleSerializer> tupleSerializer = diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java index ae271843ff778..2cf5e9afe21be 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java @@ -33,8 +33,9 @@ public KeyedMultiWindowedElementWindowOperator( Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { - super(windowing, stateFactory, stateCombiner, localMode); + boolean localMode, + int descriptorsCacheMaxSize) { + super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java index e04d3f8ecd7f8..4887b4a219645 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java @@ -40,8 +40,9 @@ public StreamingElementWindowOperator( Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { - super(windowing, stateFactory, stateCombiner, localMode); + boolean localMode, + int descriptorsCacheMaxSize) { + super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); this.windowAssigner = Objects.requireNonNull(windowAssigner); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java index cbd9a5ee6ef22..123a5b6cf01b7 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java @@ -49,11 +49,27 @@ class WindowedStorageProvider implements StorageProvider { private final IdentityHashMap storageToStateDescriptors = new IdentityHashMap<>(); + private final int descriptorsCacheMaxSize; - public WindowedStorageProvider(KeyedStateBackend keyedStateBackend, - TypeSerializer windowSerializer) { + /* + * @descriptorsCacheMaxSize: + * + * The maximum size of the storage-to-state descriptor "cache". If we + * exceed this value we are either running a program with this many or + * more logical states or - rather more likely - the user's operators + * are unnecessarily re-creating descriptor objects again and again, + * which is something they want to avoid. + * + * TODO we should avoid the need for this cache in future by having user + * code asking for the storage through an already "resolved descriptor" + * such that the translation is not necessary at all when processing elements. + */ + WindowedStorageProvider(KeyedStateBackend keyedStateBackend, + TypeSerializer windowSerializer, + int descriptorsCacheMaxSize) { this.keyedStateBackend = keyedStateBackend; this.windowSerializer = windowSerializer; + this.descriptorsCacheMaxSize = descriptorsCacheMaxSize; } public void setWindow(Window window) { @@ -68,6 +84,7 @@ public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) ReducingStateDescriptor flinkDescriptor = (ReducingStateDescriptor) storageToStateDescriptors.computeIfAbsent( descriptor, k -> Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) k)); + validateStateDescriptorSize(); return new FlinkReducingValueStorage<>((ReducingState) keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), @@ -77,6 +94,8 @@ public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) ValueStateDescriptor flinkDescriptor = (ValueStateDescriptor) storageToStateDescriptors.computeIfAbsent( descriptor, k -> Descriptors.from((ValueStorageDescriptor) k)); + validateStateDescriptorSize(); + return new FlinkValueStorage<>((ValueState) keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); @@ -93,6 +112,8 @@ public ListStorage getListStorage(ListStorageDescriptor descriptor) { ListStateDescriptor flinkDescriptor = (ListStateDescriptor) storageToStateDescriptors.computeIfAbsent( descriptor, k -> Descriptors.from((ListStorageDescriptor) k)); + validateStateDescriptorSize(); + return new FlinkListStorage<>((ListState) keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); @@ -100,4 +121,12 @@ public ListStorage getListStorage(ListStorageDescriptor descriptor) { throw new RuntimeException(e); } } + + private void validateStateDescriptorSize() { + if (storageToStateDescriptors.size() > descriptorsCacheMaxSize) { + throw new IllegalStateException( + "Too many state descriptors! Likely some of the storage descriptors" + + " are not declared as 'static final' and are generated for each element!"); + } + } } From bd404329ebf68497b4652dd5ed335de1f57d7cc9 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Mon, 3 Apr 2017 14:38:24 +0200 Subject: [PATCH 4/6] #67 [euphoria-flink] Avoid flushing empty states due to "localMode" --- .../flink/streaming/windowing/AbstractWindowOperator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index dbc8692c9c5ce..a0f88727415d1 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -243,8 +243,12 @@ public void processElement(StreamRecord record) if (triggerResult.isPurge()) { windowState.close(); trigger.onClear(window, triggerContext); + + if (localMode) { + // un-register the cleanup timer for each window since we just discarded it + endOfStreamTimerService.deleteEventTimeTimer(window, Long.MAX_VALUE); + } } -// processTriggerResult(window, triggerResult, null); } } } From d307ab8c324e6fe4836ee347ab2cdb374c9c77e8 Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Mon, 3 Apr 2017 16:26:03 +0200 Subject: [PATCH 5/6] #67 [euphoria-flink] Fix window merging --- .../flink/streaming/windowing/AbstractWindowOperator.java | 1 + .../euphoria/flink/streaming/windowing/MergingWindowSet.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index a0f88727415d1..3d1fd710bfdfa 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -328,6 +328,7 @@ private void processTriggerResult(WID window, @SuppressWarnings("unchecked") private State getWindowState(WID window) { + storageProvider.setWindow(window); return stateFactory.apply(outputContext, storageProvider); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java index 09b35593142ee..4e944d93f3e8b 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java @@ -139,7 +139,7 @@ public W addWindow(W newWindow, MergeCallback callback) throws Exception { // Compute the state windows that we are merging List mergedStateWindows = new ArrayList<>(); - for (W mergedWindow: mergedWindows) { + for (W mergedWindow : mergedWindows) { W res = this.mapping.remove(mergedWindow); if (res != null) { mergedStateWindows.add(res); From 6828c128653c7b8715f9c9f025faf7cfcf0dba9f Mon Sep 17 00:00:00 2001 From: "Novotnik, Petr" Date: Mon, 3 Apr 2017 17:00:53 +0200 Subject: [PATCH 6/6] #67 [euphoria-flink] Avoid inlining processTriggerResult --- .../windowing/AbstractWindowOperator.java | 39 ++++++++----------- .../streaming/windowing/MergingWindowSet.java | 2 +- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index 3d1fd710bfdfa..f35f456f2c720 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -171,6 +171,7 @@ public void processElement(StreamRecord record) triggerContext.setWindow(mergeResult); processTriggerResult(mergeResult, + null, triggerContext.onMerge(mergedWindows), mergingWindowSet); @@ -212,9 +213,10 @@ public void processElement(StreamRecord record) WID stateWindow = mergingWindowSet.getStateWindow(currentWindow); setupEnvironment(getCurrentKey(), stateWindow); - getWindowState(stateWindow).add(element.getValue()); + State stateWindowState = getWindowState(stateWindow); + stateWindowState.add(element.getValue()); - processTriggerResult(currentWindow, triggerResult, mergingWindowSet); + processTriggerResult(currentWindow, stateWindowState, triggerResult, mergingWindowSet); } mergingWindowSet.persist(); @@ -236,19 +238,7 @@ public void processElement(StreamRecord record) window, triggerContext); - // inlining: processTriggerResult(window, triggerResult, null); - if (triggerResult.isFlush()) { - windowState.flush(); - } - if (triggerResult.isPurge()) { - windowState.close(); - trigger.onClear(window, triggerContext); - - if (localMode) { - // un-register the cleanup timer for each window since we just discarded it - endOfStreamTimerService.deleteEventTimeTimer(window, Long.MAX_VALUE); - } - } + processTriggerResult(window, windowState, triggerResult, null); } } } @@ -273,7 +263,7 @@ public void onEventTime(InternalTimer timer) throws Exception { mergingWindowSet = getMergingWindowSet(); } - processTriggerResult(window, triggerResult, mergingWindowSet); + processTriggerResult(window, null, triggerResult, mergingWindowSet); if (mergingWindowSet != null) { mergingWindowSet.persist(); @@ -301,17 +291,20 @@ public void processWatermark(Watermark mark) throws Exception { } private void processTriggerResult(WID window, + // ~ @windowState the state of `window` to + // use; if `null` the state will be fetched + @Nullable State windowState, Trigger.TriggerResult tr, @Nullable MergingWindowSet mergingWindowSet) { if (tr.isFlush() || tr.isPurge()) { - State windowState; - - if (windowing instanceof MergingWindowing) { - Objects.requireNonNull(mergingWindowSet); - windowState = getWindowState(mergingWindowSet.getStateWindow(window)); - } else { - windowState = getWindowState(window); + if (windowState == null) { + if (windowing instanceof MergingWindowing) { + Objects.requireNonNull(mergingWindowSet); + windowState = getWindowState(mergingWindowSet.getStateWindow(window)); + } else { + windowState = getWindowState(window); + } } if (tr.isFlush()) { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java index 4e944d93f3e8b..712f93b0d076b 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java @@ -153,7 +153,7 @@ public W addWindow(W newWindow, MergeCallback callback) throws Exception { callback.merge(mergeResult, mergedWindows, - mapping.get(mergeResult), + mergedStateWindow, mergedStateWindows); }