Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle failed flex job template updates #3279

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions google-beta/resource_dataflow_flex_template_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac
// wait until current job is running or terminated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for job with job ID %q to be running: %v", d.Id(), err)
return fmt.Errorf("Error waiting for job with job ID %q to be running: %s", d.Id(), err)
}

request := dataflow.LaunchFlexTemplateRequest{
Expand All @@ -251,13 +251,15 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac
return err
}

// don't set id until previous job is successfully updated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_UPDATED")
// don't set id until new job is successfully running
job := response.Job
err = waitForDataflowJobState(d, config, job.Id, userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %v", d.Id(), err)
// the default behavior is to overwrite the resource's state with the state of the "new" job, even though we are returning an error here. this call to Partial prevents this behavior
d.Partial(true)
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %s", job.Id, err)
}

job := response.Job
d.SetId(job.Id)
if err := d.Set("job_id", job.Id); err != nil {
return fmt.Errorf("Error setting job_id: %s", err)
Expand Down
40 changes: 38 additions & 2 deletions google-beta/resource_dataflow_flex_template_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package google

import (
"fmt"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -65,6 +66,37 @@ func TestAccDataflowFlexTemplateJob_streamUpdate(t *testing.T) {
})
}

func TestAccDataflowFlexTemplateJob_streamUpdateFail(t *testing.T) {
// This resource uses custom retry logic that cannot be sped up without
// modifying the actual resource
skipIfVcr(t)
t.Parallel()

randStr := randString(t, 10)
job := "tf-test-dataflow-job-" + randStr

vcrTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
Config: testAccDataflowFlexTemplateJob_basic(job, "mytopic"),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.job"),
),
},
{
Config: testAccDataflowFlexTemplateJob_basic(job, ""),
Check: resource.ComposeTestCheckFunc(
testAccDataflowJobHasOption(t, "google_dataflow_flex_template_job.job", "topic", "projects/myproject/topics/mytopic"),
),
ExpectError: regexp.MustCompile(`Error waiting for Job with job ID "[^"]+" to be updated: the job with ID "[^"]+" has terminated with state "JOB_STATE_FAILED" instead of expected state "JOB_STATE_RUNNING"`),
},
},
})
}

func TestAccDataflowFlexTemplateJob_withServiceAccount(t *testing.T) {
// Dataflow responses include serialized java classes and bash commands
// This makes body comparison infeasible
Expand Down Expand Up @@ -153,6 +185,10 @@ func testAccDataflowFlexTemplateJobGetGeneratedInstance(t *testing.T, s *terrafo

// note: this config creates a job that doesn't actually do anything, but still runs
func testAccDataflowFlexTemplateJob_basic(job, topicName string) string {
topicField := ""
if topicName != "" {
topicField = fmt.Sprintf("topic = \"projects/myproject/topics/%s\"", topicName)
}
return fmt.Sprintf(`
data "google_storage_bucket_object" "flex_template" {
name = "latest/flex/Streaming_Data_Generator"
Expand All @@ -165,10 +201,10 @@ resource "google_dataflow_flex_template_job" "job" {
parameters = {
schemaLocation = "gs://mybucket/schema.json"
qps = "1"
topic = "projects/myproject/topics/%s"
%s
}
}
`, job, topicName)
`, job, topicField)
}

// note: this config creates a job that doesn't actually do anything, but still runs
Expand Down