Skip to content

Commit

Permalink
Merge pull request #1 from mareksimunek/vasek/package-change-rebase
Browse files Browse the repository at this point in the history
[BEAM-4294] [BEAM-4360] Join translation and ReduceByKey test suite where moved to org.apache.beam.* package.
  • Loading branch information
VaclavPlajt authored May 22, 2018
2 parents eff3ffd + 9600446 commit 6711c13
Show file tree
Hide file tree
Showing 25 changed files with 1,351 additions and 168 deletions.
1 change: 0 additions & 1 deletion sdks/java/extensions/euphoria/euphoria-beam/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ dependencies {
testCompile project(':beam-runners-direct-java')
testCompile library.java.slf4j_api
testCompile library.java.hamcrest_core

}

test.testLogging.showStandardStreams = true
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.util.IdentityHashMap;
import java.util.Map;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.beam.io.BeamWriteSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
Expand All @@ -43,8 +46,10 @@
*/
class FlowTranslator {

private static final Map<Class, OperatorTranslator> translators = new IdentityHashMap<>();
private static final Multimap<Class, OperatorTranslator> translators = ArrayListMultimap.create();

//Note that when there are more than one translator ordering defines priority.
//First added to `translators` is first asked whenever it can translate the operator.
static {
translators.put(FlowUnfolder.InputOperator.class, new InputTranslator());
translators.put(FlatMap.class, new FlatMapTranslator());
Expand All @@ -54,6 +59,40 @@ class FlowTranslator {
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
translators.put(Join.class, new JoinTranslator());
}

@SuppressWarnings("unchecked")
private static boolean isOperatorDirectlyTranslatable(Operator operator){
Collection<OperatorTranslator> availableTranslators = translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return false;
}

for (OperatorTranslator translator : availableTranslators){
if (translator.canTranslate(operator)){
return true;
}
}

return false;
}

@Nullable
@SuppressWarnings("unchecked")
private static OperatorTranslator getTranslatorIfAvailable(Operator operator){
Collection<OperatorTranslator> availableTranslators = translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return null;
}

for (OperatorTranslator translator : availableTranslators){
if (translator.canTranslate(operator)){
return translator;
}
}

return null;
}

static Pipeline toPipeline(
Expand All @@ -75,12 +114,12 @@ static Pipeline toPipeline(

static DAG<Operator<?, ?>> toDAG(Flow flow) {
final DAG<Operator<?, ?>> dag =
FlowUnfolder.unfold(flow, operator -> translators.containsKey(operator.getClass()));
FlowUnfolder.unfold(flow, FlowTranslator::isOperatorDirectlyTranslatable);
return dag;
}

static DAG<Operator<?, ?>> unfold(DAG<Operator<?, ?>> dag) {
return FlowUnfolder.translate(dag, operator -> translators.containsKey(operator.getClass()));
return FlowUnfolder.translate(dag, FlowTranslator::isOperatorDirectlyTranslatable);
}

@SuppressWarnings("unchecked")
Expand All @@ -91,7 +130,7 @@ static void updateContextBy(DAG<Operator<?, ?>> dag, BeamExecutorContext context
.map(Node::get)
.forEach(
op -> {
final OperatorTranslator translator = translators.get(op.getClass());
final OperatorTranslator translator = getTranslatorIfAvailable(op);
if (translator == null) {
throw new UnsupportedOperationException(
"Operator " + op.getClass().getSimpleName() + " not supported");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.common.InputToKvDoFn;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.JoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.LeftOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.RightOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;


/**
* {@link OperatorTranslator Translator } for Euphoria {@link Join} operator.
*/
public class JoinTranslator implements OperatorTranslator<Join> {

@Override
@SuppressWarnings("unchecked")
public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return doTranslate(operator, context);
}


public <K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
doTranslate(Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext context) {

Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());

// get input data-sets transformed to Pcollections<KV<K,LeftT/RightT>>
List<PCollection<Object>> inputs = context.getInputs(operator);

PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(inputs.get(0),
operator.getLeftKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-left");

PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(inputs.get(1),
operator.getRightKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-right");

// and apply the same widowing on input Pcolections since the documentation states:
//'all of the PCollections you want to group must use the same
// windowing strategy and window sizing'
leftKvInput = WindowingUtils.applyWindowingIfSpecified(
operator, leftKvInput, context.getAllowedLateness(operator));
rightKvInput = WindowingUtils.applyWindowingIfSpecified(
operator, rightKvInput, context.getAllowedLateness(operator));

// GoGroupByKey collections
TupleTag<LeftT> leftTag = new TupleTag<>();
TupleTag<RightT> rightTag = new TupleTag<>();

PCollection<KV<K, CoGbkResult>> coGrouped = KeyedPCollectionTuple
.of(leftTag, leftKvInput)
.and(rightTag, rightKvInput)
.apply("::co-group-by-key", CoGroupByKey.create());

// Join
JoinFn<LeftT, RightT, K, OutputT> joinFn = chooseJoinFn(operator, leftTag, rightTag);

return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
}

private <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
PCollection<Object> inputPCollection,
UnaryFunction<ValueT, K> keyExtractor,
Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {

@SuppressWarnings("unchecked")
PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
typedInput.setCoder(valueCoder);

PCollection<KV<K, ValueT>> kvInput =
typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor)));
kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));

return kvInput;
}

private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT, RightT, K, OutputT>
chooseJoinFn(
Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
TupleTag<RightT> rightTag) {

JoinFn<LeftT, RightT, K, OutputT> joinFn;
BinaryFunctor<LeftT, RightT, OutputT> joiner = operator.getJoiner();

switch (operator.getType()) {
case INNER:
joinFn = new InnerJoinFn<>(joiner, leftTag, rightTag);
break;
case LEFT:
joinFn = new LeftOuterJoinFn<>(joiner, leftTag, rightTag);
break;
case RIGHT:
joinFn = new RightOuterJoinFn<>(joiner, leftTag, rightTag);
break;
case FULL:
joinFn = new FullJoinFn<>(joiner, leftTag, rightTag);
break;

default:
throw new UnsupportedOperationException(String.format(
"Cannot translate Euphoria '%s' operator to Beam transformations."
+ " Given join type '%s' is not supported.",
Join.class.getSimpleName(), operator.getType()));
}
return joinFn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*
* @param <OperatorT> the type of the user defined euphoria operator definition
*/
@FunctionalInterface
interface OperatorTranslator<OperatorT extends Operator> {

/**
Expand All @@ -36,4 +35,15 @@ interface OperatorTranslator<OperatorT extends Operator> {
* @return a beam transformation
*/
PCollection<?> translate(OperatorT operator, BeamExecutorContext context);

/**
* Returns true when implementing {@link OperatorTranslator} is able to translate given instance
* of an operator, false otherwise.
*
* <p>This method allow us to have more {@link OperatorTranslator}
* implementations for one {@link Operator} in case when some specialized translators are needed.
*/
default boolean canTranslate(OperatorT operator) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;


import static com.google.common.base.Preconditions.checkState;

import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowFn;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
Expand All @@ -35,7 +38,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Expand All @@ -48,6 +50,9 @@ class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {
private static <InputT, K, V, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator, BeamExecutorContext context) {

//TODO Could we even do values sorting ?
checkState(operator.getValueComparator() == null, "Values sorting is not supported.");

final UnaryFunction<InputT, K> keyExtractor = operator.getKeyExtractor();
final UnaryFunction<InputT, V> valueExtractor = operator.getValueExtractor();
final ReduceFunctor<V, OutputT> reducer = operator.getReducer();
Expand All @@ -56,24 +61,8 @@ class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {
final Coder<K> keyCoder = context.getCoder(keyExtractor);
final Coder<V> valueCoder = context.getCoder(valueExtractor);

final PCollection<InputT> input;

// ~ apply windowing if specified
if (operator.getWindowing() == null) {
input = context.getInput(operator);
} else {
input =
context
.getInput(operator)
.apply(
operator.getName() + "::windowing",
org.apache.beam.sdk.transforms.windowing.Window.into(
BeamWindowFn.wrap(operator.getWindowing()))
// TODO: trigger
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(context.getAllowedLateness(operator)));
}
final PCollection<InputT> input = WindowingUtils.applyWindowingIfSpecified(operator,
context.getInput(operator), context.getAllowedLateness(operator));

// ~ create key & value extractor
final MapElements<InputT, KV<K, V>> extractor =
Expand Down Expand Up @@ -119,11 +108,17 @@ public Pair<K, V> apply(KV<K, V> in) {
}
}

@Override
public boolean canTranslate(ReduceByKey operator) {
// translation of sorted values is not supported yet
return operator.getValueComparator() == null;
}

private static <InputT, OutputT> SerializableFunction<Iterable<InputT>, InputT> asCombiner(
ReduceFunctor<InputT, OutputT> reducer) {

@SuppressWarnings("unchecked")
final ReduceFunctor<InputT, InputT> combiner = (ReduceFunctor<InputT, InputT>) reducer;
@SuppressWarnings("unchecked") final ReduceFunctor<InputT, InputT> combiner =
(ReduceFunctor<InputT, InputT>) reducer;
final SingleValueCollector<InputT> collector = new SingleValueCollector<>();
return (Iterable<InputT> input) -> {
combiner.apply(StreamSupport.stream(input.spliterator(), false), collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ public PCollection<?> translate(ReduceStateByKey operator, BeamExecutorContext c
.setCoder(new KryoCoder<>());
}
*/
throw new UnsupportedOperationException("Not supported yet");
throw new UnsupportedOperationException("ReduceStateByKy is not supported yet.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.apache.beam.sdk.extensions.euphoria.beam.common;

import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
* {@link DoFn} which takes input elements and transforms them to {@link KV} using given key
* extractor.
*/
public class InputToKvDoFn<InputT, K> extends DoFn<InputT, KV<K, InputT>> {

private final UnaryFunction<InputT, K> keyExtractor;

public InputToKvDoFn(UnaryFunction<InputT, K> keyExtractor) {
this.keyExtractor = keyExtractor;
}

@ProcessElement
public void processElement(ProcessContext c) {
InputT element = c.element();
K key = keyExtractor.apply(element);
c.output(KV.of(key, element));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* A set of commonly used classes enabling some code reuse.
*/
package org.apache.beam.sdk.extensions.euphoria.beam.common;
Loading

0 comments on commit 6711c13

Please sign in to comment.