Skip to content

Commit

Permalink
Handle failed flex template job updates (#4847)
Browse files Browse the repository at this point in the history
  • Loading branch information
andremarianiello authored Jun 7, 2021
1 parent 4a2572f commit b2336f7
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac
// wait until current job is running or terminated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for job with job ID %q to be running: %v", d.Id(), err)
return fmt.Errorf("Error waiting for job with job ID %q to be running: %s", d.Id(), err)
}

request := dataflow.LaunchFlexTemplateRequest{
Expand All @@ -257,13 +257,15 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac
return err
}

// don't set id until previous job is successfully updated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_UPDATED")
// don't set id until new job is successfully running
job := response.Job
err = waitForDataflowJobState(d, config, job.Id, userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %v", d.Id(), err)
// the default behavior is to overwrite the resource's state with the state of the "new" job, even though we are returning an error here. this call to Partial prevents this behavior
d.Partial(true)
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %s", job.Id, err)
}

job := response.Job
d.SetId(job.Id)
if err := d.Set("job_id", job.Id); err != nil {
return fmt.Errorf("Error setting job_id: %s", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package google

import (
"fmt"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -67,6 +68,37 @@ func TestAccDataflowFlexTemplateJob_streamUpdate(t *testing.T) {
})
}

// TestAccDataflowFlexTemplateJob_streamUpdateFail verifies that when an
// in-place update of a flex template job fails, the update returns an error
// (matched via ExpectError) and the resource keeps the original job's
// configuration rather than adopting the failed replacement job's state
// (asserted by checking the original "topic" option is still present).
func TestAccDataflowFlexTemplateJob_streamUpdateFail(t *testing.T) {
// This resource uses custom retry logic that cannot be sped up without
// modifying the actual resource
skipIfVcr(t)
t.Parallel()

// Random suffix keeps concurrently-running test resources from colliding.
randStr := randString(t, 10)
job := "tf-test-dataflow-job-" + randStr

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
// Step 1: create a healthy job with a valid topic parameter.
Config: testAccDataflowFlexTemplateJob_basic(job, "mytopic"),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.job"),
),
},
{
// Step 2: an empty topicName omits the "topic" parameter from the
// config entirely — presumably causing the replacement job to reach
// JOB_STATE_FAILED; TODO confirm against the template's required params.
Config: testAccDataflowFlexTemplateJob_basic(job, ""),
// The original job's "topic" option must survive the failed update,
// i.e. state was not overwritten with the failed job (see d.Partial
// in the resource's update path).
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobHasOption(t, "google_dataflow_flex_template_job.job", "topic", "projects/myproject/topics/mytopic"),
),
// The error message pins both the failure mode (JOB_STATE_FAILED)
// and the state the update was waiting for (JOB_STATE_RUNNING).
ExpectError: regexp.MustCompile(`Error waiting for Job with job ID "[^"]+" to be updated: the job with ID "[^"]+" has terminated with state "JOB_STATE_FAILED" instead of expected state "JOB_STATE_RUNNING"`),
},
},
})
}

func TestAccDataflowFlexTemplateJob_withServiceAccount(t *testing.T) {
// Dataflow responses include serialized java classes and bash commands
// This makes body comparison infeasible
Expand Down Expand Up @@ -155,6 +187,10 @@ func testAccDataflowFlexTemplateJobGetGeneratedInstance(t *testing.T, s *terrafo

// note: this config creates a job that doesn't actually do anything, but still runs
func testAccDataflowFlexTemplateJob_basic(job, topicName string) string {
topicField := ""
if topicName != "" {
topicField = fmt.Sprintf("topic = \"projects/myproject/topics/%s\"", topicName)
}
return fmt.Sprintf(`
data "google_storage_bucket_object" "flex_template" {
name = "latest/flex/Streaming_Data_Generator"
Expand All @@ -167,10 +203,10 @@ resource "google_dataflow_flex_template_job" "job" {
parameters = {
schemaLocation = "gs://mybucket/schema.json"
qps = "1"
topic = "projects/myproject/topics/%s"
%s
}
}
`, job, topicName)
`, job, topicField)
}

// note: this config creates a job that doesn't actually do anything, but still runs
Expand Down

0 comments on commit b2336f7

Please sign in to comment.