diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java index a32637c8af9cd..fa318c4f6b487 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/CountTrigger.java @@ -24,7 +24,7 @@ */ public class CountTrigger implements Trigger { - private final ValueStorageDescriptor countDesc = + private static final ValueStorageDescriptor COUNT_DESCR = ValueStorageDescriptor.of("count", Long.class, 0L, (x, y) -> x + y ); private final long maxCount; @@ -35,7 +35,7 @@ public CountTrigger(long maxCount) { @Override public TriggerResult onElement(long time, W window, TriggerContext ctx) { - ValueStorage count = ctx.getValueStorage(countDesc); + ValueStorage count = ctx.getValueStorage(COUNT_DESCR); count.set(count.get() + 1L); @@ -53,13 +53,13 @@ public TriggerResult onTimer(long time, W window, TriggerContext ctx) { @Override public void onClear(W window, TriggerContext ctx) { - ctx.getValueStorage(countDesc).clear(); + ctx.getValueStorage(COUNT_DESCR).clear(); } @Override public TriggerResult onMerge(W window, TriggerContext.TriggerMergeContext ctx) { - ctx.mergeStoredState(countDesc); - ValueStorage count = ctx.getValueStorage(countDesc); + ctx.mergeStoredState(COUNT_DESCR); + ValueStorage count = ctx.getValueStorage(COUNT_DESCR); if (count.get() >= maxCount) { count.clear(); diff --git a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java index 91718088e5e89..397b38e79fd0c 100644 --- a/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java +++ b/sdks/java/extensions/euphoria/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/triggers/PeriodicTimeTrigger.java @@ -26,7 +26,7 @@ public class PeriodicTimeTrigger implements Trigger { /** Next fire stamp (when merging the lowest timestamp is taken) */ - private final ValueStorageDescriptor fireTimeDescriptor = + private static final ValueStorageDescriptor FIRE_TIME_DESCR = ValueStorageDescriptor.of("fire-time", Long.class, Long.MAX_VALUE, Math::min); private final long interval; @@ -37,7 +37,7 @@ public PeriodicTimeTrigger(long interval) { @Override public TriggerResult onElement(long time, TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); if (fireStamp.get() == Long.MAX_VALUE) { // register first timer aligned with window start @@ -53,7 +53,7 @@ public TriggerResult onElement(long time, TimeInterval window, TriggerContext ct @Override public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); if (fireStamp.get() == time) { long nextTimestamp = time + interval; @@ -70,16 +70,16 @@ public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx) @Override public void onClear(TimeInterval window, TriggerContext ctx) { - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); ctx.deleteTimer(fireStamp.get(), window); fireStamp.clear(); } @Override public TriggerResult onMerge(TimeInterval window, TriggerContext.TriggerMergeContext ctx) { - ctx.mergeStoredState(fireTimeDescriptor); + ctx.mergeStoredState(FIRE_TIME_DESCR); // register timer according to merged state - ValueStorage fireStamp = ctx.getValueStorage(fireTimeDescriptor); + ValueStorage fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR); ctx.registerTimer(fireStamp.get(), window); return TriggerResult.NOOP; diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java index 498c638e78008..0b96aef34fd2a 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.java @@ -47,14 +47,20 @@ class ReduceStateByKeyTranslator implements StreamingOperatorTranslator { - static String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle"; - static boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false; + static final String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle"; + static final boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false; - private boolean valueOfAfterShuffle = true; + static final String CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY = "euphoria.flink.streaming.descriptors.cache.max.size"; + static final int CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT = 1000; + + private boolean valueOfAfterShuffle; + private int descriptorsCacheMaxSize; public ReduceStateByKeyTranslator(Settings settings) { this.valueOfAfterShuffle = settings.getBoolean(CFG_VALUE_OF_AFTER_SHUFFLE_KEY, CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT); + this.descriptorsCacheMaxSize = + settings.getInt(CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY, CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT); } @Override @@ -91,7 +97,9 @@ public DataStream translate(FlinkOperator operator, if (valueOfAfterShuffle) { reduced = input.keyBy(new UnaryFunctionKeyExtractor(keyExtractor)) .transform(operator.getName(), TypeInformation.of(StreamingElement.class), - new StreamingElementWindowOperator(elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode())) + new StreamingElementWindowOperator( + elMapper, windowing, stateFactory, stateCombiner, + context.isLocalMode(), descriptorsCacheMaxSize)) .setParallelism(operator.getParallelism()); } else { // assign windows @@ -104,7 +112,9 @@ public DataStream translate(FlinkOperator operator, .setParallelism(input.getParallelism()); reduced = (DataStream) windowed.keyBy(new KeyedMultiWindowedElementKeyExtractor()) .transform(operator.getName(), TypeInformation.of(StreamingElement.class), - new KeyedMultiWindowedElementWindowOperator<>(windowing, stateFactory, stateCombiner, context.isLocalMode())) + new KeyedMultiWindowedElementWindowOperator<>( + windowing, stateFactory, stateCombiner, + context.isLocalMode(), descriptorsCacheMaxSize)) .setParallelism(operator.getParallelism()); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java index f67ba5ea584b3..f35f456f2c720 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.java @@ -15,9 +15,7 @@ */ package cz.seznam.euphoria.flink.streaming.windowing; -import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; -import com.google.common.collect.Table; import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing; import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow; import cz.seznam.euphoria.core.client.dataset.windowing.Window; @@ -79,6 +77,9 @@ public abstract class AbstractWindowOperator /** True when executor is running in local test (mode) */ private final boolean localMode; + // see {@link WindowedStorageProvider} + private final int descriptorsCacheMaxSize; + private transient InternalTimerService timerService; // tracks existing windows to flush them in case of end of stream is reached @@ -92,18 +93,17 @@ public abstract class AbstractWindowOperator private transient TypeSerializer windowSerializer; - // cached window states by key+window - private transient Table windowStates; - public AbstractWindowOperator(Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { + boolean localMode, + int descriptorsCacheMaxSize) { this.windowing = Objects.requireNonNull(windowing); this.trigger = windowing.getTrigger(); this.stateFactory = Objects.requireNonNull(stateFactory); this.stateCombiner = Objects.requireNonNull(stateCombiner); this.localMode = localMode; + this.descriptorsCacheMaxSize = descriptorsCacheMaxSize; } @Override @@ -122,9 +122,7 @@ public void open() throws Exception { this.triggerContext = new TriggerContextAdapter(); this.outputContext = new OutputContext(); this.storageProvider = new WindowedStorageProvider<>( - getKeyedStateBackend(), windowSerializer); - - this.windowStates = HashBasedTable.create(); + getKeyedStateBackend(), windowSerializer, descriptorsCacheMaxSize); if (windowing instanceof MergingWindowing) { TupleSerializer> tupleSerializer = @@ -173,6 +171,7 @@ public void processElement(StreamRecord record) triggerContext.setWindow(mergeResult); processTriggerResult(mergeResult, + null, triggerContext.onMerge(mergedWindows), mergingWindowSet); @@ -197,7 +196,6 @@ public void processElement(StreamRecord record) // remove merged window states mergedStateWindows.forEach(sw -> { getWindowState(sw).close(); - removeState(sw); }); }); @@ -215,9 +213,10 @@ public void processElement(StreamRecord record) WID stateWindow = mergingWindowSet.getStateWindow(currentWindow); setupEnvironment(getCurrentKey(), stateWindow); - getWindowState(stateWindow).add(element.getValue()); + State stateWindowState = getWindowState(stateWindow); + stateWindowState.add(element.getValue()); - processTriggerResult(currentWindow, triggerResult, mergingWindowSet); + processTriggerResult(currentWindow, stateWindowState, triggerResult, mergingWindowSet); } mergingWindowSet.persist(); @@ -230,7 +229,8 @@ public void processElement(StreamRecord record) endOfStreamTimerService.registerEventTimeTimer(window, Long.MAX_VALUE); } - getWindowState(window).add(element.getValue()); + State windowState = getWindowState(window); + windowState.add(element.getValue()); // process trigger Trigger.TriggerResult triggerResult = trigger.onElement( @@ -238,7 +238,7 @@ public void processElement(StreamRecord record) window, triggerContext); - processTriggerResult(window, triggerResult, null); + processTriggerResult(window, windowState, triggerResult, null); } } } @@ -263,7 +263,7 @@ public void onEventTime(InternalTimer timer) throws Exception { mergingWindowSet = getMergingWindowSet(); } - processTriggerResult(window, triggerResult, mergingWindowSet); + processTriggerResult(window, null, triggerResult, mergingWindowSet); if (mergingWindowSet != null) { mergingWindowSet.persist(); @@ -291,45 +291,38 @@ public void processWatermark(Watermark mark) throws Exception { } private void processTriggerResult(WID window, + // ~ @windowState the state of `window` to + // use; if `null` the state will be fetched + @Nullable State windowState, Trigger.TriggerResult tr, @Nullable MergingWindowSet mergingWindowSet) { if (tr.isFlush() || tr.isPurge()) { - WID stateWindow = window; - State windowState; - - if (windowing instanceof MergingWindowing) { - Objects.requireNonNull(mergingWindowSet); - stateWindow = mergingWindowSet.getStateWindow(window); - windowState = getWindowState(stateWindow); - } else { - windowState = getWindowState(window); + if (windowState == null) { + if (windowing instanceof MergingWindowing) { + Objects.requireNonNull(mergingWindowSet); + windowState = getWindowState(mergingWindowSet.getStateWindow(window)); + } else { + windowState = getWindowState(window); + } } - if (tr.isFlush() || tr.isPurge()) { - if (tr.isFlush()) { - windowState.flush(); - } + if (tr.isFlush()) { + windowState.flush(); + } - if (tr.isPurge()) { - windowState.close(); - trigger.onClear(window, triggerContext); - removeWindow(window, mergingWindowSet); - removeState(stateWindow); - } + if (tr.isPurge()) { + windowState.close(); + trigger.onClear(window, triggerContext); + removeWindow(window, mergingWindowSet); } } } @SuppressWarnings("unchecked") private State getWindowState(WID window) { - State windowState = windowStates.get(getCurrentKey(), window); - if (windowState == null) { - windowState = stateFactory.apply(outputContext, storageProvider); - windowStates.put((KEY) getCurrentKey(), window, windowState); - } - - return windowState; + storageProvider.setWindow(window); + return stateFactory.apply(outputContext, storageProvider); } private MergingWindowSet getMergingWindowSet() { @@ -357,10 +350,6 @@ private void removeWindow(WID window, @Nullable MergingWindowSet windowSet) } } - private void removeState(WID stateWindow) { - windowStates.remove(getCurrentKey(), stateWindow); - } - // ------------------- private class TriggerContextAdapter implements TriggerContext, TriggerContext.TriggerMergeContext { diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java index ae271843ff778..2cf5e9afe21be 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/KeyedMultiWindowedElementWindowOperator.java @@ -33,8 +33,9 @@ public KeyedMultiWindowedElementWindowOperator( Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { - super(windowing, stateFactory, stateCombiner, localMode); + boolean localMode, + int descriptorsCacheMaxSize) { + super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); } @Override diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java index 09b35593142ee..712f93b0d076b 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/MergingWindowSet.java @@ -139,7 +139,7 @@ public W addWindow(W newWindow, MergeCallback callback) throws Exception { // Compute the state windows that we are merging List mergedStateWindows = new ArrayList<>(); - for (W mergedWindow: mergedWindows) { + for (W mergedWindow : mergedWindows) { W res = this.mapping.remove(mergedWindow); if (res != null) { mergedStateWindows.add(res); @@ -153,7 +153,7 @@ public W addWindow(W newWindow, MergeCallback callback) throws Exception { callback.merge(mergeResult, mergedWindows, - mapping.get(mergeResult), + mergedStateWindow, mergedStateWindows); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java index e04d3f8ecd7f8..4887b4a219645 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/StreamingElementWindowOperator.java @@ -40,8 +40,9 @@ public StreamingElementWindowOperator( Windowing windowing, StateFactory stateFactory, CombinableReduceFunction stateCombiner, - boolean localMode) { - super(windowing, stateFactory, stateCombiner, localMode); + boolean localMode, + int descriptorsCacheMaxSize) { + super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize); this.windowAssigner = Objects.requireNonNull(windowAssigner); } diff --git a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java index a1f61cea8b108..123a5b6cf01b7 100644 --- a/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java +++ b/sdks/java/extensions/euphoria/euphoria-flink/src/main/java/cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.java @@ -18,6 +18,7 @@ import cz.seznam.euphoria.core.client.dataset.windowing.Window; import cz.seznam.euphoria.core.client.operator.state.ListStorage; import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor; +import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor; import cz.seznam.euphoria.core.client.operator.state.StorageProvider; import cz.seznam.euphoria.core.client.operator.state.ValueStorage; import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor; @@ -26,12 +27,17 @@ import cz.seznam.euphoria.flink.storage.FlinkReducingValueStorage; import cz.seznam.euphoria.flink.storage.FlinkValueStorage; import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; +import java.util.IdentityHashMap; + /** * Storage provider using flink's state API. All states are namespaced by window. */ @@ -41,11 +47,29 @@ class WindowedStorageProvider implements StorageProvider { private final TypeSerializer windowSerializer; private Window window; + private final IdentityHashMap + storageToStateDescriptors = new IdentityHashMap<>(); + private final int descriptorsCacheMaxSize; - public WindowedStorageProvider(KeyedStateBackend keyedStateBackend, - TypeSerializer windowSerializer) { + /* + * @descriptorsCacheMaxSize: + * + * The maximum size of the storage-to-state descriptor "cache". If we + * exceed this value we are either running a program with this many or + * more logical states or - rather more likely - the user's operators + * are unnecessarily re-creating descriptor objects again and again, + * which is something they want to avoid. + * + * TODO we should avoid the need for this cache in future by having user + * code asking for the storage through an already "resolved descriptor" + * such that the translation is not necessary at all when processing elements. + */ + WindowedStorageProvider(KeyedStateBackend keyedStateBackend, + TypeSerializer windowSerializer, + int descriptorsCacheMaxSize) { this.keyedStateBackend = keyedStateBackend; this.windowSerializer = windowSerializer; + this.descriptorsCacheMaxSize = descriptorsCacheMaxSize; } public void setWindow(Window window) { @@ -57,18 +81,23 @@ public void setWindow(Window window) { public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) { try { if (descriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) { - ReducingStateDescriptor flinkDescriptor = Descriptors.from( - (ValueStorageDescriptor.MergingValueStorageDescriptor) descriptor); + ReducingStateDescriptor flinkDescriptor = + (ReducingStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) k)); + validateStateDescriptorSize(); return new FlinkReducingValueStorage<>((ReducingState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, flinkDescriptor), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), descriptor.getDefaultValue(), window); } else { + ValueStateDescriptor flinkDescriptor = + (ValueStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ValueStorageDescriptor) k)); + validateStateDescriptorSize(); + return new FlinkValueStorage<>((ValueState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, Descriptors.from(descriptor)), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); } } catch (Exception e) { @@ -80,12 +109,24 @@ public ValueStorage getValueStorage(ValueStorageDescriptor descriptor) @SuppressWarnings("unchecked") public ListStorage getListStorage(ListStorageDescriptor descriptor) { try { + ListStateDescriptor flinkDescriptor = + (ListStateDescriptor) storageToStateDescriptors.computeIfAbsent( + descriptor, k -> Descriptors.from((ListStorageDescriptor) k)); + validateStateDescriptorSize(); + return new FlinkListStorage<>((ListState) - keyedStateBackend.getPartitionedState(window, - windowSerializer, Descriptors.from(descriptor)), + keyedStateBackend.getPartitionedState(window, windowSerializer, flinkDescriptor), window); } catch (Exception e) { throw new RuntimeException(e); } } + + private void validateStateDescriptorSize() { + if (storageToStateDescriptors.size() > descriptorsCacheMaxSize) { + throw new IllegalStateException( + "Too many state descriptors! Likely some of the storage descriptors" + + " are not declared as 'static final' and are generated for each element!"); + } + } }