From ab10ac3560ee38398ff222f552e372e91f1ca4af Mon Sep 17 00:00:00 2001
From: sammcveety
Date: Wed, 2 Mar 2016 21:27:08 -0800
Subject: [PATCH] [BEAM-93] Add subnetwork support and increment Dataflow API dependency

---
 pom.xml                                          |  2 +-
 .../DataflowPipelineWorkerPoolOptions.java       | 12 +++++++
 .../runners/DataflowPipelineTranslator.java      |  3 ++
 .../DataflowPipelineTranslatorTest.java          | 34 +++++++++++++++++++
 4 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index de47ff5c4fa59..f9dbab710055f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
     <avro.version>1.7.7</avro.version>
    <bigquery.version>v2-rev248-1.21.0</bigquery.version>
    <clouddebugger.version>0.2.3</clouddebugger.version>
-    <dataflow.version>v1b3-rev19-1.21.0</dataflow.version>
+    <dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
    <dataflow.proto.version>0.5.160222</dataflow.proto.version>
    <datastore.version>v1beta2-rev1-4.0.0</datastore.version>
    <google-clients.version>1.21.0</google-clients.version>
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
index 25d15890c7c35..be5cfdc2a731f 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -144,6 +144,18 @@ public String create(PipelineOptions options) {
   String getNetwork();
   void setNetwork(String value);
 
+  /**
+   * GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
+   * workers.
+   *
+   * <p>Default is up to the Dataflow service. Expected format is zones/ZONE/subnetworks/SUBNETWORK.
+   */
+  @Description("GCE subnetwork for launching workers. For more information, see the reference "
+      + "documentation https://cloud.google.com/compute/docs/networking. "
+      + "Default is up to the Dataflow service.")
+  String getSubnetwork();
+  void setSubnetwork(String value);
+
   /**
    * GCE availability zone for launching workers.
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index ae3a40310372f..d0cc4e53d530a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -442,6 +442,9 @@ public Job translate(List<DataflowPackage> packages) {
     if (!Strings.isNullOrEmpty(options.getNetwork())) {
       workerPool.setNetwork(options.getNetwork());
     }
+    if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
+      workerPool.setSubnetwork(options.getSubnetwork());
+    }
     if (options.getDiskSizeGb() > 0) {
       workerPool.setDiskSizeGb(options.getDiskSizeGb());
     }
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index 72090a0866a62..497552f90124a 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -230,6 +230,40 @@ public void testNetworkConfigMissing() throws IOException {
     assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
   }
 
+  @Test
+  public void testSubnetworkConfig() throws IOException {
+    final String testSubnetwork = "zones/ZONE/subnetworks/SUBNETWORK";
+
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setSubnetwork(testSubnetwork);
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertEquals(testSubnetwork,
+        job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
+  @Test
+  public void testSubnetworkConfigMissing() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+
+    DataflowPipeline p = buildPipeline(options);
+    p.traverseTopologically(new RecordingPipelineVisitor());
+    Job job =
+        DataflowPipelineTranslator.fromOptions(options)
+            .translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
+            .getJob();
+
+    assertEquals(1, job.getEnvironment().getWorkerPools().size());
+    assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+  }
+
   @Test
   public void testScalingAlgorithmMissing() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();
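
Reviewer note (not part of the patch): below is a minimal sketch of how a pipeline
author would exercise the new option once this lands. It relies only on the SDK's
standard entry points (PipelineOptionsFactory, DataflowPipelineRunner, Pipeline);
the class name and the zone/subnetwork values are hypothetical placeholders.

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;

    public class SubnetworkExample {
      public static void main(String[] args) {
        // Flags such as --project, --stagingLocation, and the new --subnetwork are
        // parsed reflectively from the getter/setter pairs on the options interface.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowPipelineRunner.class);

        // Alternatively, set it programmatically using the format documented in the
        // new Javadoc: zones/ZONE/subnetworks/SUBNETWORK (names here are hypothetical).
        options.setSubnetwork("zones/us-central1-a/subnetworks/my-subnetwork");

        Pipeline p = Pipeline.create(options);
        // ... apply transforms here ...
        p.run();  // the translated job's worker pool now carries the subnetwork
      }
    }

Since getSubnetwork() carries no @Default annotation, the translator leaves the
worker pool's subnetwork unset unless the user supplies a value, which is exactly
what the testSubnetworkConfigMissing case above verifies.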