#19 [euphoria-flink] Avoid extra shuffle (on the batch executor)
xitep authored and David Moravek committed Oct 5, 2018
1 parent aca00ec commit f14b1a1
Showing 6 changed files with 50 additions and 38 deletions.
@@ -22,11 +22,13 @@
 import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
-import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toList;
+
 /**
  * Keeps track of mapping between Euphoria {@link Dataset} and
  * Flink output {@link DataStream} or {@link org.apache.flink.api.java.DataSet}.
@@ -53,7 +55,7 @@ public E getExecutionEnvironment() {
   }
 
   /**
-   * Retrieve list of Flink {@link DataStream} inputs of given operator
+   * Retrieves list of Flink {@link DataStream} inputs of given operator
    *
    * @param operator the operator to inspect
    *
@@ -64,17 +66,32 @@ public E getExecutionEnvironment() {
    * @see #setOutput(FlinkOperator, Object)
    */
   public List<D> getInputStreams(FlinkOperator<?> operator) {
-    List<Node<FlinkOperator<?>>> parents = dag.getNode(operator).getParents();
-    List<D> inputs = new ArrayList<>(parents.size());
-    for (Node<FlinkOperator<?>> p : parents) {
-      D pout = outputs.get(dag.getNode(p.get()).get());
-      if (pout == null) {
-        throw new IllegalArgumentException(
-            "Output DataStream/DataSet missing for operator " + p.get().getName());
-      }
-      inputs.add(pout);
-    }
-    return inputs;
+    return getInputOperators(operator).stream()
+        .map(p -> {
+          D pout = outputs.get(dag.getNode(p).get());
+          if (pout == null) {
+            throw new IllegalArgumentException(
+                "Output DataStream/DataSet missing for operator " + p.getName());
+          }
+          return pout;
+        })
+        .collect(toList());
   }
 
+  /**
+   * Retrieves a list of the producers of the given operator's inputs. This
+   * corresponds to {@link #getInputStreams(FlinkOperator)}.
+   *
+   * @param operator the operator whose input producers to retrieve
+   *
+   * @return a list of all the specified operator's input producers; never {@code null}
+   *
+   * @see #getInputStreams(FlinkOperator)
+   */
+  public List<FlinkOperator<?>> getInputOperators(FlinkOperator<?> operator) {
+    return dag.getNode(requireNonNull(operator)).getParents().stream()
+        .map(Node::get)
+        .collect(toList());
+  }
+
   /**
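The point of the new getInputOperators accessor: translators can now ask for the producers of an operator's inputs (and thus their parallelism) instead of only the input DataStreams/DataSets, and getInputStreams becomes a lookup over those producers. Below is a minimal, self-contained sketch of that pattern; OpNode and TranslationContext are illustrative stand-ins, not the actual Euphoria classes.

import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

final class OpNode {
  final String name;
  final List<OpNode> parents;

  OpNode(String name, List<OpNode> parents) {
    this.name = name;
    this.parents = parents;
  }
}

final class TranslationContext<D> {
  private final Map<OpNode, D> outputs;

  TranslationContext(Map<OpNode, D> outputs) {
    this.outputs = outputs;
  }

  // The producers of an operator's inputs are simply its parents in the DAG.
  List<OpNode> getInputOperators(OpNode operator) {
    return requireNonNull(operator).parents;
  }

  // The inputs themselves are the outputs registered for each producer.
  List<D> getInputStreams(OpNode operator) {
    return getInputOperators(operator).stream()
        .map(p -> {
          D out = outputs.get(p);
          if (out == null) {
            throw new IllegalArgumentException(
                "Output DataStream/DataSet missing for operator " + p.name);
          }
          return out;
        })
        .collect(toList());
  }
}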
@@ -111,9 +111,7 @@ private Executor.Result execute(Flow flow) {
     Settings settings = flow.getSettings();
 
     if (mode == ExecutionEnvironment.Mode.STREAMING) {
-      if (stateBackend.isPresent()) {
-        environment.getStreamEnv().setStateBackend(stateBackend.get());
-      }
+      stateBackend.ifPresent(be -> environment.getStreamEnv().setStateBackend(be));
       if (checkpointInterval != null) {
         LOG.info("Enabled checkpoints every: {}", checkpointInterval);
         environment.getStreamEnv().enableCheckpointing(checkpointInterval.toMillis());
@@ -138,9 +136,9 @@ private Executor.Result execute(Flow flow) {
     }
 
     try {
-      LOG.info("Before execute");
+      LOG.debug("Before execute");
       environment.execute(); // blocking operation
-      LOG.info("After execute");
+      LOG.debug("After execute");
     } catch (Throwable e) {
       // when exception thrown rollback all sinks
       for (DataSink<?> s : sinks) {
@@ -153,7 +151,7 @@ private Executor.Result execute(Flow flow) {
       throw e;
     }
 
-    LOG.info("Before commit");
+    LOG.debug("Before commit");
     // when the execution is successful commit all sinks
     Exception ex = null;
     for (DataSink<?> s : sinks) {
@@ -164,6 +162,7 @@ private Executor.Result execute(Flow flow) {
         ex = e;
       }
     }
+    LOG.debug("After commit");
 
     // rethrow the exception if any
     if (ex != null) {
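The code around these logging changes implements a simple transactional contract over the flow's sinks: if execution throws, roll back every sink and rethrow; if it succeeds, commit every sink and rethrow the first commit failure, if any. A condensed sketch of that flow, assuming a hypothetical Sink interface with commit()/rollback() (the real DataSink API may differ):

import java.util.List;

interface Sink {
  void commit() throws Exception;
  void rollback() throws Exception;
}

final class RunAndCommit {
  static void run(Runnable job, List<Sink> sinks) throws Exception {
    try {
      job.run(); // blocking execution, mirrors environment.execute()
    } catch (Throwable e) {
      // execution failed: roll back every sink, keep the original failure primary
      for (Sink s : sinks) {
        try {
          s.rollback();
        } catch (Exception suppressed) {
          e.addSuppressed(suppressed);
        }
      }
      throw e;
    }
    // execution succeeded: commit all sinks, remembering the first failure
    Exception failure = null;
    for (Sink s : sinks) {
      try {
        s.commit();
      } catch (Exception e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}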
@@ -84,15 +84,13 @@ private Translation(

   private final Map<Class, Translation> translations = new IdentityHashMap<>();
   private final ExecutionEnvironment env;
-  private final Settings settings;
 
 
   public BatchFlowTranslator(Settings settings, ExecutionEnvironment env) {
     this(settings, env, DEFAULT_SPLIT_ASSIGNER_FACTORY);
   }
 
   public BatchFlowTranslator(Settings settings, ExecutionEnvironment env,
                              SplitAssignerFactory splitAssignerFactory) {
-    this.settings = settings;
     this.env = Objects.requireNonNull(env);
 
     // basic operators
@@ -53,9 +53,10 @@ static boolean wantTranslate(ReduceByKey operator) {
   public DataSet translate(FlinkOperator<ReduceByKey> operator,
                            BatchExecutorContext context) {
 
-    // FIXME parallelism should be set to the same level as parent until we reach "shuffling"
-
-    DataSet input = Iterables.getOnlyElement(context.getInputStreams(operator));
+    int inputParallelism =
+        Iterables.getOnlyElement(context.getInputOperators(operator)).getParallelism();
+    DataSet input =
+        Iterables.getOnlyElement(context.getInputStreams(operator));
 
     ReduceByKey origOperator = operator.getOriginalOperator();
     final UnaryFunction<Iterable, Object> reducer = origOperator.getReducer();
@@ -101,18 +102,16 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
       });
       tuples = wAssigned
           .name(operator.getName() + "::map-input")
-          .setParallelism(operator.getParallelism())
+          .setParallelism(inputParallelism)
           .returns(new TypeHint<BatchElement<Window, Pair>>() {});
     }
 
     // ~ reduce the data now
-    Operator<BatchElement<Window, Pair>, ?> reduced;
-    reduced = tuples
-        .groupBy(new RBKKeySelector())
-        .reduce(new RBKReducer(reducer));
-    reduced = reduced
-        .setParallelism(operator.getParallelism())
-        .name(operator.getName() + "::reduce");
+    Operator<BatchElement<Window, Pair>, ?> reduced =
+        tuples.groupBy(new RBKKeySelector())
+            .reduce(new RBKReducer(reducer))
+            .setParallelism(operator.getParallelism())
+            .name(operator.getName() + "::reduce");
 
     // FIXME partitioner should be applied during "reduce" to avoid
     // unnecessary shuffle, but there is no (known) way how to set custom
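This is the essence of the commit: the pre-shuffle ::map-input stage now runs at its producer's parallelism rather than at the ReduceByKey operator's own parallelism. When consecutive DataSet operations share a parallelism, Flink can forward records locally instead of redistributing them, so the only data exchange left is the one the groupBy requires anyway. Below is a standalone sketch of that pattern against the plain Flink DataSet API; the job shape and the parallelism values are illustrative, not taken from the commit.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class AvoidExtraShuffle {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    int inputParallelism = 2;  // stands in for the producing operator's parallelism
    int reduceParallelism = 4; // the translated operator's own parallelism

    DataSet<Long> input = env.generateSequence(1, 1000).setParallelism(inputParallelism);

    DataSet<Tuple2<Long, Long>> counts = input
        .map(new MapFunction<Long, Tuple2<Long, Long>>() {
          @Override
          public Tuple2<Long, Long> map(Long v) {
            return Tuple2.of(v % 10, 1L);
          }
        })
        .name("map-input")
        // same parallelism as the source: records are forwarded, not reshuffled
        .setParallelism(inputParallelism)
        .groupBy(0)
        .sum(1)
        // the groupBy redistributes anyway, so changing parallelism here is free
        .setParallelism(reduceParallelism)
        .name("reduce");

    counts.print();
  }
}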
@@ -54,8 +54,7 @@ public ReduceStateByKeyTranslator(Settings settings, ExecutionEnvironment env) {
   public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
                            BatchExecutorContext context) {
 
-    // FIXME parallelism should be set to the same level as parent until we reach "shuffling"
-
+    int inputParallelism = Iterables.getOnlyElement(context.getInputOperators(operator)).getParallelism();
     DataSet input = Iterables.getOnlyElement(context.getInputStreams(operator));
 
     ReduceStateByKey origOperator = operator.getOriginalOperator();
@@ -91,7 +90,7 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
         })
         .returns(BatchElement.class)
         .name(operator.getName() + "::map-input")
-        .setParallelism(operator.getParallelism());
+        .setParallelism(inputParallelism);
 
     // ~ reduce the data now
     DataSet<BatchElement<?, Pair>> reduced =
@@ -34,8 +34,8 @@ public DataSet translate(

     List<DataSet> inputs = (List) context.getInputStreams(operator);
     if (inputs.size() != 2) {
-      throw new IllegalStateException("Should have two datasets on input, got "
-          + inputs.size());
+      throw new IllegalStateException(
+          "Should have two datasets on input, got " + inputs.size());
     }
     Optional<DataSet> reduce = inputs.stream().reduce((l, r) -> l.union(r));
     return reduce.get();
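For reference, the union translation above folds its inputs pairwise via stream reduce. A sketch of the same fold as a helper, keeping the translator's two-input guard; the Unions class and unionAll name are illustrative, not part of the codebase:

import java.util.List;

import org.apache.flink.api.java.DataSet;

final class Unions {
  // Folds the inputs into a single DataSet via pairwise union.
  static <T> DataSet<T> unionAll(List<DataSet<T>> inputs) {
    if (inputs.size() != 2) {
      throw new IllegalStateException(
          "Should have two datasets on input, got " + inputs.size());
    }
    return inputs.stream()
        .reduce(DataSet::union)
        .orElseThrow(() -> new IllegalStateException("no inputs"));
  }
}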
