Skip to content

Commit

Permalink
apache#51 StateFactory interface revised
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr authored and David Moravek committed May 15, 2018
1 parent aaef209 commit 28b5855
Show file tree
Hide file tree
Showing 14 changed files with 83 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunction;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
Expand Down Expand Up @@ -287,16 +287,16 @@ static class CombiningReduceState<E>
extends State<E, E>
implements StateSupport.MergeFrom<CombiningReduceState<E>> {

static final class Factory<E> implements StateFactory<E, State<E, E>> {
static final class Factory<E> implements StateFactory<E, E, State<E, E>> {
private final CombinableReduceFunction<E> r;

Factory(CombinableReduceFunction<E> r) {
this.r = Objects.requireNonNull(r);
}

@Override
public State<E, E> apply(Context<E> ctx, StorageProvider storageProvider) {
return new CombiningReduceState<>(ctx, storageProvider, r);
public State<E, E> createState(Context<E> context, StorageProvider storageProvider) {
return new CombiningReduceState<>(context, storageProvider, r);
}
}

Expand Down Expand Up @@ -344,45 +344,45 @@ public void mergeFrom(CombiningReduceState<E> other) {
}
}

private static class NonCombiningReduceState<VALUE, OUT>
extends State<VALUE, OUT>
implements StateSupport.MergeFrom<NonCombiningReduceState<VALUE, OUT>> {
private static class NonCombiningReduceState<IN, OUT>
extends State<IN, OUT>
implements StateSupport.MergeFrom<NonCombiningReduceState<IN, OUT>> {

static final class Factory<VALUE, OUT>
implements StateFactory<OUT, NonCombiningReduceState<VALUE, OUT>> {
private final ReduceFunction<VALUE, OUT> r;
static final class Factory<IN, OUT>
implements StateFactory<IN, OUT, NonCombiningReduceState<IN, OUT>> {
private final ReduceFunction<IN, OUT> r;

Factory(ReduceFunction<VALUE, OUT> r) {
Factory(ReduceFunction<IN, OUT> r) {
this.r = Objects.requireNonNull(r);
}

@Override
public NonCombiningReduceState<VALUE, OUT>
apply(Context<OUT> ctx, StorageProvider storageProvider) {
return new NonCombiningReduceState<>(ctx, storageProvider, r);
public NonCombiningReduceState<IN, OUT>
createState(Context<OUT> context, StorageProvider storageProvider) {
return new NonCombiningReduceState<>(context, storageProvider, r);
}
}

@SuppressWarnings("unchecked")
private static final ListStorageDescriptor STORAGE_DESC =
ListStorageDescriptor.of("values", (Class) Object.class);

private final ReduceFunction<VALUE, OUT> reducer;
private final ListStorage<VALUE> reducibleValues;
private final ReduceFunction<IN, OUT> reducer;
private final ListStorage<IN> reducibleValues;

NonCombiningReduceState(Context<OUT> context,
StorageProvider storageProvider,
ReduceFunction<VALUE, OUT> reducer) {
ReduceFunction<IN, OUT> reducer) {
super(context);
this.reducer = Objects.requireNonNull(reducer);

@SuppressWarnings("unchecked")
ListStorage<VALUE> ls = storageProvider.getListStorage(STORAGE_DESC);
ListStorage<IN> ls = storageProvider.getListStorage(STORAGE_DESC);
reducibleValues = ls;
}

@Override
public void add(VALUE element) {
public void add(IN element) {
reducibleValues.add(element);
}

Expand All @@ -398,7 +398,7 @@ public void close() {
}

@Override
public void mergeFrom(NonCombiningReduceState<VALUE, OUT> other) {
public void mergeFrom(NonCombiningReduceState<IN, OUT> other) {
this.reducibleValues.addAll(other.reducibleValues.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -109,7 +109,7 @@ public static class DatasetBuilder3<IN, KEY, VALUE> {
}
public <OUT, STATE extends State<VALUE, OUT>> DatasetBuilder4<
IN, KEY, VALUE, OUT, STATE> stateFactory(
StateFactory<OUT, STATE> stateFactory) {
StateFactory<VALUE, OUT, STATE> stateFactory) {
return new DatasetBuilder4<>(
name, input, keyExtractor, valueExtractor, stateFactory);
}
Expand All @@ -120,12 +120,12 @@ public static class DatasetBuilder4<
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
DatasetBuilder4(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory)
StateFactory<VALUE, OUT, STATE> stateFactory)
{
this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
Expand All @@ -148,14 +148,14 @@ public static class DatasetBuilder5<
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;

DatasetBuilder5(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getNumPartitions()));
Expand Down Expand Up @@ -199,7 +199,7 @@ public static class DatasetBuilder6<
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;
@Nullable
private final Windowing<WIN, W> windowing;
Expand All @@ -210,7 +210,7 @@ public static class DatasetBuilder6<
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
Expand Down Expand Up @@ -253,7 +253,7 @@ public static OfBuilder named(String name) {
return new OfBuilder(name);
}

private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final UnaryFunction<KIN, VALUE> valueExtractor;
private final CombinableReduceFunction<STATE> stateCombiner;

Expand All @@ -264,7 +264,7 @@ public static OfBuilder named(String name) {
UnaryFunction<KIN, VALUE> valueExtractor,
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
StateFactory<OUT, STATE> stateFactory,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
Partitioning<KEY> partitioning)
{
Expand All @@ -274,7 +274,7 @@ public static OfBuilder named(String name) {
this.stateCombiner = stateCombiner;
}

public StateFactory<OUT, STATE> getStateFactory() {
public StateFactory<VALUE, OUT, STATE> getStateFactory() {
return stateFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
Expand Down Expand Up @@ -294,13 +295,13 @@ public UnaryFunction<IN, SCORE> getScoreExtractor() {
MaxScored<VALUE, SCORE>, W>
reduce =
new ReduceStateByKey<>(getName() + "::ReduceStateByKey", flow, input,
keyExtractor,
e -> Pair.of(valueFn.apply(e), scoreFn.apply(e)),
windowing,
eventTimeAssigner,
MaxScored::new,
stateCombiner,
partitioning);
keyExtractor,
e -> Pair.of(valueFn.apply(e), scoreFn.apply(e)),
windowing,
eventTimeAssigner,
(StateFactory<Pair<VALUE, SCORE>, Pair<VALUE, SCORE>, MaxScored<VALUE, SCORE>>) MaxScored::new,
stateCombiner,
partitioning);

MapElements<Pair<KEY, Pair<VALUE, SCORE>>, Triple<KEY, VALUE, SCORE>>
format =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
*/
package cz.seznam.euphoria.core.client.operator.state;

import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;

import java.io.Serializable;

/**
* Factory for states.
*/
public interface StateFactory<T, STATE>
extends BinaryFunction<Context<T>, StorageProvider, STATE> {
@FunctionalInterface
public interface StateFactory<IN, OUT, STATE extends State<IN, OUT>> extends Serializable {

STATE createState(Context<OUT> context, StorageProvider storageProvider);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.Storage;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
Expand Down Expand Up @@ -68,7 +68,7 @@ public interface WindowedElementFactory<W extends Window, T> {
WindowedElement<W, T> create(W window, long timestamp, T element);
}

private final StateFactory<?, State> stateFactory;
private final StateFactory<I, ?, State<I, ?>> stateFactory;
private final WindowedElementFactory<WID, Object> elementFactory;
private final CombinableReduceFunction<State> stateCombiner;
private final StorageProvider stateStorageProvider;
Expand All @@ -82,7 +82,7 @@ public interface WindowedElementFactory<W extends Window, T> {
final HashMap<WID, State> states = new HashMap<>();
KEY key;

public GroupReducer(StateFactory<?, State> stateFactory,
public GroupReducer(StateFactory<I, ?, State<I, ?>> stateFactory,
WindowedElementFactory<WID, Object> elementFactory,
CombinableReduceFunction<State> stateCombiner,
StorageProvider stateStorageProvider,
Expand Down Expand Up @@ -123,7 +123,7 @@ public void process(WindowedElement<WID, Pair<KEY, I>> elem) {
{
State state = states.get(window);
if (state == null) {
state = stateFactory.apply(
state = stateFactory.createState(
new ElementCollectContext(collector, window), stateStorageProvider);
states.put(window, state);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ static class RSBKReducer
implements GroupReduceFunction<BatchElement<?, Pair>, BatchElement<?, Pair>>,
ResultTypeQueryable<BatchElement<?, Pair>>
{
private final StateFactory<?, State> stateFactory;
private final CombinableReduceFunction<State> stateCombiner;
private final StateFactory<?, ?, State<?, ?>> stateFactory;
private final CombinableReduceFunction<State<?, ?>> stateCombiner;
private final StorageProvider stateStorageProvider;
private final Windowing windowing;
private final Trigger trigger;
Expand All @@ -152,7 +152,7 @@ static class RSBKReducer
public void reduce(Iterable<BatchElement<?, Pair>> values,
org.apache.flink.util.Collector<BatchElement<?, Pair>> out)
{
GroupReducer reducer = new GroupReducer<>(
GroupReducer reducer = new GroupReducer(
stateFactory,
BatchElement::new,
stateCombiner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public DataStream<?> translate(FlinkOperator<ReduceStateByKey> operator,

ReduceStateByKey origOperator = operator.getOriginalOperator();

StateFactory<?, State> stateFactory = origOperator.getStateFactory();
CombinableReduceFunction stateCombiner = origOperator.getStateCombiner();
StateFactory<?, ?, State<?, ?>> stateFactory = origOperator.getStateFactory();
CombinableReduceFunction<State<?, ?>> stateCombiner = origOperator.getStateCombiner();

Windowing windowing = origOperator.getWindowing();
if (windowing == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
Expand Down Expand Up @@ -65,8 +65,8 @@ public abstract class AbstractWindowOperator<I, KEY, WID extends Window>

private final Windowing<?, WID> windowing;
private final Trigger<WID> trigger;
private final StateFactory<?, State> stateFactory;
private final CombinableReduceFunction<State> stateCombiner;
private final StateFactory<?, ?, State<?, ?>> stateFactory;
private final CombinableReduceFunction<State<?, ?>> stateCombiner;


// FIXME Arguable hack that ensures all remaining opened windows
Expand Down Expand Up @@ -94,8 +94,8 @@ public abstract class AbstractWindowOperator<I, KEY, WID extends Window>
private transient TypeSerializer<WID> windowSerializer;

public AbstractWindowOperator(Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
StateFactory<?, ?, State<?, ?>> stateFactory,
CombinableReduceFunction<State<?, ?>> stateCombiner,
boolean localMode,
int descriptorsCacheMaxSize) {
this.windowing = Objects.requireNonNull(windowing);
Expand Down Expand Up @@ -191,7 +191,7 @@ public void processElement(StreamRecord<I> record)
List<State> states = new ArrayList<>();
states.add(getWindowState(stateResultWindow));
mergedStateWindows.forEach(sw -> states.add(getWindowState(sw)));
stateCombiner.apply(states);
stateCombiner.apply((Iterable) states);

// remove merged window states
mergedStateWindows.forEach(sw -> {
Expand Down Expand Up @@ -322,7 +322,7 @@ private void processTriggerResult(WID window,
@SuppressWarnings("unchecked")
private State getWindowState(WID window) {
storageProvider.setWindow(window);
return stateFactory.apply(outputContext, storageProvider);
return stateFactory.createState(outputContext, storageProvider);
}

private MergingWindowSet<WID> getMergingWindowSet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class KeyedMultiWindowedElementWindowOperator<KEY, WID extends Window>

public KeyedMultiWindowedElementWindowOperator(
Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
StateFactory<?, ?, State<?, ?>> stateFactory,
CombinableReduceFunction<State<?, ?>> stateCombiner,
boolean localMode,
int descriptorsCacheMaxSize) {
super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class StreamingElementWindowOperator<KEY, WID extends Window>
public StreamingElementWindowOperator(
WindowAssigner<?, KEY, ?, WID> windowAssigner,
Windowing<?, WID> windowing,
StateFactory<?, State> stateFactory,
CombinableReduceFunction<State> stateCombiner,
StateFactory<?, ?, State<?, ?>> stateFactory,
CombinableReduceFunction<State<?, ?>> stateCombiner,
boolean localMode,
int descriptorsCacheMaxSize) {
super(windowing, stateFactory, stateCombiner, localMode, descriptorsCacheMaxSize);
Expand Down
Loading

0 comments on commit 28b5855

Please sign in to comment.