Skip to content

Commit

Permalink
apache#51 Replace StateCombiner with StateMerger
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr authored and David Moravek committed Oct 5, 2018
1 parent 32fc452 commit 59ca612
Show file tree
Hide file tree
Showing 20 changed files with 218 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ public BinaryFunctor<LEFT, RIGHT, OUT> getJoiner() {
getWindowing(),
getEventTimeAssigner(),
JoinState::new,
new StateSupport.MergeFromStateCombiner<>(),
new StateSupport.MergeFromStateMerger<>(),
partitioning
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ public boolean isCombinable() {
@SuppressWarnings("unchecked")
@Override
public DAG<Operator<?, ?>> getBasicOps() {
CombinableReduceFunction<StateSupport.MergeFrom<OUT>> stateCombine =
new StateSupport.MergeFromStateCombiner();
StateSupport.MergeFromStateMerger stateCombine =
new StateSupport.MergeFromStateMerger<>();
StateFactory stateFactory = isCombinable()
? new CombiningReduceState.Factory<>((CombinableReduceFunction) reducer)
: new NonCombiningReduceState.Factory<>(reducer);
Expand Down Expand Up @@ -340,7 +340,7 @@ public void close() {

/**
 * Folds the value stored in {@code other} into this state.
 *
 * NOTE(review): the two statements below look like the removed and the added
 * side of a single diff hunk shown without +/- markers. If both were executed,
 * the value of {@code other} would presumably be applied twice - confirm that
 * only one of them is meant to remain.
 *
 * @param other the state whose stored value is merged into this one
 */
@Override
public void mergeFrom(CombiningReduceState<E> other) {
// Runs the reducer over [own value, other's value] and stores the result.
this.storage.set(this.reducer.apply(Arrays.asList(this.storage.get(), other.storage.get())));
// Passes other's stored value to this state's add(...), defined outside this view.
this.add(other.storage.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.util.Pair;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -79,24 +79,29 @@ public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> keyExtractor)
return new DatasetBuilder2<>(name, input, keyExtractor);
}
}

/**
 * Builder stage that holds the operator name, the input dataset and the key
 * extractor, and hands them on once a value extractor is supplied.
 *
 * @param <IN> the type of elements in the input dataset
 * @param <KEY> the type of keys produced by the key extractor
 */
public static class DatasetBuilder2<IN, KEY> {
  private final String name;
  private final Dataset<IN> input;
  private final UnaryFunction<IN, KEY> keyExtractor;

  /**
   * Creates the stage; every argument is mandatory.
   *
   * @param name the operator name
   * @param input the input dataset
   * @param keyExtractor the function extracting a key from an input element
   * @throws NullPointerException if any argument is {@code null}
   */
  DatasetBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
    // Validate in declaration order first, then store.
    Objects.requireNonNull(name);
    Objects.requireNonNull(input);
    Objects.requireNonNull(keyExtractor);

    this.name = name;
    this.input = input;
    this.keyExtractor = keyExtractor;
  }

  /**
   * Supplies the value extractor and moves on to the next builder stage.
   *
   * @param valueExtractor the function extracting a value from an input element
   * @param <VALUE> the type of the extracted values
   * @return the next builder stage carrying everything collected so far
   */
  public <VALUE> DatasetBuilder3<IN, KEY, VALUE> valueBy(UnaryFunction<IN, VALUE> valueExtractor) {
    final DatasetBuilder3<IN, KEY, VALUE> next =
        new DatasetBuilder3<>(name, input, keyExtractor, valueExtractor);
    return next;
  }
}

public static class DatasetBuilder3<IN, KEY, VALUE> {
private final String name;
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;

DatasetBuilder3(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
Expand All @@ -107,20 +112,23 @@ public static class DatasetBuilder3<IN, KEY, VALUE> {
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
}

/**
 * Supplies the state factory and moves on to the next builder stage.
 *
 * @param stateFactory the state factory handed on to the next stage
 * @param <OUT> the type of output elements of the states
 * @param <STATE> the type of the states
 * @return the next builder stage carrying everything collected so far
 */
public <OUT, STATE extends State<VALUE, OUT>>
DatasetBuilder4<IN, KEY, VALUE, OUT, STATE>
stateFactory(StateFactory<VALUE, OUT, STATE> stateFactory) {
  final DatasetBuilder4<IN, KEY, VALUE, OUT, STATE> next =
      new DatasetBuilder4<>(name, input, keyExtractor, valueExtractor, stateFactory);
  return next;
}
}

public static class DatasetBuilder4<
IN, KEY, VALUE, OUT, STATE extends State<VALUE, OUT>> {
private final String name;
private final Dataset<IN> input;
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<VALUE, OUT, STATE> stateFactory;

DatasetBuilder4(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
Expand All @@ -133,12 +141,14 @@ public static class DatasetBuilder4<
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
}
public DatasetBuilder5<IN, KEY, VALUE, OUT, STATE> combineStateBy(
CombinableReduceFunction<STATE> stateCombiner) {

public DatasetBuilder5<IN, KEY, VALUE, OUT, STATE>
mergeStatesBy(StateMerger<VALUE, OUT, STATE> stateMerger) {
return new DatasetBuilder5<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner);
stateFactory, stateMerger);
}
}

public static class DatasetBuilder5<
IN, KEY, VALUE, OUT, STATE extends State<VALUE, OUT>>
extends PartitioningBuilder<KEY, DatasetBuilder5<IN, KEY, VALUE, OUT, STATE>>
Expand All @@ -149,14 +159,14 @@ public static class DatasetBuilder5<
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateMerger<VALUE, OUT, STATE> stateMerger;

DatasetBuilder5(String name,
Dataset<IN> input,
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner) {
StateMerger<VALUE, OUT, STATE> stateMerger) {
// initialize default partitioning according to input
super(new DefaultPartitioning<>(input.getNumPartitions()));

Expand All @@ -165,25 +175,28 @@ public static class DatasetBuilder5<
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
this.stateCombiner = Objects.requireNonNull(stateCombiner);
this.stateMerger = Objects.requireNonNull(stateMerger);
}

/**
 * Applies the given windowing without an event-time assigner; equivalent to
 * calling {@code windowBy(windowing, null)}.
 *
 * @param windowing the windowing to apply
 * @param <WIN> the element type the windowing operates on
 * @param <W> the type of windows produced
 * @return the next builder stage
 */
public <WIN, W extends Window>
DatasetBuilder6<IN, WIN, KEY, VALUE, OUT, STATE, W>
windowBy(Windowing<WIN, W> windowing) {
  // No event-time assigner is supplied by this overload.
  final ExtractEventTime<WIN> noEventTimeAssigner = null;
  return windowBy(windowing, noEventTimeAssigner);
}

public <WIN, W extends Window>
DatasetBuilder6<IN, WIN, KEY, VALUE, OUT, STATE, W>
windowBy(Windowing<WIN, W> windowing, ExtractEventTime<WIN> eventTimeAssigner) {
return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner,
stateFactory, stateMerger,
Objects.requireNonNull(windowing), eventTimeAssigner, this);
}

@Override
public Dataset<Pair<KEY, OUT>> output() {
return new DatasetBuilder6<>(name, input, keyExtractor, valueExtractor,
stateFactory, stateCombiner, null, null, this)
stateFactory, stateMerger, null, null, this)
.output();
}
}
Expand All @@ -200,7 +213,7 @@ public static class DatasetBuilder6<
private final UnaryFunction<IN, KEY> keyExtractor;
private final UnaryFunction<IN, VALUE> valueExtractor;
private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateMerger<VALUE, OUT, STATE> stateMerger;
@Nullable
private final Windowing<WIN, W> windowing;
@Nullable
Expand All @@ -211,7 +224,7 @@ public static class DatasetBuilder6<
UnaryFunction<IN, KEY> keyExtractor,
UnaryFunction<IN, VALUE> valueExtractor,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
StateMerger<VALUE, OUT, STATE> stateMerger,
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
PartitioningBuilder<KEY, ?> partitioning)
Expand All @@ -224,7 +237,7 @@ public static class DatasetBuilder6<
this.keyExtractor = Objects.requireNonNull(keyExtractor);
this.valueExtractor = Objects.requireNonNull(valueExtractor);
this.stateFactory = Objects.requireNonNull(stateFactory);
this.stateCombiner = Objects.requireNonNull(stateCombiner);
this.stateMerger = Objects.requireNonNull(stateMerger);
this.windowing = windowing;
this.eventTimeAssigner = eventTimeAssigner;
}
Expand All @@ -236,7 +249,7 @@ public Dataset<Pair<KEY, OUT>> output() {
ReduceStateByKey<IN, IN, WIN, KEY, VALUE, KEY, OUT, STATE, W>
reduceStateByKey =
new ReduceStateByKey<>(name, flow, input, keyExtractor, valueExtractor,
windowing, eventTimeAssigner, stateFactory, stateCombiner, getPartitioning());
windowing, eventTimeAssigner, stateFactory, stateMerger, getPartitioning());
flow.add(reduceStateByKey);

return reduceStateByKey.output();
Expand All @@ -255,7 +268,7 @@ public static OfBuilder named(String name) {

private final StateFactory<VALUE, OUT, STATE> stateFactory;
private final UnaryFunction<KIN, VALUE> valueExtractor;
private final CombinableReduceFunction<STATE> stateCombiner;
private final StateMerger<VALUE, OUT, STATE> stateCombiner;

ReduceStateByKey(String name,
Flow flow,
Expand All @@ -265,13 +278,13 @@ public static OfBuilder named(String name) {
@Nullable Windowing<WIN, W> windowing,
@Nullable ExtractEventTime<WIN> eventTimeAssigner,
StateFactory<VALUE, OUT, STATE> stateFactory,
CombinableReduceFunction<STATE> stateCombiner,
StateMerger<VALUE, OUT, STATE> stateMerger,
Partitioning<KEY> partitioning)
{
super(name, flow, input, keyExtractor, windowing, eventTimeAssigner, partitioning);
this.stateFactory = stateFactory;
this.valueExtractor = valueExtractor;
this.stateCombiner = stateCombiner;
this.stateCombiner = stateMerger;
}

public StateFactory<VALUE, OUT, STATE> getStateFactory() {
Expand All @@ -282,7 +295,7 @@ public UnaryFunction<KIN, VALUE> getValueExtractor() {
return valueExtractor;
}

public CombinableReduceFunction<STATE> getStateCombiner() {
public StateMerger<VALUE, OUT, STATE> getStateMerger() {
return stateCombiner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/
package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;

import java.util.Iterator;
import java.io.Serializable;

/** Private helper class to provide utilities around state handling. */
class StateSupport {
Expand All @@ -29,17 +30,13 @@ interface MergeFrom<S> {
void mergeFrom(S other);
}

static class MergeFromStateCombiner<T extends MergeFrom<T>>
implements CombinableReduceFunction<T> {
static class MergeFromStateMerger<I, O, S extends State<I, O> & MergeFrom<S>>
implements StateMerger<I, O, S> {
@Override
public T apply(Iterable<T> xs) {
final T first;
Iterator<T> x = xs.iterator();
first = x.next();
while (x.hasNext()) {
first.mergeFrom(x.next());
public void merge(S target, Iterable<S> others) {
for (S other : others) {
target.mergeFrom(other);
}
return first;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ public UnaryFunction<IN, SCORE> getScoreExtractor() {
public DAG<Operator<?, ?>> getBasicOps() {
Flow flow = getFlow();

StateSupport.MergeFromStateCombiner<MaxScored<VALUE, SCORE>> stateCombiner
= new StateSupport.MergeFromStateCombiner<>();
StateSupport.MergeFromStateMerger<Pair<VALUE, SCORE>, Pair<VALUE, SCORE>, MaxScored<VALUE, SCORE>>
stateCombiner = new StateSupport.MergeFromStateMerger<>();
ReduceStateByKey<IN, IN, IN, KEY, Pair<VALUE, SCORE>, KEY, Pair<VALUE, SCORE>,
MaxScored<VALUE, SCORE>, W>
reduce =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2016 Seznam.cz, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.seznam.euphoria.core.client.operator.state;

import java.io.Serializable;

/**
* A function that merges states of a specific type into a given target state.
* The need to merge several states into one typically arises when using
* {@link cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing},
* e.g. session windows, where individual session windows occasionally need
* to be merged and, with them, their states.
* <p>
* The interface extends {@link Serializable}, so every implementation is a
* serializable type.
*
* @param <IN> the type of input elements for the states
* @param <OUT> the type of output elements of the states
* @param <STATE> the type of states being merged
*/
@FunctionalInterface
public interface StateMerger<IN, OUT, STATE extends State<IN, OUT>>
extends Serializable {

/**
* Merges {@code others} into the given {@code target}, which itself
* is guaranteed by the caller not to be part of {@code others}.
*
* @param target the target state to receive values from {@code others}
* @param others the states to be merged into {@code target}
*/
void merge(STATE target, Iterable<STATE> others);

}
Loading

0 comments on commit 59ca612

Please sign in to comment.