From d083345d4e5027c12a99d376b5a2352c04a93426 Mon Sep 17 00:00:00 2001
From: Modular Magician
Date: Fri, 1 Apr 2022 19:54:56 +0000
Subject: [PATCH] Dataproc job - add presto config (#5739)

* dataproc-job-log-level

* add presto job

* fmt

* desc for presto props

* docs

* docs

* actually add presto config

* presto test

* presto test actually

* docs

* presto expand/flatten

* presto expand/flatten

* fix test

* Update resource_dataproc_job_test.go.erb

Signed-off-by: Modular Magician
---
 .changelog/5739.txt                       |   3 +
 google/resource_dataproc_job.go           | 131 +++++++++++++++++++++-
 google/resource_dataproc_job_test.go      |  74 ++++++++++++
 website/docs/r/dataproc_job.html.markdown |  32 ++++++
 4 files changed, 234 insertions(+), 6 deletions(-)
 create mode 100644 .changelog/5739.txt

diff --git a/.changelog/5739.txt b/.changelog/5739.txt
new file mode 100644
index 00000000000..5dd665cbf53
--- /dev/null
+++ b/.changelog/5739.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+dataproc: adds `presto_config` to `dataproc_job`
+```
diff --git a/google/resource_dataproc_job.go b/google/resource_dataproc_job.go
index d19a6d1063a..ebf42d2ec5b 100644
--- a/google/resource_dataproc_job.go
+++ b/google/resource_dataproc_job.go
@@ -11,6 +11,8 @@ import (
 	"google.golang.org/api/dataproc/v1"
 )
 
+var jobTypes = []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config", "presto_config"}
+
 func resourceDataprocJob() *schema.Resource {
 	return &schema.Resource{
 		Create: resourceDataprocJobCreate,
@@ -175,6 +177,7 @@ func resourceDataprocJob() *schema.Resource {
 			"hive_config":     hiveSchema,
 			"pig_config":      pigSchema,
 			"sparksql_config": sparkSqlSchema,
+			"presto_config":   prestoSchema,
 		},
 		UseJSONNumber: true,
 	}
@@ -256,6 +259,11 @@ func resourceDataprocJobCreate(d *schema.ResourceData, meta interface{}) error {
 		submitReq.Job.SparkSqlJob = expandSparkSqlJob(config)
 	}
 
+	if v, ok := d.GetOk("presto_config"); ok {
+		config := extractFirstMapConfig(v.([]interface{}))
+		submitReq.Job.PrestoJob = expandPrestoJob(config)
+	}
+
 	// Submit the job
 	job, err := config.NewDataprocClient(userAgent).Projects.Regions.Jobs.Submit(
 		project, region, submitReq).Do()
@@ -354,6 +362,12 @@ func resourceDataprocJobRead(d *schema.ResourceData, meta interface{}) error {
 			return fmt.Errorf("Error setting sparksql_config: %s", err)
 		}
 	}
+
+	if job.PrestoJob != nil {
+		if err := d.Set("presto_config", flattenPrestoJob(job.PrestoJob)); err != nil {
+			return fmt.Errorf("Error setting presto_config: %s", err)
+		}
+	}
 	return nil
 }
 
@@ -436,7 +450,7 @@ var pySparkSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of pySpark job.`,
-	ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			"main_python_file_uri": {
@@ -565,7 +579,7 @@ var sparkSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of the Spark job.`,
-	ExactlyOneOf: []string{"pyspark_config", "spark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			// main driver: can be only one of the class | jar_file
@@ -686,7 +700,7 @@ var hadoopSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of Hadoop job`,
-	ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			// main driver: can be only one of the main_class | main_jar_file_uri
@@ -807,7 +821,7 @@ var hiveSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of hive job`,
-	ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			// main query: can be only one of query_list | query_file_uri
@@ -913,7 +927,7 @@ var pigSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of pag job.`,
-	ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			// main query: can be only one of query_list | query_file_uri
@@ -1022,7 +1036,7 @@ var sparkSqlSchema = &schema.Schema{
 	ForceNew:     true,
 	MaxItems:     1,
 	Description:  `The config of SparkSql job`,
-	ExactlyOneOf: []string{"spark_config", "pyspark_config", "hadoop_config", "hive_config", "pig_config", "sparksql_config"},
+	ExactlyOneOf: jobTypes,
 	Elem: &schema.Resource{
 		Schema: map[string]*schema.Schema{
 			// main query: can be only one of query_list | query_file_uri
@@ -1112,6 +1126,111 @@ func expandSparkSqlJob(config map[string]interface{}) *dataproc.SparkSqlJob {
 
 }
 
+// ---- Presto Job ----
+
+var prestoSchema = &schema.Schema{
+	Type:         schema.TypeList,
+	Optional:     true,
+	ForceNew:     true,
+	MaxItems:     1,
+	Description:  `The config of presto job`,
+	ExactlyOneOf: jobTypes,
+	Elem: &schema.Resource{
+		Schema: map[string]*schema.Schema{
+			"client_tags": {
+				Type:        schema.TypeList,
+				Description: `Presto client tags to attach to this query.`,
+				Optional:    true,
+				ForceNew:    true,
+				Elem:        &schema.Schema{Type: schema.TypeString},
+			},
+			"continue_on_failure": {
+				Type:        schema.TypeBool,
+				Optional:    true,
+				ForceNew:    true,
+				Description: `Whether to continue executing queries if a query fails. Setting to true can be useful when executing independent parallel queries. Defaults to false.`,
+			},
+			// main query: can be only one of query_list | query_file_uri
+			"query_list": {
+				Type:         schema.TypeList,
+				Optional:     true,
+				ForceNew:     true,
+				Description:  `The list of SQL queries or statements to execute as part of the job. Conflicts with query_file_uri`,
+				Elem:         &schema.Schema{Type: schema.TypeString},
+				ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
+			},
+
+			"query_file_uri": {
+				Type:         schema.TypeString,
+				Optional:     true,
+				ForceNew:     true,
+				Description:  `The HCFS URI of the script that contains SQL queries. Conflicts with query_list`,
+				ExactlyOneOf: []string{"presto_config.0.query_file_uri", "presto_config.0.query_list"},
+			},
+
+			"properties": {
+				Type:        schema.TypeMap,
+				Optional:    true,
+				ForceNew:    true,
+				Description: `A mapping of property names to values. Used to set Presto session properties Equivalent to using the --session flag in the Presto CLI.`,
+				Elem:        &schema.Schema{Type: schema.TypeString},
+			},
+			"output_format": {
+				Type:        schema.TypeString,
+				Optional:    true,
+				ForceNew:    true,
+				Description: `The format in which query output will be displayed. See the Presto documentation for supported output formats.`,
+			},
+
+			"logging_config": loggingConfig,
+		},
+	},
+}
+
+func flattenPrestoJob(job *dataproc.PrestoJob) []map[string]interface{} {
+	queries := []string{}
+	if job.QueryList != nil {
+		queries = job.QueryList.Queries
+	}
+	return []map[string]interface{}{
+		{
+			"client_tags":         job.ClientTags,
+			"continue_on_failure": job.ContinueOnFailure,
+			"query_list":          queries,
+			"query_file_uri":      job.QueryFileUri,
+			"properties":          job.Properties,
+			"output_format":       job.OutputFormat,
+		},
+	}
+}
+
+func expandPrestoJob(config map[string]interface{}) *dataproc.PrestoJob {
+	job := &dataproc.PrestoJob{}
+	if v, ok := config["client_tags"]; ok {
+		job.ClientTags = convertStringArr(v.([]interface{}))
+	}
+	if v, ok := config["continue_on_failure"]; ok {
+		job.ContinueOnFailure = v.(bool)
+	}
+	if v, ok := config["query_file_uri"]; ok {
+		job.QueryFileUri = v.(string)
+	}
+	if v, ok := config["query_list"]; ok {
+		job.QueryList = &dataproc.QueryList{
+			Queries: convertStringArr(v.([]interface{})),
+		}
+	}
+	if v, ok := config["properties"]; ok {
+		job.Properties = convertStringMap(v.(map[string]interface{}))
+	}
+	if v, ok := config["output_format"]; ok {
+		job.OutputFormat = v.(string)
+	}
+
+	return job
+
+}
+
 // ---- Other flatten / expand methods ----
 
 func expandLoggingConfig(config map[string]interface{}) *dataproc.LoggingConfig {
diff --git a/google/resource_dataproc_job_test.go b/google/resource_dataproc_job_test.go
index 970d745e854..96f72966da8 100644
--- a/google/resource_dataproc_job_test.go
+++ b/google/resource_dataproc_job_test.go
@@ -267,6 +267,38 @@ func TestAccDataprocJob_SparkSql(t *testing.T) {
 	})
 }
 
+func TestAccDataprocJob_Presto(t *testing.T) {
+	t.Parallel()
+
+	var job dataproc.Job
+	rnd := randString(t, 10)
+	vcrTest(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testAccCheckDataprocJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataprocJob_presto(rnd),
+				Check: resource.ComposeTestCheckFunc(
+					testAccCheckDataprocJobExists(t, "google_dataproc_job.presto", &job),
+
+					// Autogenerated / computed values
+					resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "reference.0.job_id"),
+					resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state"),
+					resource.TestCheckResourceAttrSet("google_dataproc_job.presto", "status.0.state_start_time"),
+
+					// Unique job config
+					testAccCheckDataprocJobAttrMatch(
+						"google_dataproc_job.presto", "presto_config", &job),
+
+					// Wait until job completes successfully
+					testAccCheckDataprocJobCompletesSuccessfully(t, "google_dataproc_job.presto", &job),
+				),
+			},
+		},
+	})
+}
+
 func testAccCheckDataprocJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
 	return func(s *terraform.State) error {
 		config := googleProviderConfig(t)
@@ -688,3 +720,45 @@ resource "google_dataproc_job" "sparksql" {
 `, rnd)
 }
 
+
+func testAccDataprocJob_presto(rnd string) string {
+	return fmt.Sprintf(`
+resource "google_dataproc_cluster" "basic" {
+  name   = "dproc-job-test-%s"
+  region = "us-central1"
+
+  cluster_config {
+    # Keep the costs down with smallest config we can get away with
+    software_config {
+      override_properties = {
+        "dataproc:dataproc.allow.zero.workers" = "true"
+      }
+      optional_components = ["PRESTO"]
+    }
+
+    master_config {
+      num_instances = 1
+      machine_type  = "e2-standard-2"
+      disk_config {
+        boot_disk_size_gb = 35
+      }
+    }
+  }
+}
+
+resource "google_dataproc_job" "presto" {
+  region       = google_dataproc_cluster.basic.region
+  force_delete = true
+  placement {
+    cluster_name = google_dataproc_cluster.basic.name
+  }
+
+  presto_config {
+    query_list = [
+      "SELECT * FROM system.metadata.schema_properties"
+    ]
+  }
+}
+`, rnd)
+
+}
diff --git a/website/docs/r/dataproc_job.html.markdown b/website/docs/r/dataproc_job.html.markdown
index 1c7ef3dbed7..fb2e436aadf 100644
--- a/website/docs/r/dataproc_job.html.markdown
+++ b/website/docs/r/dataproc_job.html.markdown
@@ -90,6 +90,7 @@ output "pyspark_status" {
    * [hive_config](#nested_hive_config) - Submits a Hive job to the cluster
    * [hpig_config](#nested_hpig_config) - Submits a Pig job to the cluster
    * [sparksql_config](#nested_sparksql_config) - Submits a Spark SQL job to the cluster
+   * [presto_config](#nested_presto_config) - Submits a Presto job to the cluster
 
 - - -
 
@@ -320,6 +321,37 @@ resource "google_dataproc_job" "sparksql" {
 
 * `logging_config.driver_log_levels`- (Required) The per-package log levels for the driver. This may include 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'
 
+The `presto_config` block supports:
+
+```hcl
+# Submit a Presto job to the cluster
+resource "google_dataproc_job" "presto" {
+  ...
+  presto_config {
+    query_list = [
+      "DROP TABLE IF EXISTS dprocjob_test",
+      "CREATE TABLE dprocjob_test(bar int)",
+      "SELECT * FROM dprocjob_test WHERE bar > 2",
+    ]
+  }
+}
+```
+
+* `client_tags` - (Optional) Presto client tags to attach to this query.
+
+* `continue_on_failure` - (Optional) Whether to continue executing queries if a query fails. Setting to true can be useful when executing independent parallel queries. Defaults to false.
+
+* `query_list`- (Optional) The list of SQL queries or statements to execute as part of the job.
+  Conflicts with `query_file_uri`
+
+* `query_file_uri` - (Optional) The HCFS URI of the script that contains SQL queries.
+  Conflicts with `query_list`
+
+* `properties` - (Optional) A mapping of property names to values. Used to set Presto session properties Equivalent to using the --session flag in the Presto CLI.
+
+* `output_format` - (Optional) The format in which query output will be displayed. See the Presto documentation for supported output formats.
+
+* `logging_config.driver_log_levels`- (Required) The per-package log levels for the driver. This may include 'root' package name to configure rootLogger. Examples: 'com.google = FATAL', 'root = INFO', 'org.apache = DEBUG'
 
 ## Attributes Reference
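For readers wiring this up once the patch lands, a fuller `presto_config` block that exercises the optional arguments added above might look like the sketch below. This is an illustration, not part of the patch: the resource and argument names come from the schema introduced here, while the client tags, session property, query, and `CSV` output format are hypothetical values chosen only to show the shape of each field, and the referenced cluster is assumed to have the `PRESTO` optional component enabled as in the acceptance test config above.

```hcl
# Illustrative sketch only (not part of the patch): exercises the optional
# presto_config arguments. All values below are hypothetical examples.
resource "google_dataproc_job" "presto_full" {
  region       = google_dataproc_cluster.basic.region
  force_delete = true

  placement {
    cluster_name = google_dataproc_cluster.basic.name
  }

  presto_config {
    # Hypothetical client tags attached to the query
    client_tags         = ["terraform", "example"]
    continue_on_failure = true

    # Exactly one of query_list or query_file_uri may be set
    query_list = [
      "SELECT * FROM system.runtime.nodes"
    ]

    # Presto session properties (equivalent to the CLI --session flag);
    # the property name here is only an example
    properties = {
      "task_writer_count" = "1"
    }

    # Assumed to be one of the Presto CLI output formats
    output_format = "CSV"

    logging_config {
      driver_log_levels = {
        "root" = "INFO"
      }
    }
  }
}
```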