From 5b06674cff77f237114ef5f15ceb7e2820838586 Mon Sep 17 00:00:00 2001 From: The Magician Date: Thu, 4 Jun 2020 15:11:04 -0700 Subject: [PATCH] google_dataflow_job - when updating, wait for new job to start (#3591) (#6534) This patch modifies the update-by-replacement logic to wait for the new job to start before updating the google_dataflow_job's resource ID to point to the new job's ID. This ensures that the google_dataflow_job resource continues to point to the original job if the update operation were to fail. Signed-off-by: Modular Magician --- .changelog/3591.txt | 3 +++ google/resource_dataflow_job.go | 38 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+) create mode 100644 .changelog/3591.txt diff --git a/.changelog/3591.txt b/.changelog/3591.txt new file mode 100644 index 00000000000..8fd8f04aebe --- /dev/null +++ b/.changelog/3591.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +dataflow: changed the update logic for `google_dataflow_job` to wait for the replacement job to start successfully before modifying the resource ID to point to the replacement job +``` diff --git a/google/resource_dataflow_job.go b/google/resource_dataflow_job.go index d753c549f6e..f58bd07c2dd 100644 --- a/google/resource_dataflow_job.go +++ b/google/resource_dataflow_job.go @@ -47,6 +47,9 @@ func resourceDataflowJob() *schema.Resource { Read: resourceDataflowJobRead, Update: resourceDataflowJobUpdateByReplacement, Delete: resourceDataflowJobDelete, + Timeouts: &schema.ResourceTimeout{ + Update: schema.DefaultTimeout(10 * time.Minute), + }, CustomizeDiff: customdiff.All( resourceDataflowJobTypeCustomizeDiff, ), @@ -309,6 +312,11 @@ func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interfa if err != nil { return err } + + if err := waitForDataflowJobToBeUpdated(d, config, response.Job.Id, d.Timeout(schema.TimeoutUpdate)); err != nil { + return fmt.Errorf("Error updating job with job ID %q: %v", d.Id(), err) + } + d.SetId(response.Job.Id) return resourceDataflowJobRead(d, meta) @@ -499,3 +507,33 @@ func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool { return false } + +func waitForDataflowJobToBeUpdated(d *schema.ResourceData, config *Config, replacementJobID string, timeout time.Duration) error { + return resource.Retry(timeout, func() *resource.RetryError { + project, err := getProject(d, config) + if err != nil { + return resource.NonRetryableError(err) + } + + region, err := getRegion(d, config) + if err != nil { + return resource.NonRetryableError(err) + } + + replacementJob, err := resourceDataflowJobGetJob(config, project, region, replacementJobID) + if err != nil { + return resource.NonRetryableError(err) + } + + state := replacementJob.CurrentState + switch state { + case "", "JOB_STATE_PENDING": + return resource.RetryableError(fmt.Errorf("the replacement job with ID %q has pending state %q.", replacementJobID, state)) + case "JOB_STATE_FAILED": + return resource.NonRetryableError(fmt.Errorf("the replacement job with ID %q failed with state %q.", replacementJobID, state)) + default: + log.Printf("[DEBUG] the replacement job with ID %q has state %q.", replacementJobID, state) + return nil + } + }) +}