Dataproc job - add presto config #5739

Merged
131 changes: 125 additions & 6 deletions mmv1/third_party/terraform/resources/resource_dataproc_job.go.erb
@@ -16,6 +16,8 @@ import (
<% end -%>
)

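// jobTypes lists the mutually exclusive job config blocks; each block's schema
// sets ExactlyOneOf to this slice so a job declares exactly one of them.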
var jobTypes = []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config", "presto_config"}

func resourceDataprocJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataprocJobCreate,
@@ -180,6 +182,7 @@ func resourceDataprocJob() *schema.Resource {
"hive_config": hiveSchema,
"pig_config": pigSchema,
"sparksql_config": sparkSqlSchema,
"presto_config": prestoSchema,
},
UseJSONNumber: true,
}
@@ -261,6 +264,11 @@ func resourceDataprocJobCreate(d *schema.ResourceData, meta interface{}) error {
submitReq.Job.SparkSqlJob = expandSparkSqlJob(config)
}

if v, ok := d.GetOk("presto_config"); ok {
config := extractFirstMapConfig(v.([]interface{}))
submitReq.Job.PrestoJob = expandPrestoJob(config)
}

// Submit the job
job, err := config.NewDataprocClient(userAgent).Projects.Regions.Jobs.Submit(
project, region, submitReq).Do()
@@ -359,6 +367,12 @@ func resourceDataprocJobRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("Error setting sparksql_config: %s", err)
}
}

if job.PrestoJob != nil {
if err := d.Set("presto_config", flattenPrestoJob(job.PrestoJob)); err != nil {
return fmt.Errorf("Error setting presto_config: %s", err)
}
}
return nil
}

@@ -441,7 +455,7 @@ var pySparkSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of pySpark job.`,
ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"main_python_file_uri": {
@@ -570,7 +584,7 @@ var sparkSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of the Spark job.`,
ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main driver: can be only one of the class | jar_file
@@ -691,7 +705,7 @@ var hadoopSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of Hadoop job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main driver: can be only one of the main_class | main_jar_file_uri
@@ -812,7 +826,7 @@ var hiveSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of hive job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -918,7 +932,7 @@ var pigSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of pig job.`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -1027,7 +1041,7 @@ var sparkSqlSchema = &schema.Schema{
ForceNew: true,
MaxItems: 1,
Description: `The config of SparkSql job`,
ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
// main query: can be only one of query_list | query_file_uri
@@ -1117,6 +1131,111 @@ func expandSparkSqlJob(config map[string]interface{}) *dataproc.SparkSqlJob {

}

// ---- Presto Job ----

var prestoSchema = &schema.Schema{
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Description: `The config of presto job`,
ExactlyOneOf: jobTypes,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"client_tags": {
Type: schema.TypeList,
Description: `Presto client tags to attach to this query.`,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"continue_on_failure": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
Description: `Whether to continue executing queries if a query fails. Setting this to true can be useful when executing independent parallel queries. Defaults to false.`,
},
// main query: can be only one of query_list | query_file_uri
"query_list": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
Description: `The list of SQL queries or statements to execute as part of the job. Conflicts with query_file_uri`,
Elem: &schema.Schema{Type: schema.TypeString},
ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
},

"query_file_uri": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: `The HCFS URI of the script that contains SQL queries. Conflicts with query_list`,
ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
},

"properties": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
Description: `A mapping of property names to values. Used to set Presto session properties. Equivalent to using the --session flag in the Presto CLI.`,
Elem: &schema.Schema{Type: schema.TypeString},
},
"output_format": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: `The format in which query output will be displayed. See the Presto documentation for supported output formats.`,
},

"logging_config": loggingConfig,
},
},
}

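// flattenPrestoJob converts the API's PrestoJob back into the presto_config
// attribute map, lifting the nested QueryList.Queries into the flat query_list field.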
func flattenPrestoJob(job *dataproc.PrestoJob) []map[string]interface{} {
queries := []string{}
if job.QueryList != nil {
queries = job.QueryList.Queries
}
return []map[string]interface{}{
{
"client_tags": job.ClientTags,
"continue_on_failure": job.ContinueOnFailure,
"query_list": queries,
"query_file_uri": job.QueryFileUri,
"properties": job.Properties,
"output_format": job.OutputFormat,
},
}
}

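// expandPrestoJob builds the PrestoJob request body from the presto_config block.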
func expandPrestoJob(config map[string]interface{}) *dataproc.PrestoJob {
job := &dataproc.PrestoJob{}
if v, ok := config["client_tags"]; ok {
job.ClientTags = convertStringArr(v.([]interface{}))
}
if v, ok := config["continue_on_failure"]; ok {
job.ContinueOnFailure = v.(bool)
}
if v, ok := config["query_file_uri"]; ok {
job.QueryFileUri = v.(string)
}
if v, ok := config["query_list"]; ok {
job.QueryList = &dataproc.QueryList{
Queries: convertStringArr(v.([]interface{})),
}
}
if v, ok := config["properties"]; ok {
job.Properties = convertStringMap(v.(map[string]interface{}))
}
if v, ok := config["output_format"]; ok {
job.OutputFormat = v.(string)
}

return job

}

// ---- Other flatten / expand methods ----

func expandLoggingConfig(config map[string]interface{}) *dataproc.LoggingConfig {
74 changes: 74 additions & 0 deletions mmv1/third_party/terraform/tests/resource_dataproc_job_test.go.erb
@@ -272,6 +272,38 @@ func TestAccDataprocJob_SparkSql(t *testing.T) {
})
}

func TestAccDataprocJob_Presto(t *testing.T) {
t.Parallel()

var job dataproc.Job
rnd := randString(t, 10)
vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataprocJob_presto(rnd),
Check: resource.ComposeTestCheckFunc(
testAccCheckDataprocJobExists(t, "google_dataproc_job.presto", &job),

// Autogenerated / computed values
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "reference.0.job_id"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state"),
resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state_start_time"),

// Unique job config
testAccCheckDataprocJobAttrMatch(
"google_dataproc_job.presto", "presto_config", &job),

// Wait until job completes successfully
testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.presto", &job),
),
},
},
})
}

func testAccCheckDataprocJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
return func(s *terraform.State) error {
config := googleProviderConfig(t)
@@ -693,3 +725,45 @@ resource "google_dataproc_job" "sparksql" {
`, rnd)

}

func testAccDataprocJob_presto(rnd string) string {
return fmt.Sprintf(`
resource "google_dataproc_cluster" "basic" {
name = "dproc-job-test-%s"
region = "us-central1"

cluster_config {
# Keep the costs down with the smallest config we can get away with
software_config {
override_properties = {
"dataproc:dataproc.allow.zero.workers" = "true"
}
optional_components = ["PRESTO"]
}

master_config {
num_instances = 1
machine_type = "e2-standard-2"
disk_config {
boot_disk_size_gb = 35
}
}
}
}

resource "google_dataproc_job" "presto" {
region = google_dataproc_cluster.basic.region
force_delete = true
placement {
cluster_name = google_dataproc_cluster.basic.name
}

presto_config {
query_list = [
"SELECT * FROM system.metadata.schema_properties"
]
}
}
`, rnd)

}
@@ -90,6 +90,7 @@ output "pyspark_status" {
* [hive_config](#nested_hive_config) - Submits a Hive job to the cluster
* [pig_config](#nested_pig_config) - Submits a Pig job to the cluster
* [sparksql_config](#nested_sparksql_config) - Submits a Spark SQL job to the cluster
* [presto_config](#nested_presto_config) - Submits a Presto job to the cluster

- - -

@@ -320,6 +321,37 @@

* `logging_config.driver_log_levels` - (Required) The per-package log levels for the driver. This may include the 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'

<a name="nested_presto_config"></a>The `presto_config` block supports:

```hcl
# Submit a Presto job to the cluster
resource "google_dataproc_job" "presto" {
...
presto_config {
query_list = [
"DROP TABLE IF EXISTS dprocjob_test",
"CREATE TABLE dprocjob_test(bar int)",
"SELECT * FROM dprocjob_test WHERE bar > 2",
]
}
}
```

* `client_tags` - (Optional) Presto client tags to attach to this query.

* `continue_on_failure` - (Optional) Whether to continue executing queries if a query fails. Setting this to true can be useful when executing independent parallel queries. Defaults to false.

* `query_list` - (Optional) The list of SQL queries or statements to execute as part of the job. Conflicts with `query_file_uri`.

* `query_file_uri` - (Optional) The HCFS URI of the script that contains SQL queries. Conflicts with `query_list`.

* `properties` - (Optional) A mapping of property names to values. Used to set Presto session properties. Equivalent to using the `--session` flag in the Presto CLI.

* `output_format` - (Optional) The format in which query output will be displayed. See the Presto documentation for supported output formats.

* `logging_config.driver_log_levels` - (Required) The per-package log levels for the driver. This may include the 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'
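
As a fuller sketch, the optional arguments can be combined in a single block. The tag, property, and format values below are illustrative only (`query_max_run_time` is a standard Presto session property and `CSV` a standard Presto CLI output format, but check the Presto documentation for the versions supported on your cluster):

```hcl
resource "google_dataproc_job" "presto" {
  ...
  presto_config {
    client_tags         = ["terraform"] # illustrative client tag
    continue_on_failure = true
    output_format       = "CSV"         # a Presto CLI output format

    # Session properties, applied as if passed via --session in the Presto CLI
    properties = {
      "query_max_run_time" = "30m"
    }

    query_list = [
      "SELECT * FROM system.runtime.nodes",
    ]
  }
}
```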

## Attributes Reference
