Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix dynamodb make a DescribeTable call per replica #35630

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/35630.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_dynamodb_table: Ensure that `replica`s are always set on Read
```
8 changes: 8 additions & 0 deletions internal/service/dynamodb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package dynamodb

const (
errCodeValidationException = "ValidationException"
)
5 changes: 4 additions & 1 deletion internal/service/dynamodb/exports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ package dynamodb

// Exports for use in tests only.
var (
ListTags = listTags
ResourceKinesisStreamingDestination = resourceKinesisStreamingDestination

FindKinesisDataStreamDestinationByTwoPartKey = findKinesisDataStreamDestinationByTwoPartKey
ListTags = listTags
)
31 changes: 0 additions & 31 deletions internal/service/dynamodb/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,6 @@ import (
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
)

func FindKinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}

output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

var result *dynamodb.KinesisDataStreamDestination

for _, destination := range output.KinesisDataStreamDestinations {
if destination == nil {
continue
}

if aws.StringValue(destination.StreamArn) == streamArn {
result = destination
break
}
}

return result, nil
}

func FindTableByName(ctx context.Context, conn *dynamodb.DynamoDB, name string) (*dynamodb.TableDescription, error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(name),
Expand Down
201 changes: 155 additions & 46 deletions internal/service/dynamodb/kinesis_streaming_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,31 @@ package dynamodb

import (
"context"
"fmt"
"errors"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/errs"
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tfslices "github.com/hashicorp/terraform-provider-aws/internal/slices"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKResource("aws_dynamodb_kinesis_streaming_destination")
func ResourceKinesisStreamingDestination() *schema.Resource {
const (
kinesisStreamingDestinationResourceIDPartCount = 2
)

// @SDKResource("aws_dynamodb_kinesis_streaming_destination", name="Kinesis Streaming Destination")
func resourceKinesisStreamingDestination() *schema.Resource {
return &schema.Resource{
CreateWithoutTimeout: resourceKinesisStreamingDestinationCreate,
ReadWithoutTimeout: resourceKinesisStreamingDestinationRead,
Expand Down Expand Up @@ -51,63 +60,49 @@ func resourceKinesisStreamingDestinationCreate(ctx context.Context, d *schema.Re

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

streamArn := d.Get("stream_arn").(string)
streamARN := d.Get("stream_arn").(string)
tableName := d.Get("table_name").(string)

id := errs.Must(flex.FlattenResourceId([]string{tableName, streamARN}, kinesisStreamingDestinationResourceIDPartCount, false))
input := &dynamodb.EnableKinesisStreamingDestinationInput{
StreamArn: aws.String(streamArn),
StreamArn: aws.String(streamARN),
TableName: aws.String(tableName),
}

output, err := conn.EnableKinesisStreamingDestinationWithContext(ctx, input)
_, err := conn.EnableKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis Streaming Destination (%s): %s", id, err)
}

if output == nil {
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output", streamArn, tableName)
}
d.SetId(id)

if err := waitKinesisStreamingDestinationActive(ctx, conn, streamArn, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be active: %s", streamArn, tableName, err)
if _, err := waitKinesisStreamingDestinationActive(ctx, conn, streamARN, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis Streaming Destination (%s) create: %s", d.Id(), err)
}

d.SetId(fmt.Sprintf("%s,%s", aws.StringValue(output.TableName), aws.StringValue(output.StreamArn)))

return append(diags, resourceKinesisStreamingDestinationRead(ctx, d, meta)...)
}

func resourceKinesisStreamingDestinationRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

tableName, streamArn, err := KinesisStreamingDestinationParseID(d.Id())

parts, err := flex.ExpandResourceId(d.Id(), kinesisStreamingDestinationResourceIDPartCount, false)
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

output, err := FindKinesisDataStreamDestination(ctx, conn, streamArn, tableName)
tableName, streamARN := parts[0], parts[1]
output, err := findKinesisDataStreamDestinationByTwoPartKey(ctx, conn, streamARN, tableName)

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (%s) not found, removing from state", d.Id())
d.SetId("")
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
}

if output == nil || aws.StringValue(output.DestinationStatus) == dynamodb.DestinationStatusDisabled {
if d.IsNewResource() {
return sdkdiag.AppendErrorf(diags, "retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output after creation", streamArn, tableName)
}
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
d.SetId("")
return diags
return sdkdiag.AppendErrorf(diags, "reading DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
}

d.Set("stream_arn", output.StreamArn)
Expand All @@ -121,36 +116,150 @@ func resourceKinesisStreamingDestinationDelete(ctx context.Context, d *schema.Re

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

tableName, streamArn, err := KinesisStreamingDestinationParseID(d.Id())

parts, err := flex.ExpandResourceId(d.Id(), kinesisStreamingDestinationResourceIDPartCount, false)
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

input := &dynamodb.DisableKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
StreamArn: aws.String(streamArn),
tableName, streamARN := parts[0], parts[1]
_, err = findKinesisDataStreamDestinationByTwoPartKey(ctx, conn, streamARN, tableName)

if tfresource.NotFound(err) {
return diags
}

_, err = conn.DisableKinesisStreamingDestinationWithContext(ctx, input)
log.Printf("[DEBUG] Deleting DynamoDB Kinesis Streaming Destination: %s", d.Id())
_, err = conn.DisableKinesisStreamingDestinationWithContext(ctx, &dynamodb.DisableKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
StreamArn: aws.String(streamARN),
})

if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "disabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
return sdkdiag.AppendErrorf(diags, "disabling DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
}

if err := waitKinesisStreamingDestinationDisabled(ctx, conn, streamArn, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be disabled: %s", streamArn, tableName, err)
if _, err := waitKinesisStreamingDestinationDisabled(ctx, conn, streamARN, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis Streaming Destination (%s) delete: %s", d.Id(), err)
}

return diags
}

func KinesisStreamingDestinationParseID(id string) (string, string, error) {
parts := strings.SplitN(id, ",", 2)
func kinesisDataStreamDestinationForStream(arn string) tfslices.Predicate[*dynamodb.KinesisDataStreamDestination] {
return func(v *dynamodb.KinesisDataStreamDestination) bool {
return aws.StringValue(v.StreamArn) == arn
}
}

func findKinesisDataStreamDestinationByTwoPartKey(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}
output, err := findKinesisDataStreamDestination(ctx, conn, input, kinesisDataStreamDestinationForStream(streamARN))

if err != nil {
return nil, err
}

if aws.StringValue(output.DestinationStatus) == dynamodb.DestinationStatusDisabled {
return nil, &retry.NotFoundError{}
}

return output, nil
}

func findKinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, input *dynamodb.DescribeKinesisStreamingDestinationInput, filter tfslices.Predicate[*dynamodb.KinesisDataStreamDestination]) (*dynamodb.KinesisDataStreamDestination, error) {
output, err := findKinesisDataStreamDestinations(ctx, conn, input, filter)

if err != nil {
return nil, err
}

return tfresource.AssertSinglePtrResult(output)
}

func findKinesisDataStreamDestinations(ctx context.Context, conn *dynamodb.DynamoDB, input *dynamodb.DescribeKinesisStreamingDestinationInput, filter tfslices.Predicate[*dynamodb.KinesisDataStreamDestination]) ([]*dynamodb.KinesisDataStreamDestination, error) {
output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return nil, &retry.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return tfslices.Filter(output.KinesisDataStreamDestinations, filter), nil
}

func statusKinesisStreamingDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) retry.StateRefreshFunc {
return func() (interface{}, string, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}
output, err := findKinesisDataStreamDestination(ctx, conn, input, kinesisDataStreamDestinationForStream(streamARN))

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return output, aws.StringValue(output.DestinationStatus), nil
}
}

func waitKinesisStreamingDestinationActive(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
const (
timeout = 5 * time.Minute
)
stateConf := &retry.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusDisabled, dynamodb.DestinationStatusEnabling},
Target: []string{dynamodb.DestinationStatusActive},
Timeout: timeout,
Refresh: statusKinesisStreamingDestination(ctx, conn, streamARN, tableName),
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*dynamodb.KinesisDataStreamDestination); ok {
tfresource.SetLastError(err, errors.New(aws.StringValue(output.DestinationStatusDescription)))
return output, err
}

return nil, err
}

func waitKinesisStreamingDestinationDisabled(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
const (
timeout = 5 * time.Minute
)
stateConf := &retry.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusActive, dynamodb.DestinationStatusDisabling},
Target: []string{dynamodb.DestinationStatusDisabled},
Timeout: timeout,
Refresh: statusKinesisStreamingDestination(ctx, conn, streamARN, tableName),
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", fmt.Errorf("unexpected format of ID (%s), expected TABLE_NAME,STREAM_ARN", id)
if output, ok := outputRaw.(*dynamodb.KinesisDataStreamDestination); ok {
tfresource.SetLastError(err, errors.New(aws.StringValue(output.DestinationStatusDescription)))
return output, err
}

return parts[0], parts[1], nil
return nil, err
}
Loading
Loading