Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upstream flex template update #4677

Merged
merged 2 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
"container_spec_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"name": {
Expand Down Expand Up @@ -60,15 +59,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeMap,
Optional: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
ForceNew: true,
// TODO add support for labels when the API supports it
Deprecated: "Deprecated until the API supports this field",
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"project": {
Expand Down Expand Up @@ -179,10 +176,96 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
return nil
}

// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
// the on_delete virtual field
func waitForDataflowJobState(d *schema.ResourceData, config *Config, jobID, userAgent string, timeout time.Duration, targetState string) error {
return resource.Retry(timeout, func() *resource.RetryError {
project, err := getProject(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

region, err := getRegion(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

job, err := resourceDataflowJobGetJob(config, project, region, userAgent, jobID)
if err != nil {
if isRetryableError(err) {
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}

state := job.CurrentState
if state == targetState {
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
return nil
}
if _, terminated := dataflowTerminalStatesMap[state]; terminated {
return resource.NonRetryableError(fmt.Errorf("the job with ID %q has terminated with state %q instead of expected state %q", jobID, state, targetState))
} else {
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
return resource.RetryableError(fmt.Errorf("the job with ID %q has state %q, waiting for %q", jobID, state, targetState))
}
})
}

// resourceDataflowFlexTemplateJobUpdate updates a Flex Template Job resource.
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
return nil
// Don't send an update request if only virtual fields have changes
if resourceDataflowJobIsVirtualUpdate(d, resourceDataflowFlexTemplateJob().Schema) {
return nil
}

config := meta.(*Config)
userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

// wait until current job is running or terminated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for job with job ID %q to be running: %v", d.Id(), err)
}

request := dataflow.LaunchFlexTemplateRequest{
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
Parameters: expandStringMap(d, "parameters"),
Update: true,
},
}

response, err := config.NewDataflowClient(userAgent).Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
if err != nil {
return err
}

// don't set id until previous job is successfully updated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_UPDATED")
if err != nil {
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %v", d.Id(), err)
}

job := response.Job
d.SetId(job.Id)
if err := d.Set("job_id", job.Id); err != nil {
return fmt.Errorf("Error setting job_id: %s", err)
}

return resourceDataflowFlexTemplateJobRead(d, meta)
}

func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {
Expand Down
6 changes: 2 additions & 4 deletions mmv1/third_party/terraform/resources/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
// Stream update method. Batch job changes should have been set to ForceNew via custom diff
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
// Don't send an update request if only virtual fields have changes
if resourceDataflowJobIsVirtualUpdate(d) {
if resourceDataflowJobIsVirtualUpdate(d, resourceDataflowJob().Schema) {
return nil
}

Expand Down Expand Up @@ -580,11 +580,9 @@ func resourceDataflowJobIterateMapHasChange(mapKey string, d *schema.ResourceDat
return false
}

func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {
func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData, resourceSchema map[string]*schema.Schema) bool {
// on_delete is the only virtual field
if d.HasChange("on_delete") {
// Check if other fields have changes, which would require an actual update request
resourceSchema := resourceDataflowJob().Schema
for field := range resourceSchema {
if field == "on_delete" {
continue
Expand Down
Loading