Skip to content

Commit

Permalink
Fix pipeline options toArgs() returning empty list #765
Browse files Browse the repository at this point in the history
  • Loading branch information
khorshuheng committed Jun 7, 2020
1 parent b497a0b commit 8bb7f27
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,45 +47,45 @@ public DataflowRunnerConfig(DataflowRunnerConfigOptions runnerConfigOptions) {
}

/* Project id to use when launching jobs. */
@NotBlank String project;
@NotBlank public String project;

/* The Google Compute Engine region for creating Dataflow jobs. */
@NotBlank String region;
@NotBlank public String region;

/* GCP availability zone for operations. */
@NotBlank String zone;
@NotBlank public String zone;

/* Run the job as a specific service account, instead of the default GCE robot. */
String serviceAccount;
public String serviceAccount;

/* GCE network for launching workers. */
@NotBlank String network;
@NotBlank public String network;

/* GCE subnetwork for launching workers. */
@NotBlank String subnetwork;
@NotBlank public String subnetwork;

/* Machine type to create Dataflow worker VMs as. */
String workerMachineType;
public String workerMachineType;

/* The autoscaling algorithm to use for the workerpool. */
String autoscalingAlgorithm;
public String autoscalingAlgorithm;

/* Specifies whether worker pools should be started with public IP addresses. */
Boolean usePublicIps;
public Boolean usePublicIps;

/**
* A pipeline level default location for storing temporary files. Support Google Cloud Storage
* locations, e.g. gs://bucket/object
*/
@NotBlank String tempLocation;
@NotBlank public String tempLocation;

/* The maximum number of workers to use for the workerpool. */
Integer maxNumWorkers;
public Integer maxNumWorkers;

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
String deadLetterTableSpec;
public String deadLetterTableSpec;

Map<String, String> labels;
public Map<String, String> labels;

/** Validates Dataflow runner configuration options */
public void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ public class DirectRunnerConfig extends RunnerConfig {
* Controls the amount of target parallelism the DirectRunner will use. Defaults to the greater of
* the number of available processors and 3. Must be a value greater than zero.
*/
Integer targetParallelism;
public Integer targetParallelism;

/* BigQuery table specification, e.g. PROJECT_ID:DATASET_ID.PROJECT_ID */
String deadletterTableSpec;
public String deadletterTableSpec;

public DirectRunnerConfig(DirectRunnerConfigOptions runnerConfigOptions) {
this.deadletterTableSpec = runnerConfigOptions.getDeadLetterTableSpec();
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/job/option/RunnerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
public abstract class RunnerConfig {

/**
* Converts the fields in this class to a list of --key=value args to be passed to a {@link
* org.apache.beam.sdk.options.PipelineOptionsFactory}.
* Converts the public-access fields in this class to a list of --key=value args to be passed to a
* {@link org.apache.beam.sdk.options.PipelineOptionsFactory}.
*
* <p>Ignores values that are proto-default (e.g. empty string, 0).
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.dataflow;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.*;

import com.google.common.collect.Lists;
import feast.proto.core.RunnerProto.DataflowRunnerConfigOptions;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;

public class DataflowRunnerConfigTest {
@Test
public void shouldConvertToPipelineArgs() throws IllegalAccessException {
DataflowRunnerConfigOptions opts =
DataflowRunnerConfigOptions.newBuilder()
.setProject("my-project")
.setRegion("asia-east1")
.setZone("asia-east1-a")
.setTempLocation("gs://bucket/tempLocation")
.setNetwork("default")
.setSubnetwork("regions/asia-east1/subnetworks/mysubnetwork")
.setMaxNumWorkers(1)
.setAutoscalingAlgorithm("THROUGHPUT_BASED")
.setUsePublicIps(false)
.setWorkerMachineType("n1-standard-1")
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
.putLabels("key", "value")
.build();

DataflowRunnerConfig dataflowRunnerConfig = new DataflowRunnerConfig(opts);
List<String> args = Lists.newArrayList(dataflowRunnerConfig.toArgs());
String[] expectedArgs =
Arrays.asList(
"--project=my-project",
"--region=asia-east1",
"--zone=asia-east1-a",
"--tempLocation=gs://bucket/tempLocation",
"--network=default",
"--subnetwork=regions/asia-east1/subnetworks/mysubnetwork",
"--maxNumWorkers=1",
"--autoscalingAlgorithm=THROUGHPUT_BASED",
"--usePublicIps=false",
"--workerMachineType=n1-standard-1",
"--deadLetterTableSpec=project_id:dataset_id.table_id",
"--labels={\"key\":\"value\"}")
.toArray(String[]::new);
assertThat(args.size(), equalTo(expectedArgs.length));
assertThat(args, containsInAnyOrder(expectedArgs));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.direct;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;

import com.google.common.collect.Lists;
import feast.proto.core.RunnerProto.DirectRunnerConfigOptions;
import java.util.List;
import org.junit.Test;

public class DirectRunnerConfigTest {
@Test
public void shouldConvertToPipelineArgs() throws IllegalAccessException {
DirectRunnerConfigOptions opts =
DirectRunnerConfigOptions.newBuilder()
.setTargetParallelism(1)
.setDeadLetterTableSpec("project_id:dataset_id.table_id")
.build();
DirectRunnerConfig directRunnerConfig = new DirectRunnerConfig(opts);
List<String> args = Lists.newArrayList(directRunnerConfig.toArgs());
assertThat(args.size(), equalTo(2));
assertThat(
args,
containsInAnyOrder(
"--targetParallelism=1", "--deadletterTableSpec=project_id:dataset_id.table_id"));
}
}

0 comments on commit 8bb7f27

Please sign in to comment.