Skip to content

Commit

Permalink
Merge pull request #1 from lukecwik/pr6328
Browse files Browse the repository at this point in the history
Fix tests expectations and minor code fix up.
  • Loading branch information
ryan-williams authored Sep 9, 2018
2 parents 2b94f2e + 52e21d4 commit af58bd2
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public class PTransformTranslation {
getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);

/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to
* using Impulse + SplittableDoFns.
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
* + SplittableDoFns.
*/
@Deprecated
public static final String READ_TRANSFORM_URN =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.graph.MutableNetwork;
import com.google.common.graph.Network;
import com.google.common.graph.NetworkBuilder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -120,23 +120,22 @@ static Collection<String> getPrimitiveTransformIds(RunnerApi.Components componen
return ids;
}

private static Set<String> primitiveUrns =
new HashSet<String>(
Arrays.asList(
PTransformTranslation.PAR_DO_TRANSFORM_URN,
PTransformTranslation.FLATTEN_TRANSFORM_URN,
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
PTransformTranslation.IMPULSE_TRANSFORM_URN,
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
PTransformTranslation.TEST_STREAM_TRANSFORM_URN,
PTransformTranslation.MAP_WINDOWS_TRANSFORM_URN,
PTransformTranslation.READ_TRANSFORM_URN,
PTransformTranslation.CREATE_VIEW_TRANSFORM_URN));
private static Set<String> PRIMITIVE_URNS =
ImmutableSet.of(
PTransformTranslation.PAR_DO_TRANSFORM_URN,
PTransformTranslation.FLATTEN_TRANSFORM_URN,
PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
PTransformTranslation.IMPULSE_TRANSFORM_URN,
PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
PTransformTranslation.TEST_STREAM_TRANSFORM_URN,
PTransformTranslation.MAP_WINDOWS_TRANSFORM_URN,
PTransformTranslation.READ_TRANSFORM_URN,
PTransformTranslation.CREATE_VIEW_TRANSFORM_URN);

/** Returns true if the provided transform is a primitive. */
private static boolean isPrimitiveTransform(PTransform transform) {
String urn = PTransformTranslation.urnForTransformOrNull(transform);
return urn != null && primitiveUrns.contains(urn);
return PRIMITIVE_URNS.contains(urn);
}

private MutableNetwork<PipelineNode, PipelineEdge> buildNetwork(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.hamcrest.Matchers;
import org.hamcrest.core.AnyOf;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand Down Expand Up @@ -179,27 +178,30 @@ public void singleEnvironmentBecomesASingleStage() {
* becomes all runner-executed
*/
@Test
@Ignore
public void unknownTransformsNoEnvironmentBecomeRunnerExecuted() {
public void transformsWithNoEnvironmentBecomeRunnerExecuted() {
Components components =
partialComponents
.toBuilder()
.putTransforms(
"mystery",
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN))
.setUniqueName("Mystery")
.putInputs("input", "impulse.out")
.putOutputs("output", "mystery.out")
.setSpec(FunctionSpec.newBuilder().setUrn("beam:transform:mystery:v1.4"))
.build())
.putPcollections("mystery.out", pc("mystery.out"))
.putTransforms(
"enigma",
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN))
.setUniqueName("Enigma")
.putInputs("input", "impulse.out")
.putOutputs("output", "enigma.out")
.setSpec(FunctionSpec.newBuilder().setUrn("beam:transform:enigma:v1"))
.build())
.putPcollections("enigma.out", pc("enigma.out"))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,19 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.graph.OutputDeduplicator.DeduplicationResult;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link OutputDeduplicator}. */
@RunWith(JUnit4.class)
@Ignore
public class OutputDeduplicatorTest {
@Test
public void unchangedWithNoDuplicates() {
Expand All @@ -61,22 +61,41 @@ public void unchangedWithNoDuplicates() {
* \-> two -> .out /
*/
PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
PTransform red = PTransform.newBuilder().putOutputs("out", redOut.getUniqueName()).build();
PTransform red =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putOutputs("out", redOut.getUniqueName())
.build();
PCollection oneOut = PCollection.newBuilder().setUniqueName("one.out").build();
PTransform one =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", oneOut.getUniqueName())
.build();
PCollection twoOut = PCollection.newBuilder().setUniqueName("two.out").build();
PTransform two =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", twoOut.getUniqueName())
.build();
PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
PTransform blue =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("one", oneOut.getUniqueName())
.putInputs("two", twoOut.getUniqueName())
.putOutputs("out", blueOut.getUniqueName())
Expand Down Expand Up @@ -147,29 +166,52 @@ public void duplicateOverStages() {
* --> [two -> .out -> shared ->] .out:1 /
*/
PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
PTransform red = PTransform.newBuilder().putOutputs("out", redOut.getUniqueName()).build();
PTransform red =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putOutputs("out", redOut.getUniqueName())
.build();
PCollection oneOut = PCollection.newBuilder().setUniqueName("one.out").build();
PTransform one =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", oneOut.getUniqueName())
.build();
PCollection twoOut = PCollection.newBuilder().setUniqueName("two.out").build();
PTransform two =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", twoOut.getUniqueName())
.build();
PCollection sharedOut = PCollection.newBuilder().setUniqueName("shared.out").build();
PTransform shared =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("one", oneOut.getUniqueName())
.putInputs("two", twoOut.getUniqueName())
.putOutputs("shared", sharedOut.getUniqueName())
.build();
PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
PTransform blue =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", sharedOut.getUniqueName())
.putOutputs("out", blueOut.getUniqueName())
.build();
Expand Down Expand Up @@ -269,23 +311,42 @@ public void duplicateOverStagesAndTransforms() {
* -----------------> shared:0 -> .out:1 /
*/
PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
PTransform red = PTransform.newBuilder().putOutputs("out", redOut.getUniqueName()).build();
PTransform red =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putOutputs("out", redOut.getUniqueName())
.build();
PCollection oneOut = PCollection.newBuilder().setUniqueName("one.out").build();
PTransform one =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", oneOut.getUniqueName())
.build();
PCollection sharedOut = PCollection.newBuilder().setUniqueName("shared.out").build();
PTransform shared =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("one", oneOut.getUniqueName())
.putInputs("red", redOut.getUniqueName())
.putOutputs("shared", sharedOut.getUniqueName())
.build();
PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
PTransform blue =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", sharedOut.getUniqueName())
.putOutputs("out", blueOut.getUniqueName())
.build();
Expand Down Expand Up @@ -386,42 +447,73 @@ public void multipleDuplicatesInStages() {
* [-> three -> .out -> otherShared -> .out:1] ---/
*/
PCollection redOut = PCollection.newBuilder().setUniqueName("red.out").build();
PTransform red = PTransform.newBuilder().putOutputs("out", redOut.getUniqueName()).build();
PTransform red =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putOutputs("out", redOut.getUniqueName())
.build();
PCollection threeOut = PCollection.newBuilder().setUniqueName("three.out").build();
PTransform three =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", threeOut.getUniqueName())
.build();
PCollection oneOut = PCollection.newBuilder().setUniqueName("one.out").build();
PTransform one =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", oneOut.getUniqueName())
.build();
PCollection twoOut = PCollection.newBuilder().setUniqueName("two.out").build();
PTransform two =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", redOut.getUniqueName())
.putOutputs("out", twoOut.getUniqueName())
.build();
PCollection sharedOut = PCollection.newBuilder().setUniqueName("shared.out").build();
PTransform shared =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("one", oneOut.getUniqueName())
.putInputs("two", twoOut.getUniqueName())
.putOutputs("shared", sharedOut.getUniqueName())
.build();
PCollection otherSharedOut = PCollection.newBuilder().setUniqueName("shared.out2").build();
PTransform otherShared =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("multi", threeOut.getUniqueName())
.putInputs("two", twoOut.getUniqueName())
.putOutputs("out", otherSharedOut.getUniqueName())
.build();
PCollection blueOut = PCollection.newBuilder().setUniqueName("blue.out").build();
PTransform blue =
PTransform.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
.setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
.build())
.putInputs("in", sharedOut.getUniqueName())
.putOutputs("out", blueOut.getUniqueName())
.build();
Expand Down

0 comments on commit af58bd2

Please sign in to comment.