Skip to content

Commit

Permalink
Merge pull request apache#5447 from seznam/dsl-euphoria
Browse files Browse the repository at this point in the history
[BEAM-3900] Join operator translation, Beam windowing, new tests from operator testsuite enabled .
  • Loading branch information
jbonofre authored Jun 4, 2018
2 parents eff3ffd + 939a104 commit e282336
Show file tree
Hide file tree
Showing 32 changed files with 1,545 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ job('beam_PreCommit_Java_GradleBuild') {
rootBuildScriptDir(common_job_properties.checkoutDir)
tasks(':javaPreCommit')
common_job_properties.setGradleSwitches(delegate)
// Specify maven home on Jenkins, needed by Maven archetype integration tests.
switches('-Pmaven_home=/home/jenkins/tools/maven/apache-maven-3.5.2')
}
}
}
2 changes: 0 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ task javaPreCommit() {
dependsOn ":rat"
dependsOn ":beam-sdks-java-core:buildNeeded"
dependsOn ":beam-sdks-java-core:buildDependents"
dependsOn ":beam-sdks-java-maven-archetypes-examples:generateAndBuildArchetypeTest"
dependsOn ":beam-sdks-java-maven-archetypes-starter:generateAndBuildArchetypeTest"
dependsOn ":beam-examples-java:preCommit"
}

Expand Down
2 changes: 0 additions & 2 deletions sdks/java/extensions/euphoria/euphoria-beam/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,10 @@ dependencies {
compile "com.esotericsoftware.kryo:kryo:${kryoVersion}"
shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow 'com.google.code.findbugs:annotations:3.0.1'
testCompile project(':beam-sdks-java-extensions-euphoria-operator-testkit')
testCompile project(':beam-sdks-java-extensions-euphoria-testing')
testCompile project(':beam-runners-direct-java')
testCompile library.java.slf4j_api
testCompile library.java.hamcrest_core

}

test.testLogging.showStandardStreams = true
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.util.IdentityHashMap;
import java.util.Map;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.euphoria.beam.io.BeamWriteSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.flow.Flow;
import org.apache.beam.sdk.extensions.euphoria.core.client.io.DataSink;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.FlatMap;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceByKey;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.ReduceStateByKey;
Expand All @@ -43,8 +46,10 @@
*/
class FlowTranslator {

private static final Map<Class, OperatorTranslator> translators = new IdentityHashMap<>();
private static final Multimap<Class, OperatorTranslator> translators = ArrayListMultimap.create();

//Note that when there are more than one translator ordering defines priority.
//First added to `translators` is first asked whenever it can translate the operator.
static {
translators.put(FlowUnfolder.InputOperator.class, new InputTranslator());
translators.put(FlatMap.class, new FlatMapTranslator());
Expand All @@ -54,6 +59,40 @@ class FlowTranslator {
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
translators.put(Join.class, new JoinTranslator());
}

@SuppressWarnings("unchecked")
private static boolean isOperatorDirectlyTranslatable(Operator operator){
Collection<OperatorTranslator> availableTranslators = translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return false;
}

for (OperatorTranslator translator : availableTranslators){
if (translator.canTranslate(operator)){
return true;
}
}

return false;
}

@Nullable
@SuppressWarnings("unchecked")
private static OperatorTranslator getTranslatorIfAvailable(Operator operator){
Collection<OperatorTranslator> availableTranslators = translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return null;
}

for (OperatorTranslator translator : availableTranslators){
if (translator.canTranslate(operator)){
return translator;
}
}

return null;
}

static Pipeline toPipeline(
Expand All @@ -75,12 +114,12 @@ static Pipeline toPipeline(

static DAG<Operator<?, ?>> toDAG(Flow flow) {
final DAG<Operator<?, ?>> dag =
FlowUnfolder.unfold(flow, operator -> translators.containsKey(operator.getClass()));
FlowUnfolder.unfold(flow, FlowTranslator::isOperatorDirectlyTranslatable);
return dag;
}

static DAG<Operator<?, ?>> unfold(DAG<Operator<?, ?>> dag) {
return FlowUnfolder.translate(dag, operator -> translators.containsKey(operator.getClass()));
return FlowUnfolder.translate(dag, FlowTranslator::isOperatorDirectlyTranslatable);
}

@SuppressWarnings("unchecked")
Expand All @@ -91,7 +130,7 @@ static void updateContextBy(DAG<Operator<?, ?>> dag, BeamExecutorContext context
.map(Node::get)
.forEach(
op -> {
final OperatorTranslator translator = translators.get(op.getClass());
final OperatorTranslator translator = getTranslatorIfAvailable(op);
if (translator == null) {
throw new UnsupportedOperationException(
"Operator " + op.getClass().getSimpleName() + " not supported");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.common.InputToKvDoFn;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.JoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.LeftOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.RightOuterJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;


/**
* {@link OperatorTranslator Translator } for Euphoria {@link Join} operator.
*/
public class JoinTranslator implements OperatorTranslator<Join> {

@Override
@SuppressWarnings("unchecked")
public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return doTranslate(operator, context);
}


public <K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
doTranslate(Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext context) {

Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());

// get input data-sets transformed to Pcollections<KV<K,LeftT/RightT>>
List<PCollection<Object>> inputs = context.getInputs(operator);

PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(inputs.get(0),
operator.getLeftKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-left");

PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(inputs.get(1),
operator.getRightKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-right");

// and apply the same widowing on input Pcolections since the documentation states:
//'all of the PCollections you want to group must use the same
// windowing strategy and window sizing'
leftKvInput = WindowingUtils.applyWindowingIfSpecified(
operator, leftKvInput, context.getAllowedLateness(operator));
rightKvInput = WindowingUtils.applyWindowingIfSpecified(
operator, rightKvInput, context.getAllowedLateness(operator));

// GoGroupByKey collections
TupleTag<LeftT> leftTag = new TupleTag<>();
TupleTag<RightT> rightTag = new TupleTag<>();

PCollection<KV<K, CoGbkResult>> coGrouped = KeyedPCollectionTuple
.of(leftTag, leftKvInput)
.and(rightTag, rightKvInput)
.apply("::co-group-by-key", CoGroupByKey.create());

// Join
JoinFn<LeftT, RightT, K, OutputT> joinFn = chooseJoinFn(operator, leftTag, rightTag);

return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
}

private <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
PCollection<Object> inputPCollection,
UnaryFunction<ValueT, K> keyExtractor,
Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {

@SuppressWarnings("unchecked")
PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
typedInput.setCoder(valueCoder);

PCollection<KV<K, ValueT>> kvInput =
typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor)));
kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));

return kvInput;
}

private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT, RightT, K, OutputT>
chooseJoinFn(
Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
TupleTag<RightT> rightTag) {

JoinFn<LeftT, RightT, K, OutputT> joinFn;
BinaryFunctor<LeftT, RightT, OutputT> joiner = operator.getJoiner();

switch (operator.getType()) {
case INNER:
joinFn = new InnerJoinFn<>(joiner, leftTag, rightTag);
break;
case LEFT:
joinFn = new LeftOuterJoinFn<>(joiner, leftTag, rightTag);
break;
case RIGHT:
joinFn = new RightOuterJoinFn<>(joiner, leftTag, rightTag);
break;
case FULL:
joinFn = new FullJoinFn<>(joiner, leftTag, rightTag);
break;

default:
throw new UnsupportedOperationException(String.format(
"Cannot translate Euphoria '%s' operator to Beam transformations."
+ " Given join type '%s' is not supported.",
Join.class.getSimpleName(), operator.getType()));
}
return joinFn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
*
* @param <OperatorT> the type of the user defined euphoria operator definition
*/
@FunctionalInterface
interface OperatorTranslator<OperatorT extends Operator> {

/**
Expand All @@ -36,4 +35,15 @@ interface OperatorTranslator<OperatorT extends Operator> {
* @return a beam transformation
*/
PCollection<?> translate(OperatorT operator, BeamExecutorContext context);

/**
* Returns true when implementing {@link OperatorTranslator} is able to translate given instance
* of an operator, false otherwise.
*
* <p>This method allow us to have more {@link OperatorTranslator}
* implementations for one {@link Operator} in case when some specialized translators are needed.
*/
default boolean canTranslate(OperatorT operator) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;


import static com.google.common.base.Preconditions.checkState;

import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowFn;
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.accumulators.AccumulatorProvider;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.ReduceFunctor;
Expand All @@ -35,7 +38,6 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

Expand All @@ -48,6 +50,9 @@ class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {
private static <InputT, K, V, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>>
doTranslate(ReduceByKey<InputT, K, V, OutputT, W> operator, BeamExecutorContext context) {

//TODO Could we even do values sorting ?
checkState(operator.getValueComparator() == null, "Values sorting is not supported.");

final UnaryFunction<InputT, K> keyExtractor = operator.getKeyExtractor();
final UnaryFunction<InputT, V> valueExtractor = operator.getValueExtractor();
final ReduceFunctor<V, OutputT> reducer = operator.getReducer();
Expand All @@ -56,24 +61,8 @@ class ReduceByKeyTranslator implements OperatorTranslator<ReduceByKey> {
final Coder<K> keyCoder = context.getCoder(keyExtractor);
final Coder<V> valueCoder = context.getCoder(valueExtractor);

final PCollection<InputT> input;

// ~ apply windowing if specified
if (operator.getWindowing() == null) {
input = context.getInput(operator);
} else {
input =
context
.getInput(operator)
.apply(
operator.getName() + "::windowing",
org.apache.beam.sdk.transforms.windowing.Window.into(
BeamWindowFn.wrap(operator.getWindowing()))
// TODO: trigger
.triggering(AfterWatermark.pastEndOfWindow())
.discardingFiredPanes()
.withAllowedLateness(context.getAllowedLateness(operator)));
}
final PCollection<InputT> input = WindowingUtils.applyWindowingIfSpecified(operator,
context.getInput(operator), context.getAllowedLateness(operator));

// ~ create key & value extractor
final MapElements<InputT, KV<K, V>> extractor =
Expand Down Expand Up @@ -119,11 +108,17 @@ public Pair<K, V> apply(KV<K, V> in) {
}
}

@Override
public boolean canTranslate(ReduceByKey operator) {
// translation of sorted values is not supported yet
return operator.getValueComparator() == null;
}

private static <InputT, OutputT> SerializableFunction<Iterable<InputT>, InputT> asCombiner(
ReduceFunctor<InputT, OutputT> reducer) {

@SuppressWarnings("unchecked")
final ReduceFunctor<InputT, InputT> combiner = (ReduceFunctor<InputT, InputT>) reducer;
@SuppressWarnings("unchecked") final ReduceFunctor<InputT, InputT> combiner =
(ReduceFunctor<InputT, InputT>) reducer;
final SingleValueCollector<InputT> collector = new SingleValueCollector<>();
return (Iterable<InputT> input) -> {
combiner.apply(StreamSupport.stream(input.spliterator(), false), collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,6 @@ public PCollection<?> translate(ReduceStateByKey operator, BeamExecutorContext c
.setCoder(new KryoCoder<>());
}
*/
throw new UnsupportedOperationException("Not supported yet");
throw new UnsupportedOperationException("ReduceStateByKy is not supported yet.");
}
}
Loading

0 comments on commit e282336

Please sign in to comment.