Skip to content

Commit

Permalink
Merge pull request #28310 from taharah/cw-metric-streams
Browse files Browse the repository at this point in the history
r/aws_cloudwatch_metric_stream: fix tag updates
  • Loading branch information
ewbankkit authored Dec 13, 2022
2 parents bfcd55c + 70c1916 commit c704d7e
Show file tree
Hide file tree
Showing 5 changed files with 302 additions and 280 deletions.
3 changes: 3 additions & 0 deletions .changelog/28310.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_cloudwatch_metric_stream: Correctly update `tags`
```
227 changes: 173 additions & 54 deletions internal/service/cloudwatch/metric_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cloudwatch

import (
"context"
"fmt"
"log"
"regexp"
"time"
Expand All @@ -11,29 +10,32 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/create"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tftags "github.com/hashicorp/terraform-provider-aws/internal/tags"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

func ResourceMetricStream() *schema.Resource {
return &schema.Resource{
CreateContext: resourceMetricStreamCreate,
ReadContext: resourceMetricStreamRead,
UpdateContext: resourceMetricStreamCreate,
DeleteContext: resourceMetricStreamDelete,
CreateWithoutTimeout: resourceMetricStreamCreate,
ReadWithoutTimeout: resourceMetricStreamRead,
UpdateWithoutTimeout: resourceMetricStreamUpdate,
DeleteWithoutTimeout: resourceMetricStreamDelete,

Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(MetricStreamReadyTimeout),
Delete: schema.DefaultTimeout(MetricStreamDeleteTimeout),
Create: schema.DefaultTimeout(1 * time.Minute),
Update: schema.DefaultTimeout(1 * time.Minute),
Delete: schema.DefaultTimeout(2 * time.Minute),
},

CustomizeDiff: verify.SetTagsDiff,
Expand Down Expand Up @@ -175,51 +177,52 @@ func resourceMetricStreamCreate(ctx context.Context, d *schema.ResourceData, met
tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{})))

name := create.Name(d.Get("name").(string), d.Get("name_prefix").(string))

params := cloudwatch.PutMetricStreamInput{
Name: aws.String(name),
input := &cloudwatch.PutMetricStreamInput{
FirehoseArn: aws.String(d.Get("firehose_arn").(string)),
RoleArn: aws.String(d.Get("role_arn").(string)),
Name: aws.String(name),
OutputFormat: aws.String(d.Get("output_format").(string)),
RoleArn: aws.String(d.Get("role_arn").(string)),
}

if len(tags) > 0 {
params.Tags = Tags(tags.IgnoreAWS())
if v, ok := d.GetOk("exclude_filter"); ok && v.(*schema.Set).Len() > 0 {
input.ExcludeFilters = expandMetricStreamFilters(v.(*schema.Set))
}

if v, ok := d.GetOk("include_filter"); ok && v.(*schema.Set).Len() > 0 {
params.IncludeFilters = expandMetricStreamFilters(v.(*schema.Set))
input.IncludeFilters = expandMetricStreamFilters(v.(*schema.Set))
}

if v, ok := d.GetOk("exclude_filter"); ok && v.(*schema.Set).Len() > 0 {
params.ExcludeFilters = expandMetricStreamFilters(v.(*schema.Set))
if v, ok := d.GetOk("statistics_configuration"); ok && v.(*schema.Set).Len() > 0 {
input.StatisticsConfigurations = expandMetricStreamStatisticsConfigurations(v.(*schema.Set))
}

if v, ok := d.GetOk("statistics_configuration"); ok && v.(*schema.Set).Len() > 0 {
params.StatisticsConfigurations = expandMetricStreamStatisticsConfigurations(v.(*schema.Set))
if len(tags) > 0 {
input.Tags = Tags(tags.IgnoreAWS())
}

log.Printf("[DEBUG] Putting CloudWatch Metric Stream: %#v", params)
output, err := conn.PutMetricStreamWithContext(ctx, &params)
output, err := conn.PutMetricStreamWithContext(ctx, input)

// Some partitions (i.e., ISO) may not support tag-on-create
if params.Tags != nil && verify.ErrorISOUnsupported(conn.PartitionID, err) {
if input.Tags != nil && verify.ErrorISOUnsupported(conn.PartitionID, err) {
log.Printf("[WARN] failed creating CloudWatch Metric Stream (%s) with tags: %s. Trying create without tags.", name, err)
params.Tags = nil
input.Tags = nil

output, err = conn.PutMetricStreamWithContext(ctx, &params)
output, err = conn.PutMetricStreamWithContext(ctx, input)
}

if err != nil {
return diag.Errorf("failed creating CloudWatch Metric Stream (%s): %s", name, err)
return diag.Errorf("creating CloudWatch Metric Stream (%s): %s", name, err)
}

d.SetId(name)
log.Println("[INFO] CloudWatch Metric Stream put finished")

if _, err := waitMetricStreamRunning(ctx, conn, d.Id(), d.Timeout(schema.TimeoutCreate)); err != nil {
return diag.Errorf("waiting for CloudWatch Metric Stream (%s) create: %s", d.Id(), err)
}

// Some partitions (i.e., ISO) may not support tag-on-create, attempt tag after create
if params.Tags == nil && len(tags) > 0 {
err := UpdateTags(conn, aws.StringValue(output.Arn), nil, tags)
if input.Tags == nil && len(tags) > 0 {
err := UpdateTagsWithContext(ctx, conn, aws.StringValue(output.Arn), nil, tags)

// If default tags only, log and continue. Otherwise, error.
if v, ok := d.GetOk("tags"); (!ok || len(v.(map[string]interface{})) == 0) && verify.ErrorISOUnsupported(conn.PartitionID, err) {
Expand All @@ -228,7 +231,7 @@ func resourceMetricStreamCreate(ctx context.Context, d *schema.ResourceData, met
}

if err != nil {
return diag.Errorf("failed adding tags after create for CloudWatch Metric Stream (%s): %s", d.Id(), err)
return diag.Errorf("adding tags after create for CloudWatch Metric Stream (%s): %s", d.Id(), err)
}
}

Expand All @@ -240,20 +243,16 @@ func resourceMetricStreamRead(ctx context.Context, d *schema.ResourceData, meta
defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig
ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig

output, err := WaitMetricStreamReady(ctx, conn, d.Id())
output, err := FindMetricStreamByName(ctx, conn, d.Id())

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, cloudwatch.ErrCodeResourceNotFoundException) {
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] CloudWatch Metric Stream (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err != nil {
return diag.FromErr(fmt.Errorf("error getting CloudWatch Metric Stream (%s): %w", d.Id(), err))
}

if output == nil {
return diag.FromErr(fmt.Errorf("error getting CloudWatch Metric Stream (%s): empty response", d.Id()))
return diag.Errorf("reading CloudWatch Metric Stream (%s): %s", d.Id(), err)
}

d.Set("arn", output.Arn)
Expand All @@ -268,23 +267,23 @@ func resourceMetricStreamRead(ctx context.Context, d *schema.ResourceData, meta

if output.IncludeFilters != nil {
if err := d.Set("include_filter", flattenMetricStreamFilters(output.IncludeFilters)); err != nil {
return diag.FromErr(fmt.Errorf("error setting include_filter error: %w", err))
return diag.Errorf("setting include_filter: %s", err)
}
}

if output.ExcludeFilters != nil {
if err := d.Set("exclude_filter", flattenMetricStreamFilters(output.ExcludeFilters)); err != nil {
return diag.FromErr(fmt.Errorf("error setting exclude_filter error: %w", err))
return diag.Errorf("setting exclude_filter: %s", err)
}
}

if output.StatisticsConfigurations != nil {
if err := d.Set("statistics_configuration", flattenMetricStreamStatisticsConfigurations(output.StatisticsConfigurations)); err != nil {
return diag.FromErr(fmt.Errorf("error setting statistics_configuration error: %w", err))
return diag.Errorf("setting statistics_configuration: %s", err)
}
}

tags, err := ListTags(conn, aws.StringValue(output.Arn))
tags, err := ListTagsWithContext(ctx, conn, aws.StringValue(output.Arn))

// Some partitions (i.e., ISO) may not support tagging, giving error
if verify.ErrorISOUnsupported(conn.PartitionID, err) {
Expand All @@ -293,41 +292,165 @@ func resourceMetricStreamRead(ctx context.Context, d *schema.ResourceData, meta
}

if err != nil {
return diag.Errorf("failed listing tags for CloudWatch Metric Stream (%s): %s", d.Id(), err)
return diag.Errorf("listing tags for CloudWatch Metric Stream (%s): %s", d.Id(), err)
}

tags = tags.IgnoreAWS().IgnoreConfig(ignoreTagsConfig)

//lintignore:AWSR002
if err := d.Set("tags", tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil {
return diag.FromErr(fmt.Errorf("error setting tags: %w", err))
return diag.Errorf("setting tags: %s", err)
}

if err := d.Set("tags_all", tags.Map()); err != nil {
return diag.FromErr(fmt.Errorf("error setting tags_all: %w", err))
return diag.Errorf("setting tags_all: %s", err)
}

return nil
}

func resourceMetricStreamUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).CloudWatchConn

if d.HasChangesExcept("tags", "tags_all") {
input := &cloudwatch.PutMetricStreamInput{
FirehoseArn: aws.String(d.Get("firehose_arn").(string)),
Name: aws.String(d.Id()),
OutputFormat: aws.String(d.Get("output_format").(string)),
RoleArn: aws.String(d.Get("role_arn").(string)),
}

if v, ok := d.GetOk("exclude_filter"); ok && v.(*schema.Set).Len() > 0 {
input.ExcludeFilters = expandMetricStreamFilters(v.(*schema.Set))
}

if v, ok := d.GetOk("include_filter"); ok && v.(*schema.Set).Len() > 0 {
input.IncludeFilters = expandMetricStreamFilters(v.(*schema.Set))
}

if v, ok := d.GetOk("statistics_configuration"); ok && v.(*schema.Set).Len() > 0 {
input.StatisticsConfigurations = expandMetricStreamStatisticsConfigurations(v.(*schema.Set))
}

_, err := conn.PutMetricStreamWithContext(ctx, input)

if err != nil {
return diag.Errorf("updating CloudWatch Metric Stream (%s): %s", d.Id(), err)
}

if _, err := waitMetricStreamRunning(ctx, conn, d.Id(), d.Timeout(schema.TimeoutUpdate)); err != nil {
return diag.Errorf("waiting for CloudWatch Metric Stream (%s) update: %s", d.Id(), err)
}
}

if d.HasChange("tags_all") {
o, n := d.GetChange("tags_all")

if err := UpdateTags(conn, d.Get("arn").(string), o, n); err != nil {
log.Printf("[WARN] failed updating tags for CloudWatch Metric Stream (%s): %s", d.Id(), err)
}
}

return resourceMetricStreamRead(ctx, d, meta)
}

func resourceMetricStreamDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
log.Printf("[INFO] Deleting CloudWatch Metric Stream %s", d.Id())
conn := meta.(*conns.AWSClient).CloudWatchConn
params := cloudwatch.DeleteMetricStreamInput{

log.Printf("[INFO] Deleting CloudWatch Metric Stream: %s", d.Id())
_, err := conn.DeleteMetricStreamWithContext(ctx, &cloudwatch.DeleteMetricStreamInput{
Name: aws.String(d.Id()),
})

if err != nil {
return diag.Errorf("deleting CloudWatch Metric Stream (%s): %s", d.Id(), err)
}

if _, err := conn.DeleteMetricStreamWithContext(ctx, &params); err != nil {
return diag.FromErr(fmt.Errorf("error deleting CloudWatch Metric Stream: %s", err))
if _, err := waitMetricStreamDeleted(ctx, conn, d.Id(), d.Timeout(schema.TimeoutDelete)); err != nil {
return diag.Errorf("waiting for CloudWatch Metric Stream (%s) delete: %s", d.Id(), err)
}

if _, err := WaitMetricStreamDeleted(ctx, conn, d.Id()); err != nil {
return diag.FromErr(fmt.Errorf("error while waiting for CloudWatch Metric Stream (%s) to become deleted: %w", d.Id(), err))
return nil
}

func FindMetricStreamByName(ctx context.Context, conn *cloudwatch.CloudWatch, name string) (*cloudwatch.GetMetricStreamOutput, error) {
input := &cloudwatch.GetMetricStreamInput{
Name: aws.String(name),
}

log.Printf("[INFO] CloudWatch Metric Stream %s deleted", d.Id())
output, err := conn.GetMetricStreamWithContext(ctx, input)

return nil
if tfawserr.ErrCodeEquals(err, cloudwatch.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output, nil
}

func statusMetricStream(ctx context.Context, conn *cloudwatch.CloudWatch, name string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
output, err := FindMetricStreamByName(ctx, conn, name)

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return output, aws.StringValue(output.State), nil
}
}

const (
metricStreamStateRunning = "running"
metricStreamStateStopped = "stopped"
)

func waitMetricStreamDeleted(ctx context.Context, conn *cloudwatch.CloudWatch, name string, timeout time.Duration) (*cloudwatch.GetMetricStreamOutput, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{metricStreamStateRunning, metricStreamStateStopped},
Target: []string{},
Refresh: statusMetricStream(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*cloudwatch.GetMetricStreamOutput); ok {
return output, err
}

return nil, err
}

func waitMetricStreamRunning(ctx context.Context, conn *cloudwatch.CloudWatch, name string, timeout time.Duration) (*cloudwatch.GetMetricStreamOutput, error) { //nolint:unparam
stateConf := &resource.StateChangeConf{
Pending: []string{metricStreamStateStopped},
Target: []string{metricStreamStateRunning},
Refresh: statusMetricStream(ctx, conn, name),
Timeout: timeout,
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*cloudwatch.GetMetricStreamOutput); ok {
return output, err
}

return nil, err
}

func validateMetricStreamName(v interface{}, k string) (ws []string, errors []error) {
Expand Down Expand Up @@ -381,20 +504,16 @@ func expandMetricStreamStatisticsConfigurations(s *schema.Set) []*cloudwatch.Met
mConfiguration := configurationRaw.(map[string]interface{})

if v, ok := mConfiguration["additional_statistics"].(*schema.Set); ok && v.Len() > 0 {
log.Printf("[DEBUG] CloudWatch Metric Stream StatisticsConfigurations additional_statistics: %#v", v)
configuration.AdditionalStatistics = flex.ExpandStringSet(v)
}

if v, ok := mConfiguration["include_metric"].(*schema.Set); ok && v.Len() > 0 {
log.Printf("[DEBUG] CloudWatch Metric Stream StatisticsConfigurations include_metrics: %#v", v)
configuration.IncludeMetrics = expandMetricStreamStatisticsConfigurationsIncludeMetrics(v)
}

configurations = append(configurations, configuration)
}

log.Printf("[DEBUG] statistics_configurations: %#v", configurations)

if len(configurations) > 0 {
return configurations
}
Expand Down
Loading

0 comments on commit c704d7e

Please sign in to comment.