Skip to content

Commit

Permalink
google_dataflow_job - when updating, wait for new job to start (#3591) (
Browse files Browse the repository at this point in the history
#6534)

This patch modifies the update-by-replacement logic to wait for the new
job to start before updating the google_dataflow_job's resource ID to
point to the new job's ID. This ensures that the google_dataflow_job
resource continues to point to the original job if the update operation
were to fail.

Signed-off-by: Modular Magician <magic-modules@google.com>
  • Loading branch information
modular-magician authored Jun 4, 2020
1 parent 0c36aa3 commit 5b06674
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/3591.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
dataflow: changed the update logic for `google_dataflow_job` to wait for the replacement job to start successfully before modifying the resource ID to point to the replacement job
```
38 changes: 38 additions & 0 deletions google/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func resourceDataflowJob() *schema.Resource {
Read: resourceDataflowJobRead,
Update: resourceDataflowJobUpdateByReplacement,
Delete: resourceDataflowJobDelete,
Timeouts: &schema.ResourceTimeout{
Update: schema.DefaultTimeout(10 * time.Minute),
},
CustomizeDiff: customdiff.All(
resourceDataflowJobTypeCustomizeDiff,
),
Expand Down Expand Up @@ -309,6 +312,11 @@ func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interfa
if err != nil {
return err
}

if err := waitForDataflowJobToBeUpdated(d, config, response.Job.Id, d.Timeout(schema.TimeoutUpdate)); err != nil {
return fmt.Errorf("Error updating job with job ID %q: %v", d.Id(), err)
}

d.SetId(response.Job.Id)

return resourceDataflowJobRead(d, meta)
Expand Down Expand Up @@ -499,3 +507,33 @@ func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {

return false
}

func waitForDataflowJobToBeUpdated(d *schema.ResourceData, config *Config, replacementJobID string, timeout time.Duration) error {
return resource.Retry(timeout, func() *resource.RetryError {
project, err := getProject(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

region, err := getRegion(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

replacementJob, err := resourceDataflowJobGetJob(config, project, region, replacementJobID)
if err != nil {
return resource.NonRetryableError(err)
}

state := replacementJob.CurrentState
switch state {
case "", "JOB_STATE_PENDING":
return resource.RetryableError(fmt.Errorf("the replacement job with ID %q has pending state %q.", replacementJobID, state))
case "JOB_STATE_FAILED":
return resource.NonRetryableError(fmt.Errorf("the replacement job with ID %q failed with state %q.", replacementJobID, state))
default:
log.Printf("[DEBUG] the replacement job with ID %q has state %q.", replacementJobID, state)
return nil
}
})
}

0 comments on commit 5b06674

Please sign in to comment.