Skip to content

Commit

Permalink
Upstream flex template update (#4677)
Browse files Browse the repository at this point in the history
* Upstream flex template update

* Remove unused var
  • Loading branch information
slevenick authored May 11, 2021
1 parent 30beacd commit 2bce477
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
"container_spec_gcs_path": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},

"name": {
Expand Down Expand Up @@ -60,15 +59,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
Type: schema.TypeMap,
Optional: true,
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
ForceNew: true,
// TODO add support for labels when the API supports it
Deprecated: "Deprecated until the API supports this field",
},

"parameters": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
},

"project": {
Expand Down Expand Up @@ -179,10 +176,96 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
return nil
}

// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
// the on_delete virtual field
func waitForDataflowJobState(d *schema.ResourceData, config *Config, jobID, userAgent string, timeout time.Duration, targetState string) error {
return resource.Retry(timeout, func() *resource.RetryError {
project, err := getProject(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

region, err := getRegion(d, config)
if err != nil {
return resource.NonRetryableError(err)
}

job, err := resourceDataflowJobGetJob(config, project, region, userAgent, jobID)
if err != nil {
if isRetryableError(err) {
return resource.RetryableError(err)
}
return resource.NonRetryableError(err)
}

state := job.CurrentState
if state == targetState {
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
return nil
}
if _, terminated := dataflowTerminalStatesMap[state]; terminated {
return resource.NonRetryableError(fmt.Errorf("the job with ID %q has terminated with state %q instead of expected state %q", jobID, state, targetState))
} else {
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
return resource.RetryableError(fmt.Errorf("the job with ID %q has state %q, waiting for %q", jobID, state, targetState))
}
})
}

// resourceDataflowFlexTemplateJobUpdate updates a Flex Template Job resource.
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
return nil
// Don't send an update request if only virtual fields have changes
if resourceDataflowJobIsVirtualUpdate(d, resourceDataflowFlexTemplateJob().Schema) {
return nil
}

config := meta.(*Config)
userAgent, err := generateUserAgentString(d, config.userAgent)
if err != nil {
return err
}

project, err := getProject(d, config)
if err != nil {
return err
}

region, err := getRegion(d, config)
if err != nil {
return err
}

// wait until current job is running or terminated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
if err != nil {
return fmt.Errorf("Error waiting for job with job ID %q to be running: %v", d.Id(), err)
}

request := dataflow.LaunchFlexTemplateRequest{
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
Parameters: expandStringMap(d, "parameters"),
Update: true,
},
}

response, err := config.NewDataflowClient(userAgent).Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
if err != nil {
return err
}

// don't set id until previous job is successfully updated
err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_UPDATED")
if err != nil {
return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %v", d.Id(), err)
}

job := response.Job
d.SetId(job.Id)
if err := d.Set("job_id", job.Id); err != nil {
return fmt.Errorf("Error setting job_id: %s", err)
}

return resourceDataflowFlexTemplateJobRead(d, meta)
}

func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {
Expand Down
6 changes: 2 additions & 4 deletions mmv1/third_party/terraform/resources/resource_dataflow_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
// Stream update method. Batch job changes should have been set to ForceNew via custom diff
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
// Don't send an update request if only virtual fields have changes
if resourceDataflowJobIsVirtualUpdate(d) {
if resourceDataflowJobIsVirtualUpdate(d, resourceDataflowJob().Schema) {
return nil
}

Expand Down Expand Up @@ -580,11 +580,9 @@ func resourceDataflowJobIterateMapHasChange(mapKey string, d *schema.ResourceDat
return false
}

func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {
func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData, resourceSchema map[string]*schema.Schema) bool {
// on_delete is the only virtual field
if d.HasChange("on_delete") {
// Check if other fields have changes, which would require an actual update request
resourceSchema := resourceDataflowJob().Schema
for field := range resourceSchema {
if field == "on_delete" {
continue
Expand Down
Loading

0 comments on commit 2bce477

Please sign in to comment.