Skip to content

Commit

Permalink
This closes #16
Browse files Browse the repository at this point in the history
  • Loading branch information
davorbonaci committed Mar 6, 2016
2 parents bf15d2f + ab10ac3 commit 22ff05c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
<bigtable.version>0.2.3</bigtable.version>
<dataflow.version>v1b3-rev19-1.21.0</dataflow.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.version>v1beta2-rev1-4.0.0</datastore.version>
<google-clients.version>1.21.0</google-clients.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,18 @@ public String create(PipelineOptions options) {
String getNetwork();
void setNetwork(String value);

/**
* GCE <a href="https://cloud.google.com/compute/docs/networking">subnetwork</a> for launching
* workers.
*
* <p>Default is up to the Dataflow service. Expected format is zones/ZONE/subnetworks/SUBNETWORK.
*/
@Description("GCE subnetwork for launching workers. For more information, see the reference "
+ "documentation https://cloud.google.com/compute/docs/networking. "
+ "Default is up to the Dataflow service.")
String getSubnetwork();
void setSubnetwork(String value);

/**
* GCE <a href="https://developers.google.com/compute/docs/zones"
* >availability zone</a> for launching workers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,9 @@ public Job translate(List<DataflowPackage> packages) {
if (!Strings.isNullOrEmpty(options.getNetwork())) {
workerPool.setNetwork(options.getNetwork());
}
if (!Strings.isNullOrEmpty(options.getSubnetwork())) {
workerPool.setSubnetwork(options.getSubnetwork());
}
if (options.getDiskSizeGb() > 0) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,40 @@ public void testNetworkConfigMissing() throws IOException {
assertNull(job.getEnvironment().getWorkerPools().get(0).getNetwork());
}

@Test
public void testSubnetworkConfig() throws IOException {
final String testSubnetwork = "zones/ZONE/subnetworks/SUBNETWORK";

DataflowPipelineOptions options = buildPipelineOptions();
options.setSubnetwork(testSubnetwork);

DataflowPipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();

assertEquals(1, job.getEnvironment().getWorkerPools().size());
assertEquals(testSubnetwork,
job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
}

@Test
public void testSubnetworkConfigMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();

DataflowPipeline p = buildPipeline(options);
p.traverseTopologically(new RecordingPipelineVisitor());
Job job =
DataflowPipelineTranslator.fromOptions(options)
.translate(p, p.getRunner(), Collections.<DataflowPackage>emptyList())
.getJob();

assertEquals(1, job.getEnvironment().getWorkerPools().size());
assertNull(job.getEnvironment().getWorkerPools().get(0).getSubnetwork());
}

@Test
public void testScalingAlgorithmMissing() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
Expand Down

0 comments on commit 22ff05c

Please sign in to comment.