Skip to content

Commit

Permalink
google_dataflow_job - when updating, wait for new job to start
Browse files Browse the repository at this point in the history
This patch modifies the update-by-replacement logic to wait for the new
job to start before updating the google_dataflow_job's resource ID to
point to the new job's ID. This ensures that the google_dataflow_job
resource continues to point to the original job if the update operation
were to fail.
  • Loading branch information
jcanseco committed Jun 2, 2020
1 parent 51d183e commit 540663a
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions third_party/terraform/resources/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func resourceDataflowJob() *schema.Resource {
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
Timeouts: &schema.ResourceTimeout{
Update: schema.DefaultTimeout(10 * time.Minute),
},
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Expand Down Expand Up @@ -309,6 +312,11 @@ func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interfa
if err != nil {
return err
}

if err := waitForDataflowJobToBeUpdated(d, config, response.Job.Id, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("Error updating job with job ID %q: %v", d.Id(), err)
}

d.SetId(response.Job.Id)

return resourceDataflowJobRead(d, meta)
Expand Down Expand Up @@ -499,3 +507,33 @@ func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {

return false
}

func waitForDataflowJobToBeUpdated(d *schema.ResourceData, config *Config, replacementJobID string, timeout time.Duration) error {
return resource.Retry(timeout, func() *resource.RetryError {
project, err := getProject(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

region, err := getRegion(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

replacementJob, err := resourceDataflowJobGetJob(config, project, region, replacementJobID)
if err != nil {
return resource.NonRetryableError(err)
}

state := replacementJob.CurrentState
switch state {
case "", "JOB_STATE_PENDING":
return resource.RetryableError(fmt.Errorf("the replacement job with ID %q has pending state %q.", replacementJobID, state))
case "JOB_STATE_FAILED":
return resource.NonRetryableError(fmt.Errorf("the replacement job with ID %q failed with state %q.", replacementJobID, state))
default:
log.Printf("[DEBUG] the replacement job with ID %q has state %q.", replacementJobID, state)
return nil
}
})
}

0 comments on commit 540663a

Please sign in to comment.