Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dataflow flex-templates resource #3520

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<% autogen_exception -%>
package google
<% unless version == 'ga' -%>

import (
"fmt"
"log"
"time"

"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
dataflow "google.golang.org/api/dataflow/v1b3"
)

// NOTE: resource_dataflow_flex_template currently does not support updating existing jobs.
// Changing any non-computed field will result in the job being deleted (according to its
// on_delete policy) and recreated with the updated parameters.

// resourceDataflowFlexTemplateJob defines the schema for Dataflow FlexTemplate jobs.
func resourceDataflowFlexTemplateJob() *schema.Resource {
return &schema.Resource{
Create: resourceDataflowFlexTemplateJobCreate,
Read: resourceDataflowFlexTemplateJobRead,
Update: resourceDataflowFlexTemplateJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
Schema: map[string]*schema.Schema{

"container_spec_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"name": {
geojaz marked this conversation as resolved.
Show resolved Hide resolved
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"on_delete": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we want a blank update method defined. That will make Terraform persist the value for on_delete into state. If there's no method at all, it requires this field to recreate the resource.

Type: schema.TypeString,
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
Optional: true,
Default: "drain",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc below says the default is cancel. (The default is "cancelled",)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing to cancel.

},

"labels": {
Type: schema.TypeMap,
Optional: true,
geojaz marked this conversation as resolved.
Show resolved Hide resolved
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
ForceNew: true,
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"project": {
Type: schema.TypeString,
Optional: true,
Computed: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
},

"job_id": {
Type: schema.TypeString,
Computed: true,
},

"state": {
Type: schema.TypeString,
Computed: true,
},
},
}
}

// resourceDataflowFlexTemplateJobCreate creates a Flex Template Job from TF code.
func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

request := dataflow.LaunchFlexTemplateRequest{
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
Parameters: expandStringMap(d, "parameters"),
},
}
response, err := config.clientDataflow.Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
if err != nil {
return err
}
job := response.Job
d.SetId(job.Id)
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
d.Set("job_id", job.Id)

return resourceDataflowFlexTemplateJobRead(d, meta)
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
}

// resourceDataflowFlexTemplateJobRead reads a Flex Template Job resource.
func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{}) error {
config := meta.(*Config)

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

jobId := d.Id()

job, err := resourceDataflowJobGetJob(config, project, region, jobId)
if err != nil {
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", jobId))
}

d.Set("state", job.CurrentState)
d.Set("name", job.Name)
d.Set("project", project)
d.Set("labels", job.Labels)

if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
d.SetId("")
return nil
}
d.SetId(job.Id)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this line needed L127, read this info from this field already?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't hurt, but doesn't do anything- removed.

d.Set("job_id", job.Id)

return nil
}

// resourceDataflowFlexTemplateJobUpdateByReplacement will be the method for updating Flex-Template jobs
func resourceDataflowFlexTemplateJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

<% end -%>
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<% autogen_exception -%>
package google
<% unless version == 'ga' -%>

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
)

func TestAccDataflowFlexTemplateJob_simple(t *testing.T) {
t.Parallel()

randStr := randString(t, 10)
bucket := "tf-test-dataflow-gcs-" + randStr
job := "tf-test-dataflow-job-" + randStr

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowFlowFlexTemplateJob_basic(bucket, job),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_job.big_data"),
),
},
},
})
}

func testAccDataflowFlowFlexTemplateJob_basic(bucket, job string) string {
return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
name = "%s"
force_destroy = true
}

resource "google_storage_bucket_object" "flex_template" {
name = "flex_template.json"
bucket = "%s"
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
content = "%s"
}

resource "google_dataflow_flex_template_job" "big_data" {
name = "%s"
container_spec_gcs_path = "%s"
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
on_delete = "cancel"
}
`, bucket, bucket, flexTemplateContent(), bucket, job)
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
}

func flexTemplateContent() string {
return `
geojaz marked this conversation as resolved.
Show resolved Hide resolved
<<EOF
rileykarson marked this conversation as resolved.
Show resolved Hide resolved
{
"name": "Streaming Beam SQL",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a complete flex template? I get the following error after making the changes noted above: Error: googleapi: Error 400: (165707124cd2c54b): Docker image not specified in the template file., badRequest

"description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
"parameters": [
{
"name": "inputSubscription",
"label": "Pub/Sub input subscription.",
"helpText": "Pub/Sub subscription to read from.",
"regexes": [
"[-_.a-zA-Z0-9]+"
]
},
{
"name": "outputTable",
"label": "BigQuery output table",
"helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
"is_optional": true,
"regexes": [
"[^:]+:[^.]+[.].+"
]
}
]
}
EOF
`
}
<% end -%>
3 changes: 3 additions & 0 deletions third_party/terraform/utils/provider.go.erb
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ end # products.each do
"google_container_node_pool": resourceContainerNodePool(),
"google_container_registry": resourceContainerRegistry(),
"google_dataflow_job": resourceDataflowJob(),
<% unless version == 'ga' -%>
"google_dataflow_flex_template_job": resourceDataflowFlexTemplateJob(),
<% end -%>
"google_dataproc_cluster": resourceDataprocCluster(),
"google_dataproc_cluster_iam_binding": ResourceIamBinding(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
"google_dataproc_cluster_iam_member": ResourceIamMember(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
---
subcategory: "Dataflow"
layout: "google"
page_title: "Google: google_dataflow_flex_template_job"
sidebar_current: "docs-google-dataflow-flex-template-job"
description: |-
Creates a job in Dataflow based on a Flex Template.
---

# google\_dataflow\_flex\_template\_job

Creates a [Flex Template](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates) job on Dataflow, which is an implementation of Apache Beam running on Google Compute Engine. For more information see
the official documentation for
[Beam](https://beam.apache.org) and [Dataflow](https://cloud.google.com/dataflow/).

## Example Usage

```hcl
resource "google_dataflow_flex_template_job" "big_data_job" {
provider = google-beta
name = "dataflow-flextemplates-job"
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
parameters = {
inputSubscription = "messages"
}
}
```

[ To Come ...]
## Note on "destroy" / "apply"
There are many types of Dataflow jobs. Some Dataflow jobs run constantly, getting new data from (e.g.) a GCS bucket, and outputting data continuously. Some jobs process a set amount of data then terminate. All jobs can fail while running due to programming errors or other issues. In this way, Dataflow jobs are different from most other Terraform / Google resources.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dataflow terminology for these two types of jobs are streaming and batch jobs respectively.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to keep this message as-is for consistency with the existing resource at https://www.terraform.io/docs/providers/google/r/dataflow_job.html. If you're able to PR a fix (or just file an issue that would be a big help!


The Dataflow resource is considered 'existing' while it is in a nonterminal state. If it reaches a terminal state (e.g. 'FAILED', 'COMPLETE', 'CANCELLED'), it will be recreated on the next 'apply'. This is as expected for jobs which run continuously, but may surprise users who use this resource for other kinds of Dataflow jobs.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Replace for other kinds of Dataflow jobs. with for Dataflow batch jobs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.


A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "cancelled", but if a user sets `on_delete` to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could link to : https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline for effects of cancel vs drain.

I do not know if destroyed is TF terminology. Dataflow term for this is stopping.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.


## Argument Reference

The following arguments are supported:

* `name` - (Required) A unique name for the resource, required by Dataflow.
* `container_spec_gcs_path` - (Required) The GCS path to the Dataflow job Flex Template.

- - -

* `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as used in the template).
* `labels` - (Optional) User labels to be specified for the job. Keys and values should follow the restrictions
specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions) page.
**NOTE**: Google-provided Dataflow templates often provide default labels that begin with `goog-dataflow-provided`.
Unless explicitly set in config, these labels will be ignored to prevent diffs on re-apply.
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.

## Attributes Reference
In addition to the arguments listed above, the following computed attributes are exported:

* `job_id` - The unique ID of this job.
* `type` - The type of this job, selected from the [JobType enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobType)
* `state` - The current state of the resource, selected from the [JobState enum](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState)