Skip to content

Commit

Permalink
Merge pull request apache#3 from seznam/simunek/broadcastHashJoinTran…
Browse files Browse the repository at this point in the history
…slation/BEAM-4410

[BEAM-4410] added BroadcastJoinTranslator
  • Loading branch information
VaclavPlajt authored Jun 4, 2018
2 parents e282336 + 0e6b33a commit e357f1a
Show file tree
Hide file tree
Showing 5 changed files with 403 additions and 134 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import static org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil.getKVInputCollection;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.window.BeamWindowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.Dataset;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Windowing;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Operator;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.hint.SizeHint;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

/**
* Translator for {@link org.apache.beam.sdk.extensions.euphoria.core.client.operator.RightJoin} and
* {@link org.apache.beam.sdk.extensions.euphoria.core.client.operator.LeftJoin} when one side of
* the join fits in memory so it can be distributed in hashmap with the other side.
*/
public class BrodcastHashJoinTranslator implements OperatorTranslator<Join> {

public static boolean hasFitsInMemoryHint(Operator operator) {
return operator != null
&& operator.getHints() != null
&& operator.getHints().contains(SizeHint.FITS_IN_MEMORY);
}

@Override
@SuppressWarnings("unchecked")
public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return doTranslate(operator, context);
}

<K, LeftT, RightT, OutputT, W extends Window<W>> PCollection<Pair<K, OutputT>> doTranslate(
Join<LeftT, RightT, K, OutputT, W> operator, BeamExecutorContext context) {
Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());

@SuppressWarnings("unchecked") final PCollection<LeftT> left = (PCollection<LeftT>) context
.getInputs(operator).get(0);
@SuppressWarnings("unchecked") final PCollection<RightT> right = (PCollection<RightT>) context
.getInputs(operator).get(1);

final PCollection<KV<K, LeftT>> leftKvInput =
getKVInputCollection(
left,
operator.getLeftKeyExtractor(),
keyCoder,
new KryoCoder<>(),
":extract-keys-left");

final PCollection<KV<K, RightT>> rightKvInput =
getKVInputCollection(
right,
operator.getRightKeyExtractor(),
keyCoder,
new KryoCoder<>(),
":extract-keys-right");

switch (operator.getType()) {
case LEFT:
final PCollectionView<Map<K, Iterable<RightT>>> broadcastRight =
rightKvInput.apply(View.asMultimap());
return leftKvInput.apply(
ParDo.of(new BroadcastHashLeftJoinFn<>(broadcastRight, operator.getJoiner()))
.withSideInputs(broadcastRight));

case RIGHT:
final PCollectionView<Map<K, Iterable<LeftT>>> broadcastLeft =
leftKvInput.apply(View.asMultimap());
return rightKvInput.apply(
ParDo.of(new BroadcastHashRightJoinFn<>(broadcastLeft, operator.getJoiner()))
.withSideInputs(broadcastLeft));

default:
throw new UnsupportedOperationException(
String.format(
"Cannot translate Euphoria '%s' operator to Beam transformations."
+ " Given join type '%s' is not supported for BrodcastHashJoin.",
Join.class.getSimpleName(), operator.getType()));
}
}

@Override
public boolean canTranslate(Join operator) {
@SuppressWarnings("unchecked") final ArrayList<Dataset> inputs = new ArrayList(operator.listInputs());
if (inputs.size() != 2) {
return false;
}
final Dataset leftDataset = inputs.get(0);
final Dataset rightDataset = inputs.get(1);
return (operator.getType() == Join.Type.LEFT && hasFitsInMemoryHint(rightDataset.getProducer())
|| operator.getType() == Join.Type.RIGHT
&& hasFitsInMemoryHint(leftDataset.getProducer()))
&& isAllowedWindowing(operator.getWindowing());
}

/**
* BroadcastHashJoin supports only GlobalWindow or none.
*/
private boolean isAllowedWindowing(Windowing windowing) {
return windowing == null
|| (windowing instanceof BeamWindowing
&& ((BeamWindowing) windowing).getWindowFn() instanceof GlobalWindows);
}

static class BroadcastHashRightJoinFn<K, LeftT, RightT, OutputT>
extends DoFn<KV<K, RightT>, Pair<K, OutputT>> {

private final PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection;
private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
private final SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

BroadcastHashRightJoinFn(
PCollectionView<Map<K, Iterable<LeftT>>> smallSideCollection,
BinaryFunctor<LeftT, RightT, OutputT> joiner) {
this.smallSideCollection = smallSideCollection;
this.joiner = joiner;
}

@SuppressWarnings("unused")
@ProcessElement
public void processElement(ProcessContext context) {
final K key = context.element().getKey();
final Map<K, Iterable<LeftT>> map = context.sideInput(smallSideCollection);
final Iterable<LeftT> leftValues = map.getOrDefault(key, Collections.singletonList(null));
leftValues.forEach(
leftValue -> {
joiner.apply(leftValue, context.element().getValue(), outCollector);
context.output(Pair.of(key, outCollector.get()));
});
}
}

static class BroadcastHashLeftJoinFn<K, LeftT, RightT, OutputT>
extends DoFn<KV<K, LeftT>, Pair<K, OutputT>> {

private final PCollectionView<Map<K, Iterable<RightT>>> smallSideCollection;
private final BinaryFunctor<LeftT, RightT, OutputT> joiner;
private final SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

BroadcastHashLeftJoinFn(
PCollectionView<Map<K, Iterable<RightT>>> smallSideCollection,
BinaryFunctor<LeftT, RightT, OutputT> joiner) {
this.smallSideCollection = smallSideCollection;
this.joiner = joiner;
}

@SuppressWarnings("unused")
@ProcessElement
public void processElement(ProcessContext context) {
final K key = context.element().getKey();
final Map<K, Iterable<RightT>> map = context.sideInput(smallSideCollection);
final Iterable<RightT> rightValues = map.getOrDefault(key, Collections.singletonList(null));

rightValues.forEach(
rightValue -> {
joiner.apply(context.element().getValue(), rightValue, outCollector);
context.output(Pair.of(key, outCollector.get()));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Collection;
Expand Down Expand Up @@ -59,6 +60,7 @@ class FlowTranslator {
// extended operators
translators.put(ReduceByKey.class, new ReduceByKeyTranslator());
translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
translators.put(Join.class, new BrodcastHashJoinTranslator());
translators.put(Join.class, new JoinTranslator());
}

Expand All @@ -78,9 +80,10 @@ private static boolean isOperatorDirectlyTranslatable(Operator operator){
return false;
}

@VisibleForTesting
@Nullable
@SuppressWarnings("unchecked")
private static OperatorTranslator getTranslatorIfAvailable(Operator operator){
static OperatorTranslator getTranslatorIfAvailable(Operator operator) {
Collection<OperatorTranslator> availableTranslators = translators.get(operator.getClass());
if (availableTranslators.isEmpty()){
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.beam.sdk.extensions.euphoria.beam;

import java.util.List;
import static org.apache.beam.sdk.extensions.euphoria.beam.common.OperatorTranslatorUtil.getKVInputCollection;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.common.InputToKvDoFn;
import org.apache.beam.sdk.extensions.euphoria.beam.io.KryoCoder;
import org.apache.beam.sdk.extensions.euphoria.beam.join.FullJoinFn;
import org.apache.beam.sdk.extensions.euphoria.beam.join.InnerJoinFn;
Expand All @@ -30,7 +29,6 @@
import org.apache.beam.sdk.extensions.euphoria.beam.window.WindowingUtils;
import org.apache.beam.sdk.extensions.euphoria.core.client.dataset.windowing.Window;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.ParDo;
Expand Down Expand Up @@ -60,13 +58,16 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
Coder<K> keyCoder = context.getCoder(operator.getLeftKeyExtractor());

// get input data-sets transformed to Pcollections<KV<K,LeftT/RightT>>
List<PCollection<Object>> inputs = context.getInputs(operator);
@SuppressWarnings("unchecked") final PCollection<LeftT> left = (PCollection<LeftT>) context
.getInputs(operator).get(0);
@SuppressWarnings("unchecked") final PCollection<RightT> right = (PCollection<RightT>) context
.getInputs(operator).get(1);

PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(inputs.get(0),
PCollection<KV<K, LeftT>> leftKvInput = getKVInputCollection(left,
operator.getLeftKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-left");

PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(inputs.get(1),
PCollection<KV<K, RightT>> rightKvInput = getKVInputCollection(right,
operator.getRightKeyExtractor(),
keyCoder, new KryoCoder<>(), "::extract-keys-right");

Expand All @@ -93,22 +94,6 @@ public PCollection<?> translate(Join operator, BeamExecutorContext context) {
return coGrouped.apply(joinFn.getFnName(), ParDo.of(joinFn));
}

private <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
PCollection<Object> inputPCollection,
UnaryFunction<ValueT, K> keyExtractor,
Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {

@SuppressWarnings("unchecked")
PCollection<ValueT> typedInput = (PCollection<ValueT>) inputPCollection;
typedInput.setCoder(valueCoder);

PCollection<KV<K, ValueT>> kvInput =
typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor)));
kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));

return kvInput;
}

private <K, LeftT, RightT, OutputT, W extends Window<W>> JoinFn<LeftT, RightT, K, OutputT>
chooseJoinFn(
Join<LeftT, RightT, K, OutputT, W> operator, TupleTag<LeftT> leftTag,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.euphoria.beam.common;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.UnaryFunction;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
* Shared utility methods among operator translators.
*/
public class OperatorTranslatorUtil {

/**
* Transform input to KV elements.
*/
public static <K, ValueT> PCollection<KV<K, ValueT>> getKVInputCollection(
PCollection<ValueT> inputPCollection,
UnaryFunction<ValueT, K> keyExtractor,
Coder<K> keyCoder, Coder<ValueT> valueCoder, String transformName) {

@SuppressWarnings("unchecked")
PCollection<ValueT> typedInput = inputPCollection;
typedInput.setCoder(valueCoder);

PCollection<KV<K, ValueT>> kvInput =
typedInput.apply(transformName, ParDo.of(new InputToKvDoFn<>(keyExtractor)));
kvInput.setCoder(KvCoder.of(keyCoder, valueCoder));

return kvInput;
}

}
Loading

0 comments on commit e357f1a

Please sign in to comment.