Skip to content

Commit

Permalink
Merge pull request apache#71 from seznam/pete/state-combiner
Browse files Browse the repository at this point in the history
apache#51 [euphoria-core] Introduce StateMerger
  • Loading branch information
xitep authored Apr 8, 2017
2 parents 5b164dc + 60ca092 commit 0139963
Show file tree
Hide file tree
Showing 22 changed files with 308 additions and 291 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private class JoinState

/**
 * Creates the join state and obtains its two list storages, described by
 * {@code LEFT_STATE_DESCR} and {@code RIGHT_STATE_DESCR}, from the given
 * storage provider.
 *
 * @param context forwarded to the superclass constructor
 * @param storageProvider source of the left and right element storages
 */
@SuppressWarnings("unchecked")
public JoinState(Context<OUT> context, StorageProvider storageProvider) {
// NOTE(review): this page renders a diff without +/- markers; the two
// consecutive super(...) calls look like the pre- and post-change forms of
// one line -- confirm against the commit.
super(context, storageProvider);
super(context);
leftElements = storageProvider.getListStorage(LEFT_STATE_DESCR);
rightElements = storageProvider.getListStorage(RIGHT_STATE_DESCR);
}
Expand Down Expand Up @@ -451,7 +451,7 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
getWindowing(),
getEventTimeAssigner(),
JoinState::new,
new StateSupport.MergeFromStateCombiner<>(),
new StateSupport.MergeFromStateMerger<>(),
partitioning
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunction;
import cz.seznam.euphoria.core.client.functional.StateFactory;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.Context;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
Expand Down Expand Up @@ -268,8 +268,8 @@ public boolean isCombinable() {
@SuppressWarnings("unchecked")
@Override
public DAG<Operator<?, ?>> getBasicOps() {
CombinableReduceFunction<StateSupport.MergeFrom<OUT>> stateCombine =
new StateSupport.MergeFromStateCombiner();
StateSupport.MergeFromStateMerger stateCombine =
new StateSupport.MergeFromStateMerger<>();
StateFactory stateFactory = isCombinable()
? new CombiningReduceState.Factory<>((CombinableReduceFunction) reducer)
: new NonCombiningReduceState.Factory<>(reducer);
Expand All @@ -287,16 +287,16 @@ static class CombiningReduceState<E>
extends State<E, E>
implements StateSupport.MergeFrom<CombiningReduceState<E>> {

/**
 * State factory that builds a {@code CombiningReduceState} around a fixed,
 * non-null {@code CombinableReduceFunction}.
 */
// NOTE(review): diff rendering without +/- markers; the duplicated class
// header and the apply(...)/createState(...) pair look like the pre- and
// post-change forms of the same declarations -- confirm against the commit.
static final class Factory<E> implements StateFactory<E, State<E, E>> {
static final class Factory<E> implements StateFactory<E, E, State<E, E>> {
private final CombinableReduceFunction<E> r;

// Rejects a null reducer immediately via Objects.requireNonNull.
Factory(CombinableReduceFunction<E> r) {
this.r = Objects.requireNonNull(r);
}

// Creates a new CombiningReduceState from the given context and storage
// provider, handing it the reducer captured by this factory.
@Override
public State<E, E> apply(Context<E> ctx, StorageProvider storageProvider) {
return new CombiningReduceState<>(ctx, storageProvider, r);
public State<E, E> createState(Context<E> context, StorageProvider storageProvider) {
return new CombiningReduceState<>(context, storageProvider, r);
}
}

Expand All @@ -310,7 +310,7 @@ public State<E, E> apply(Context<E> ctx, StorageProvider storageProvider) {
CombiningReduceState(Context<E> context,
StorageProvider storageProvider,
CombinableReduceFunction<E> reducer) {
super(context, storageProvider);
super(context);
this.reducer = Objects.requireNonNull(reducer);

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -340,49 +340,49 @@ public void close() {

/**
 * Merges another state into this one using the value read from
 * {@code other.storage}.
 *
 * @param other the state whose stored value is merged into this state
 */
@Override
public void mergeFrom(CombiningReduceState<E> other) {
// NOTE(review): diff rendering without +/- markers; the next two lines look
// like the pre- and post-change bodies. The first applies the reducer to
// (this value, other value) and stores the result; the second passes the
// other value to add(...) -- confirm which one is current.
this.storage.set(this.reducer.apply(Arrays.asList(this.storage.get(), other.storage.get())));
this.add(other.storage.get());
}
}

private static class NonCombiningReduceState<VALUE, OUT>
extends State<VALUE, OUT>
implements StateSupport.MergeFrom<NonCombiningReduceState<VALUE, OUT>> {
private static class NonCombiningReduceState<IN, OUT>
extends State<IN, OUT>
implements StateSupport.MergeFrom<NonCombiningReduceState<IN, OUT>> {

/**
 * State factory that builds a {@code NonCombiningReduceState} around a fixed,
 * non-null {@code ReduceFunction}.
 */
// NOTE(review): diff rendering without +/- markers; each declaration below
// appears twice (VALUE-named and IN-named type parameter, apply(...) and
// createState(...)), presumably the pre- and post-change forms -- confirm
// against the commit.
static final class Factory<VALUE, OUT>
implements StateFactory<OUT, NonCombiningReduceState<VALUE, OUT>> {
private final ReduceFunction<VALUE, OUT> r;
static final class Factory<IN, OUT>
implements StateFactory<IN, OUT, NonCombiningReduceState<IN, OUT>> {
private final ReduceFunction<IN, OUT> r;

// Rejects a null reducer immediately via Objects.requireNonNull.
Factory(ReduceFunction<VALUE, OUT> r) {
Factory(ReduceFunction<IN, OUT> r) {
this.r = Objects.requireNonNull(r);
}

// Creates a new NonCombiningReduceState from the given context and storage
// provider, handing it the reducer captured by this factory.
@Override
public NonCombiningReduceState<VALUE, OUT>
apply(Context<OUT> ctx, StorageProvider storageProvider) {
return new NonCombiningReduceState<>(ctx, storageProvider, r);
public NonCombiningReduceState<IN, OUT>
createState(Context<OUT> context, StorageProvider storageProvider) {
return new NonCombiningReduceState<>(context, storageProvider, r);
}
}

@SuppressWarnings("unchecked")
private static final ListStorageDescriptor STORAGE_DESC =
ListStorageDescriptor.of("values", (Class) Object.class);

private final ReduceFunction<VALUE, OUT> reducer;
private final ListStorage<VALUE> reducibleValues;
private final ReduceFunction<IN, OUT> reducer;
private final ListStorage<IN> reducibleValues;

/**
 * Creates the state, keeping the (non-null) reducer and obtaining the list
 * storage described by {@code STORAGE_DESC} from the storage provider.
 *
 * @param context forwarded to the superclass constructor
 * @param storageProvider source of the list storage backing this state
 * @param reducer the reduce function; must not be null
 */
// NOTE(review): diff rendering without +/- markers; the duplicated reducer
// parameter, super(...) call and `ls` declaration look like pre- and
// post-change forms of the same lines -- confirm against the commit.
NonCombiningReduceState(Context<OUT> context,
StorageProvider storageProvider,
ReduceFunction<VALUE, OUT> reducer) {
super(context, storageProvider);
ReduceFunction<IN, OUT> reducer) {
super(context);
this.reducer = Objects.requireNonNull(reducer);

// STORAGE_DESC is a raw descriptor, hence the local unchecked suppression.
@SuppressWarnings("unchecked")
ListStorage<VALUE> ls = storageProvider.getListStorage(STORAGE_DESC);
ListStorage<IN> ls = storageProvider.getListStorage(STORAGE_DESC);
reducibleValues = ls;
}

/**
 * Appends the element to the {@code reducibleValues} list storage.
 *
 * @param element the element to store
 */
// NOTE(review): diff rendering without +/- markers; the two method headers
// differ only in the type-parameter name (VALUE vs IN) and look like the
// pre- and post-change forms -- confirm against the commit.
@Override
public void add(VALUE element) {
public void add(IN element) {
reducibleValues.add(element);
}

Expand All @@ -398,7 +398,7 @@ public void close() {
}

/**
 * Merges another state into this one by adding everything returned by
 * {@code other.reducibleValues.get()} to this state's list storage.
 *
 * @param other the state whose stored values are added to this state
 */
// NOTE(review): diff rendering without +/- markers; the two method headers
// differ only in the type-parameter name (VALUE vs IN) and look like the
// pre- and post-change forms -- confirm against the commit.
@Override
public void mergeFrom(NonCombiningReduceState<VALUE, OUT> other) {
public void mergeFrom(NonCombiningReduceState<IN, OUT> other) {
this.reducibleValues.addAll(other.reducibleValues.get());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.StateFactory;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -79,24 +79,29 @@ public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> keyExtractor)
return new DatasetBuilder2<>(name, input, keyExtractor);
}
}

/**
 * Builder step that carries the operator name, the input dataset and the key
 * extractor, and lets the caller supply the value extractor next.
 */
public static class DatasetBuilder2<IN, KEY> {

  private final String name;
  private final Dataset<IN> input;
  private final UnaryFunction<IN, KEY> keyExtractor;

  /**
   * @param name operator name; must not be null
   * @param input input dataset; must not be null
   * @param keyExtractor key extraction function; must not be null
   */
  DatasetBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
    // Validate in declaration order first, then assign.
    Objects.requireNonNull(name);
    Objects.requireNonNull(input);
    Objects.requireNonNull(keyExtractor);
    this.name = name;
    this.input = input;
    this.keyExtractor = keyExtractor;
  }

  /**
   * Supplies the value extractor and moves on to the next builder step.
   *
   * @param valueExtractor function extracting the value from an input element
   * @return a {@code DatasetBuilder3} carrying everything collected so far
   */
  public <VALUE> DatasetBuilder3<IN, KEY, VALUE> valueBy(UnaryFunction<IN, VALUE> valueExtractor) {
    final DatasetBuilder3<IN, KEY, VALUE> nextStep =
        new DatasetBuilder3<>(name, input, keyExtractor, valueExtractor);
    return nextStep;
  }
}

public static class DatasetBuilder3<IN, KEY, VALUE> {
private final String name;
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;

DatasetBuilder3(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
Expand All @@ -107,38 +112,43 @@ public static class DatasetBuilder3<IN, KEY, VALUE> {
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
}

/**
 * Supplies the state factory and moves on to the next builder step.
 *
 * @param stateFactory factory producing the {@code STATE} instances
 * @return a {@code DatasetBuilder4} carrying the name, input, both extractors
 *         and the given state factory
 */
// NOTE(review): diff rendering without +/- markers; the two parameter lines
// (two- vs three-argument StateFactory) look like the pre- and post-change
// forms of the same line -- confirm against the commit.
public <OUT, STATE extends State<VALUE, OUT>> DatasetBuilder4<
IN, KEY, VALUE, OUT, STATE> stateFactory(
StateFactory<OUT, STATE> stateFactory) {
StateFactory<VALUE, OUT, STATE> stateFactory) {
return new DatasetBuilder4<>(
name, input, keyExtractor, valueExtractor, stateFactory);
}
}

/**
 * Builder step that carries the operator name, input dataset, key extractor,
 * value extractor and state factory; its single public method supplies the
 * function used to merge states and yields a {@code DatasetBuilder5}.
 */
// NOTE(review): this page renders a diff without +/- markers. Wherever two
// near-identical lines follow each other below (StateFactory<OUT, STATE> vs
// StateFactory<VALUE, OUT, STATE>, combineStateBy vs mergeStatesBy,
// stateCombiner vs stateMerger) they look like the pre- and post-change forms
// of the same line -- confirm against the commit.
public static class DatasetBuilder4<
IN, KEY, VALUE, OUT, STATE extends State<VALUE, OUT>> {
private final String name;
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;

// Every argument is null-checked with Objects.requireNonNull before being stored.
DatasetBuilder4(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory)
StateFactory<VALUE, OUT, STATE> stateFactory)
{
this.name = Objects.requireNonNull(name);
this.input = Objects.requireNonNull(input);
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
}
// Supplies the state-merging function and returns the next builder step,
// which receives all values collected so far plus that function.
public DatasetBuilder5<IN, KEY, VALUE, OUT, STATE> combineStateBy(
CombinableReduceFunction<STATE> stateCombiner) {

public DatasetBuilder5<IN, KEY, VALUE, OUT, STATE>
mergeStatesBy(StateMerger<VALUE, OUT, STATE> stateMerger) {
return new DatasetBuilder5<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner);
stateFactory, stateMerger);
}
}

public static class DatasetBuilder5<
IN, KEY, VALUE, OUT, STATE extends State<VALUE, OUT>>
extends PartitioningBuilder<KEY, DatasetBuilder5<IN, KEY, VALUE, OUT, STATE>>
Expand All @@ -148,15 +158,15 @@ public static class DatasetBuilder5<
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final StateMerger<VALUE, OUT, STATE> stateMerger;

DatasetBuilder5(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner) {
StateFactory<VALUE, OUT, STATE> stateFactory,
StateMerger<VALUE, OUT, STATE> stateMerger) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getNumPartitions()));

Expand All @@ -165,25 +175,28 @@ public static class DatasetBuilder5<
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
this.stateCombiner = Objects.requireNonNull(stateCombiner);
this.stateMerger = Objects.requireNonNull(stateMerger);
}

/**
 * Applies the given windowing without an event-time assigner; equivalent to
 * calling the two-argument overload with {@code null} as the assigner.
 *
 * @param windowing the windowing to apply
 * @return the next builder step
 */
public <WIN, W extends Window>
DatasetBuilder6<IN, WIN, KEY, VALUE, OUT, STATE, W> windowBy(Windowing<WIN, W> windowing) {
  // Typed null keeps overload selection identical to passing a bare null.
  final ExtractEventTime<WIN> noEventTimeAssigner = null;
  return windowBy(windowing, noEventTimeAssigner);
}

/**
 * Applies the given windowing and event-time assigner and moves on to the
 * final builder step.
 *
 * @param windowing the windowing to apply; rejected with a
 *        {@code NullPointerException} when null
 * @param eventTimeAssigner passed through unchecked (the one-argument
 *        overload passes {@code null} here)
 * @return a {@code DatasetBuilder6} carrying everything collected so far,
 *         with this builder handed over as the partitioning source
 */
// NOTE(review): diff rendering without +/- markers; the two
// "stateFactory, ..." argument lines (stateCombiner vs stateMerger) look like
// the pre- and post-change forms of the same line -- confirm against the commit.
public <WIN, W extends Window>
DatasetBuilder6<IN, WIN, KEY, VALUE, OUT, STATE, W>
windowBy(Windowing<WIN, W> windowing, ExtractEventTime<WIN> eventTimeAssigner) {
return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner,
stateFactory, stateMerger,
Objects.requireNonNull(windowing), eventTimeAssigner, this);
}

/**
 * Finishes the builder without windowing: constructs a {@code DatasetBuilder6}
 * with {@code null} for both the windowing and the event-time assigner and
 * returns the result of its {@code output()}.
 *
 * @return the output dataset of key/result pairs
 */
// NOTE(review): diff rendering without +/- markers; the two
// "stateFactory, ..." argument lines (stateCombiner vs stateMerger) look like
// the pre- and post-change forms of the same line -- confirm against the commit.
@Override
public Dataset<Pair<KEY, OUT>> output() {
return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner, null, null, this)
stateFactory, stateMerger, null, null, this)
.output();
}
}
Expand All @@ -199,8 +212,8 @@ public static class DatasetBuilder6<
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final StateMerger<VALUE, OUT, STATE> stateMerger;
@Nullable
private final Windowing<WIN, W> windowing;
@Nullable
Expand All @@ -210,8 +223,8 @@ public static class DatasetBuilder6<
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
StateFactory<VALUE, OUT, STATE> stateFactory,
StateMerger<VALUE, OUT, STATE> stateMerger,
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
PartitioningBuilder<KEY, ?> partitioning)
Expand All @@ -224,7 +237,7 @@ public static class DatasetBuilder6<
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
this.stateCombiner = Objects.requireNonNull(stateCombiner);
this.stateMerger = Objects.requireNonNull(stateMerger);
this.windowing = windowing;
this.eventTimeAssigner = eventTimeAssigner;
}
Expand All @@ -236,7 +249,7 @@ public Dataset<Pair<KEY, OUT>> output() {
ReduceStateByKey<IN, IN, WIN, KEY, VALUE, KEY, OUT, STATE, W>
reduceStateByKey =
new ReduceStateByKey<>(name, flow, input, keyExtractor, valueExtractor,
windowing, eventTimeAssigner, stateFactory, stateCombiner, getPartitioning());
windowing, eventTimeAssigner, stateFactory, stateMerger, getPartitioning());
flow.add(reduceStateByKey);

return reduceStateByKey.output();
Expand All @@ -253,9 +266,9 @@ public static OfBuilder named(String name) {
return new OfBuilder(name);
}

private final StateFactory<OUT, STATE> stateFactory;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final UnaryFunction<KIN, VALUE> valueExtractor;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateMerger<VALUE, OUT, STATE> stateCombiner;

ReduceStateByKey(String name,
Flow flow,
Expand All @@ -264,25 +277,25 @@ public static OfBuilder named(String name) {
UnaryFunction<KIN, VALUE> valueExtractor,
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
StateFactory<OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
StateFactory<VALUE, OUT, STATE> stateFactory,
StateMerger<VALUE, OUT, STATE> stateMerger,
Partitioning<KEY> partitioning)
{
super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning);
this.stateFactory = stateFactory;
this.valueExtractor = valueExtractor;
this.stateCombiner = stateCombiner;
this.stateCombiner = stateMerger;
}

/**
 * Returns the state factory stored in the {@code stateFactory} field.
 *
 * @return the state factory of this operator
 */
// NOTE(review): diff rendering without +/- markers; the two method headers
// (two- vs three-argument StateFactory) look like the pre- and post-change
// forms of the same line -- confirm against the commit.
public StateFactory<OUT, STATE> getStateFactory() {
public StateFactory<VALUE, OUT, STATE> getStateFactory() {
return stateFactory;
}

/**
 * Returns the value extractor stored in the {@code valueExtractor} field.
 *
 * @return the value extraction function of this operator
 */
public UnaryFunction<KIN, VALUE> getValueExtractor() {
  return this.valueExtractor;
}

/**
 * Returns the state-merging function stored in the {@code stateCombiner} field.
 *
 * @return the function used to merge states of this operator
 */
// NOTE(review): diff rendering without +/- markers; getStateCombiner() and
// getStateMerger() look like the pre- and post-change headers of the same
// accessor -- confirm against the commit.
// NOTE(review): the backing field is still named stateCombiner although its
// declared type and this accessor now use the StateMerger name; consider
// renaming the field for consistency.
public CombinableReduceFunction<STATE> getStateCombiner() {
public StateMerger<VALUE, OUT, STATE> getStateMerger() {
return stateCombiner;
}
}
Loading

0 comments on commit 0139963

Please sign in to comment.