diff --git a/pom.xml b/pom.xml
index ba130d25a3d25..51741cf71995f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
1.7.7
v2-rev248-1.21.0
0.2.3
- v1b3-rev19-1.21.0
+ v1b3-rev22-1.21.0
0.5.160222
v1beta2-rev1-4.0.0
1.21.0
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
index 25d15890c7c35..be5cfdc2a731f 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowPipelineWorkerPoolOptions.java
@@ -144,6 +144,18 @@ public String create(PipelineOptions options) {
String getNetwork();
void setNetwork(String value);
+ /**
+ * GCE subnetwork for launching
+ * workers.
+ *
+ *
Default is up to the Dataflow service. Expected format is zones/ZONE/subnetworks/SUBNETWORK.
+ */
+ @Description("GCE subnetwork for launching workers. For more information, see the reference "
+ + "documentation https://cloud.google.com/compute/docs/networking. "
+ + "Default is up to the Dataflow service.")
+ String getSubnetwork();
+ void setSubnetwork(String value);
+
/**
* GCE availability zone for launching workers.
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index ae3a40310372f..d0cc4e53d530a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -442,6 +442,9 @@ public Job translate(List packages) {
if (!Strings.isNullOrEmpty(options.getNetwork())) {
workerPool.setNetwork(options.getNetwork());
}
+ if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
+ workerPool.setSubnetwork(options.getSubnetwork());
+ }
if (options.getDiskSizeGb() > 0) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
index 72090a0866a62..497552f90124a 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java
@@ -230,6 +230,40 @@ public void testNetworkConfigMissing() throws IOException {
assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
}
+ @Test
+ public void testSubnetworkConfig() throws IOException {
+ final String testSubnetwork = "zones/ZONE/subnetworks/SUBNETWORK";
+
+ DataflowPipelineOptions options = buildPipelineOptions();
+ options.setSubnetwork(testSubnetwork);
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertEquals(testSubnetwork,
+ job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+ }
+
+ @Test
+ public void testSubnetworkConfigMissing() throws IOException {
+ DataflowPipelineOptions options = buildPipelineOptions();
+
+ DataflowPipeline p = buildPipeline(options);
+ p.traverseTopologically(new RecordingPipelineVisitor());
+ Job job =
+ DataflowPipelineTranslator.fromOptions(options)
+ .translate(p, p.getRunner(), Collections.emptyList())
+ .getJob();
+
+ assertEquals(1, job.getEnvironment().getWorkerPools().size());
+ assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
+ }
+
@Test
public void testScalingAlgorithmMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();