Skip to content

Commit

Permalink
[BEAM-4394] Pull request review fixes.
Browse files Browse the repository at this point in the history
  • Loading branch information
VaclavPlajt committed May 21, 2018
1 parent 22810d1 commit 013b078
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/**
Expand All @@ -18,39 +17,34 @@ public FullJoinFn(BinaryFunctor<LeftT, RightT, OutputT> joiner, TupleTag<LeftT>
}

@Override
public void processElement(ProcessContext c) {
protected void doJoin(
ProcessContext c, K key, CoGbkResult value,
Iterable<LeftT> leftSideIter,
Iterable<RightT> rightSideIter) {

KV<K, CoGbkResult> element = c.element();
CoGbkResult value = element.getValue();
K key = element.getKey();

Iterable<LeftT> leftSideIter = value.getAll(leftTag);
Iterable<RightT> rightSIdeIter = value.getAll(rightTag);
boolean leftHasValues = leftSideIter.iterator().hasNext();
boolean rightHasValues = rightSideIter.iterator().hasNext();

SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

boolean leftHasValues = leftSideIter.iterator().hasNext();
boolean rightHasValues = rightSIdeIter.iterator().hasNext();

if (leftHasValues && rightHasValues) {
for (RightT rightValue : rightSIdeIter) {
for (RightT rightValue : rightSideIter) {
for (LeftT leftValue : leftSideIter) {
joiner.apply(leftValue, rightValue, outCollector);
c.output(Pair.of(key, outCollector.get()));
}
}
} else if (leftHasValues && !rightHasValues) {
} else if (leftHasValues) {
for (LeftT leftValue : leftSideIter) {
joiner.apply(leftValue, null, outCollector);
c.output(Pair.of(key, outCollector.get()));
}
} else if (!leftHasValues && rightHasValues) {
for (RightT rightValue : rightSIdeIter) {
} else if (rightHasValues) {
for (RightT rightValue : rightSideIter) {
joiner.apply(null, rightValue, outCollector);
c.output(Pair.of(key, outCollector.get()));
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/**
Expand All @@ -20,14 +19,10 @@ public InnerJoinFn(
}

@Override
public void processElement(ProcessContext c) {

KV<K, CoGbkResult> element = c.element();
CoGbkResult value = element.getValue();
K key = element.getKey();

Iterable<LeftT> leftSideIter = value.getAll(leftTag);
Iterable<RightT> rightSideIter = value.getAll(rightTag);
protected void doJoin(
ProcessContext c, K key, CoGbkResult value,
Iterable<LeftT> leftSideIter,
Iterable<RightT> rightSideIter) {

SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,22 @@ protected JoinFn(
}

@ProcessElement
public abstract void processElement(ProcessContext c);
public final void processElement(ProcessContext c) {

KV<K, CoGbkResult> element = c.element();
CoGbkResult value = element.getValue();
K key = element.getKey();

Iterable<LeftT> leftSideIter = value.getAll(leftTag);
Iterable<RightT> rightSideIter = value.getAll(rightTag);

doJoin(c, key, value, leftSideIter, rightSideIter);
}

protected abstract void doJoin(
ProcessContext c, K key, CoGbkResult value,
Iterable<LeftT> leftSideIter,
Iterable<RightT> rightSideIter);

public abstract String getFnName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;


Expand All @@ -21,30 +20,24 @@ public LeftOuterJoinFn(
}

@Override
public void processElement(ProcessContext c) {

KV<K, CoGbkResult> element = c.element();
CoGbkResult value = element.getValue();
K key = element.getKey();

Iterable<LeftT> leftSideIter = value.getAll(leftTag);
Iterable<RightT> rightSIdeIter = value.getAll(rightTag);
protected void doJoin(
ProcessContext c, K key, CoGbkResult value,
Iterable<LeftT> leftSideIter,
Iterable<RightT> rightSideIter) {

SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

for (LeftT leftValue : leftSideIter) {
if (rightSIdeIter.iterator().hasNext()) {
for (RightT rightValue : rightSIdeIter) {
if (rightSideIter.iterator().hasNext()) {
for (RightT rightValue : rightSideIter) {
joiner.apply(leftValue, rightValue, outCollector);
c.output(Pair.of(key, outCollector.get()));
}
} else {
joiner.apply(leftValue, null, outCollector);
c.output(Pair.of(key, outCollector.get()));
joiner.apply(leftValue, null, outCollector);
c.output(Pair.of(key, outCollector.get()));
}
}


}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.beam.sdk.extensions.euphoria.core.client.functional.BinaryFunctor;
import org.apache.beam.sdk.extensions.euphoria.core.client.util.Pair;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;

/**
Expand All @@ -20,18 +19,12 @@ public RightOuterJoinFn(
}

@Override
public void processElement(ProcessContext c) {

KV<K, CoGbkResult> element = c.element();
CoGbkResult value = element.getValue();
K key = element.getKey();

Iterable<LeftT> leftSideIter = value.getAll(leftTag);
Iterable<RightT> rightSIdeIter = value.getAll(rightTag);
protected void doJoin(ProcessContext c, K key, CoGbkResult value, Iterable<LeftT> leftSideIter,
Iterable<RightT> rightSideIter) {

SingleValueCollector<OutputT> outCollector = new SingleValueCollector<>();

for (RightT rightValue : rightSIdeIter) {
for (RightT rightValue : rightSideIter) {
if (leftSideIter.iterator().hasNext()) {
for (LeftT leftValue : leftSideIter) {
joiner.apply(leftValue, rightValue, outCollector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

/**
* {@link cz.seznam.euphoria.core.client.operator.Join} translation centered classes.
* {@link org.apache.beam.sdk.extensions.euphoria.core.client.operator.Join} translation centered
* classes.
*/
package org.apache.beam.sdk.extensions.euphoria.beam.join;

0 comments on commit 013b078

Please sign in to comment.