Skip to content

Commit

Permalink
Merge pull request apache#69 from seznam/pete/experimental-state-desc…
Browse files Browse the repository at this point in the history
…riptors

apache#61 [euphoria-flink] Do not hold states in memory
  • Loading branch information
vanekjar authored Apr 3, 2017
2 parents e029705 + 6828c12 commit 4eac579
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*/
public class CountTrigger<W extends Window> implements Trigger<W> {

private final ValueStorageDescriptor<Long> countDesc =
private static final ValueStorageDescriptor<Long> COUNT_DESCR =
ValueStorageDescriptor.of("count", Long.class, 0L, (x, y) -> x + y );

private final long maxCount;
Expand All @@ -35,7 +35,7 @@ public CountTrigger(long maxCount) {

@Override
public TriggerResult onElement(long time, W window, TriggerContext ctx) {
ValueStorage<Long> count = ctx.getValueStorage(countDesc);
ValueStorage<Long> count = ctx.getValueStorage(COUNT_DESCR);

count.set(count.get() + 1L);

Expand All @@ -53,13 +53,13 @@ public TriggerResult onTimer(long time, W window, TriggerContext ctx) {

@Override
public void onClear(W window, TriggerContext ctx) {
ctx.getValueStorage(countDesc).clear();
ctx.getValueStorage(COUNT_DESCR).clear();
}

@Override
public TriggerResult onMerge(W window, TriggerContext.TriggerMergeContext ctx) {
ctx.mergeStoredState(countDesc);
ValueStorage<Long> count = ctx.getValueStorage(countDesc);
ctx.mergeStoredState(COUNT_DESCR);
ValueStorage<Long> count = ctx.getValueStorage(COUNT_DESCR);

if (count.get() >= maxCount) {
count.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class PeriodicTimeTrigger implements Trigger<TimeInterval> {

/** Next fire stamp (when merging the lowest timestamp is taken) */
private final ValueStorageDescriptor<Long> fireTimeDescriptor =
private static final ValueStorageDescriptor<Long> FIRE_TIME_DESCR =
ValueStorageDescriptor.of("fire-time", Long.class, Long.MAX_VALUE, Math::min);

private final long interval;
Expand All @@ -37,7 +37,7 @@ public PeriodicTimeTrigger(long interval) {

@Override
public TriggerResult onElement(long time, TimeInterval window, TriggerContext ctx) {
ValueStorage<Long> fireStamp = ctx.getValueStorage(fireTimeDescriptor);
ValueStorage<Long> fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR);

if (fireStamp.get() == Long.MAX_VALUE) {
// register first timer aligned with window start
Expand All @@ -53,7 +53,7 @@ public TriggerResult onElement(long time, TimeInterval window, TriggerContext ct

@Override
public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx) {
ValueStorage<Long> fireStamp = ctx.getValueStorage(fireTimeDescriptor);
ValueStorage<Long> fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR);

if (fireStamp.get() == time) {
long nextTimestamp = time + interval;
Expand All @@ -70,16 +70,16 @@ public TriggerResult onTimer(long time, TimeInterval window, TriggerContext ctx)

@Override
public void onClear(TimeInterval window, TriggerContext ctx) {
ValueStorage<Long> fireStamp = ctx.getValueStorage(fireTimeDescriptor);
ValueStorage<Long> fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR);
ctx.deleteTimer(fireStamp.get(), window);
fireStamp.clear();
}

@Override
public TriggerResult onMerge(TimeInterval window, TriggerContext.TriggerMergeContext ctx) {
ctx.mergeStoredState(fireTimeDescriptor);
ctx.mergeStoredState(FIRE_TIME_DESCR);
// register timer according to merged state
ValueStorage<Long> fireStamp = ctx.getValueStorage(fireTimeDescriptor);
ValueStorage<Long> fireStamp = ctx.getValueStorage(FIRE_TIME_DESCR);
ctx.registerTimer(fireStamp.get(), window);

return TriggerResult.NOOP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,20 @@

class ReduceStateByKeyTranslator implements StreamingOperatorTranslator<ReduceStateByKey> {

static String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle";
static boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false;
static final String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle";
static final boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false;

private boolean valueOfAfterShuffle = true;
static final String CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY = "euphoria.flink.streaming.descriptors.cache.max.size";
static final int CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT = 1000;

private boolean valueOfAfterShuffle;
private int descriptorsCacheMaxSize;

public ReduceStateByKeyTranslator(Settings settings) {
this.valueOfAfterShuffle =
settings.getBoolean(CFG_VALUE_OF_AFTER_SHUFFLE_KEY, CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT);
this.descriptorsCacheMaxSize =
settings.getInt(CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY, CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT);
}

@Override
Expand Down Expand Up @@ -91,7 +97,9 @@ public DataStream<?> translate(FlinkOperator<ReduceStateByKey> operator,
if (valueOfAfterShuffle) {
reduced = input.keyBy(new UnaryFunctionKeyExtractor(keyExtractor))
.transform(operator.getName(), TypeInformation.of(StreamingElement.class),
new StreamingElementWindowOperator(elMapper, windowing, stateFactory, stateCombiner, context.isLocalMode()))
new StreamingElementWindowOperator(
elMapper, windowing, stateFactory, stateCombiner,
context.isLocalMode(), descriptorsCacheMaxSize))
.setParallelism(operator.getParallelism());
} else {
// assign windows
Expand All @@ -104,7 +112,9 @@ public DataStream<?> translate(FlinkOperator<ReduceStateByKey> operator,
.setParallelism(input.getParallelism());
reduced = (DataStream) windowed.keyBy(new KeyedMultiWindowedElementKeyExtractor())
.transform(operator.getName(), TypeInformation.of(StreamingElement.class),
new KeyedMultiWindowedElementWindowOperator<>(windowing, stateFactory, stateCombiner, context.isLocalMode()))
new KeyedMultiWindowedElementWindowOperator<>(
windowing, stateFactory, stateCombiner,
context.isLocalMode(), descriptorsCacheMaxSize))
.setParallelism(operator.getParallelism());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package cz.seznam.euphoria.flink.streaming.windowing;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
Expand Down Expand Up @@ -79,6 +77,9 @@ public abstract class AbstractWindowOperator<I, KEY, WID extends Window>
/** True when executor is running in local test (mode) */
private final boolean localMode;

// see {@link WindowedStorageProvider}
private final int descriptorsCacheMaxSize;

private transient InternalTimerService<WID> timerService;

// tracks existing windows to flush them in case of end of stream is reached
Expand All @@ -92,18 +93,17 @@ public abstract class AbstractWindowOperator<I, KEY, WID extends Window>

private transient TypeSerializer<WID> windowSerializer;

// cached window states by key+window
private transient Table<KEY, WID, State> windowStates;

public AbstractWindowOperator(Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
boolean localMode) {
boolean localMode,
int descriptorsCacheMaxSize) {
this.windowing = Objects.requireNonNull(windowing);
this.trigger = windowing.getTrigger();
this.stateFactory = Objects.requireNonNull(stateFactory);
this.stateCombiner = Objects.requireNonNull(stateCombiner);
this.localMode = localMode;
this.descriptorsCacheMaxSize = descriptorsCacheMaxSize;
}

@Override
Expand All @@ -122,9 +122,7 @@ public void open() throws Exception {
this.triggerContext = new TriggerContextAdapter();
this.outputContext = new OutputContext();
this.storageProvider = new WindowedStorageProvider<>(
getKeyedStateBackend(), windowSerializer);

this.windowStates = HashBasedTable.create();
getKeyedStateBackend(), windowSerializer, descriptorsCacheMaxSize);

if (windowing instanceof MergingWindowing) {
TupleSerializer<Tuple2<WID, WID>> tupleSerializer =
Expand Down Expand Up @@ -173,6 +171,7 @@ public void processElement(StreamRecord<I> record)
triggerContext.setWindow(mergeResult);

processTriggerResult(mergeResult,
null,
triggerContext.onMerge(mergedWindows),
mergingWindowSet);

Expand All @@ -197,7 +196,6 @@ public void processElement(StreamRecord<I> record)
// remove merged window states
mergedStateWindows.forEach(sw -> {
getWindowState(sw).close();
removeState(sw);
});
});

Expand All @@ -215,9 +213,10 @@ public void processElement(StreamRecord<I> record)

WID stateWindow = mergingWindowSet.getStateWindow(currentWindow);
setupEnvironment(getCurrentKey(), stateWindow);
getWindowState(stateWindow).add(element.getValue());
State stateWindowState = getWindowState(stateWindow);
stateWindowState.add(element.getValue());

processTriggerResult(currentWindow, triggerResult, mergingWindowSet);
processTriggerResult(currentWindow, stateWindowState, triggerResult, mergingWindowSet);
}
mergingWindowSet.persist();

Expand All @@ -230,15 +229,16 @@ public void processElement(StreamRecord<I> record)
endOfStreamTimerService.registerEventTimeTimer(window, Long.MAX_VALUE);
}

getWindowState(window).add(element.getValue());
State windowState = getWindowState(window);
windowState.add(element.getValue());

// process trigger
Trigger.TriggerResult triggerResult = trigger.onElement(
record.getTimestamp(),
window,
triggerContext);

processTriggerResult(window, triggerResult, null);
processTriggerResult(window, windowState, triggerResult, null);
}
}
}
Expand All @@ -263,7 +263,7 @@ public void onEventTime(InternalTimer<KEY, WID> timer) throws Exception {
mergingWindowSet = getMergingWindowSet();
}

processTriggerResult(window, triggerResult, mergingWindowSet);
processTriggerResult(window, null, triggerResult, mergingWindowSet);

if (mergingWindowSet != null) {
mergingWindowSet.persist();
Expand Down Expand Up @@ -291,45 +291,38 @@ public void processWatermark(Watermark mark) throws Exception {
}

private void processTriggerResult(WID window,
// ~ @windowState the state of `window` to
// use; if `null` the state will be fetched
@Nullable State windowState,
Trigger.TriggerResult tr,
@Nullable MergingWindowSet<WID> mergingWindowSet) {

if (tr.isFlush() || tr.isPurge()) {
WID stateWindow = window;
State windowState;

if (windowing instanceof MergingWindowing) {
Objects.requireNonNull(mergingWindowSet);
stateWindow = mergingWindowSet.getStateWindow(window);
windowState = getWindowState(stateWindow);
} else {
windowState = getWindowState(window);
if (windowState == null) {
if (windowing instanceof MergingWindowing) {
Objects.requireNonNull(mergingWindowSet);
windowState = getWindowState(mergingWindowSet.getStateWindow(window));
} else {
windowState = getWindowState(window);
}
}

if (tr.isFlush() || tr.isPurge()) {
if (tr.isFlush()) {
windowState.flush();
}
if (tr.isFlush()) {
windowState.flush();
}

if (tr.isPurge()) {
windowState.close();
trigger.onClear(window, triggerContext);
removeWindow(window, mergingWindowSet);
removeState(stateWindow);
}
if (tr.isPurge()) {
windowState.close();
trigger.onClear(window, triggerContext);
removeWindow(window, mergingWindowSet);
}
}
}

@SuppressWarnings("unchecked")
private State getWindowState(WID window) {
State windowState = windowStates.get(getCurrentKey(), window);
if (windowState == null) {
windowState = stateFactory.apply(outputContext, storageProvider);
windowStates.put((KEY) getCurrentKey(), window, windowState);
}

return windowState;
storageProvider.setWindow(window);
return stateFactory.apply(outputContext, storageProvider);
}

private MergingWindowSet<WID> getMergingWindowSet() {
Expand Down Expand Up @@ -357,10 +350,6 @@ private void removeWindow(WID window, @Nullable MergingWindowSet<WID> windowSet)
}
}

private void removeState(WID stateWindow) {
windowStates.remove(getCurrentKey(), stateWindow);
}

// -------------------

private class TriggerContextAdapter implements TriggerContext, TriggerContext.TriggerMergeContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public KeyedMultiWindowedElementWindowOperator(
Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
boolean localMode) {
super(windowing, stateFactory, stateCombiner, localMode);
boolean localMode,
int descriptorsCacheMaxSize) {
super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public W addWindow(W newWindow, MergeCallback<W> callback) throws Exception {

// Compute the state windows that we are merging
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
for (W mergedWindow : mergedWindows) {
W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
Expand All @@ -153,7 +153,7 @@ public W addWindow(W newWindow, MergeCallback<W> callback) throws Exception {

callback.merge(mergeResult,
mergedWindows,
mapping.get(mergeResult),
mergedStateWindow,
mergedStateWindows);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public StreamingElementWindowOperator(
Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
boolean localMode) {
super(windowing, stateFactory, stateCombiner, localMode);
boolean localMode,
int descriptorsCacheMaxSize) {
super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize);
this.windowAssigner = Objects.requireNonNull(windowAssigner);
}

Expand Down
Loading

0 comments on commit 4eac579

Please sign in to comment.