Skip to content

Commit

Permalink
refactoring after CR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
anGie44 committed May 6, 2021
1 parent de0b321 commit 7c0d29a
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 109 deletions.
3 changes: 3 additions & 0 deletions .changelog/16743.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:new-resource
aws_dynamodb_kinesis_streaming_destination
```
62 changes: 31 additions & 31 deletions aws/internal/service/dynamodb/finder/finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,37 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
)

func DynamoDBKinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}

output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

var result *dynamodb.KinesisDataStreamDestination

for _, destination := range output.KinesisDataStreamDestinations {
if destination == nil {
continue
}

if aws.StringValue(destination.StreamArn) == streamArn {
result = destination
break
}
}

return result, nil
}

func DynamoDBTableByName(conn *dynamodb.DynamoDB, tableName string) (*dynamodb.TableDescription, error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(tableName),
Expand Down Expand Up @@ -88,34 +119,3 @@ func DynamoDBTTLRDescriptionByTableName(conn *dynamodb.DynamoDB, tableName strin

return output.TimeToLiveDescription, nil
}

func KinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}

output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

var result *dynamodb.KinesisDataStreamDestination

for _, destination := range output.KinesisDataStreamDestinations {
if destination == nil {
continue
}

if aws.StringValue(destination.StreamArn) == streamArn {
result = destination
break
}
}

return result, nil
}
32 changes: 16 additions & 16 deletions aws/internal/service/dynamodb/waiter/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ import (
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/dynamodb/finder"
)

func DynamoDBKinesisStreamingDestinationStatus(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
result, err := finder.DynamoDBKinesisDataStreamDestination(ctx, conn, streamArn, tableName)

if err != nil {
return nil, "", err
}

if result == nil {
return nil, "", nil
}

return result, aws.StringValue(result.DestinationStatus), nil
}
}

func DynamoDBTableStatus(conn *dynamodb.DynamoDB, tableName string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
table, err := finder.DynamoDBTableByName(conn, tableName)
Expand Down Expand Up @@ -169,19 +185,3 @@ func DynamoDBTableSESStatus(conn *dynamodb.DynamoDB, tableName string) resource.
return table, aws.StringValue(table.SSEDescription.Status), nil
}
}

func KinesisStreamingDestinationStatus(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
result, err := finder.KinesisDataStreamDestination(ctx, conn, streamArn, tableName)

if err != nil {
return nil, "", err
}

if result == nil {
return nil, "", nil
}

return result, aws.StringValue(result.DestinationStatus), nil
}
}
70 changes: 36 additions & 34 deletions aws/internal/service/dynamodb/waiter/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,44 @@ import (
)

const (
CreateTableTimeout = 20 * time.Minute
UpdateTableTimeoutTotal = 60 * time.Minute
ReplicaUpdateTimeout = 30 * time.Minute
UpdateTableTimeout = 20 * time.Minute
UpdateTableContinuousBackupsTimeout = 20 * time.Minute
DeleteTableTimeout = 10 * time.Minute
PITRUpdateTimeout = 30 * time.Second
TTLUpdateTimeout = 30 * time.Second
KinesisStreamingDestinationActiveTimeout = 5 * time.Minute
KinesisStreamingDestinationDisabledTimeout = 5 * time.Minute
CreateTableTimeout = 20 * time.Minute
UpdateTableTimeoutTotal = 60 * time.Minute
ReplicaUpdateTimeout = 30 * time.Minute
UpdateTableTimeout = 20 * time.Minute
UpdateTableContinuousBackupsTimeout = 20 * time.Minute
DeleteTableTimeout = 10 * time.Minute
PITRUpdateTimeout = 30 * time.Second
TTLUpdateTimeout = 30 * time.Second
)

func DynamoDBKinesisStreamingDestinationActive(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusDisabled, dynamodb.DestinationStatusEnabling},
Target: []string{dynamodb.DestinationStatusActive},
Timeout: KinesisStreamingDestinationActiveTimeout,
Refresh: DynamoDBKinesisStreamingDestinationStatus(ctx, conn, streamArn, tableName),
}

_, err := stateConf.WaitForStateContext(ctx)

return err
}

func DynamoDBKinesisStreamingDestinationDisabled(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusActive, dynamodb.DestinationStatusDisabling},
Target: []string{dynamodb.DestinationStatusDisabled},
Timeout: KinesisStreamingDestinationDisabledTimeout,
Refresh: DynamoDBKinesisStreamingDestinationStatus(ctx, conn, streamArn, tableName),
}

_, err := stateConf.WaitForStateContext(ctx)

return err
}

func DynamoDBTableActive(conn *dynamodb.DynamoDB, tableName string) (*dynamodb.TableDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{
Expand Down Expand Up @@ -230,29 +258,3 @@ func DynamoDBSSEUpdated(conn *dynamodb.DynamoDB, tableName string) (*dynamodb.Ta

return nil, err
}

func KinesisStreamingDestinationActive(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusDisabled, dynamodb.DestinationStatusEnabling},
Target: []string{dynamodb.DestinationStatusActive},
Timeout: 5 * time.Minute,
Refresh: KinesisStreamingDestinationStatus(ctx, conn, streamArn, tableName),
}

_, err := stateConf.WaitForStateContext(ctx)

return err
}

func KinesisStreamingDestinationDisabled(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) error {
stateConf := &resource.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusActive, dynamodb.DestinationStatusDisabling},
Target: []string{dynamodb.DestinationStatusDisabled},
Timeout: 5 * time.Minute,
Refresh: KinesisStreamingDestinationStatus(ctx, conn, streamArn, tableName),
}

_, err := stateConf.WaitForStateContext(ctx)

return err
}
38 changes: 23 additions & 15 deletions aws/resource_aws_dynamodb_kinesis_streaming_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import (

func resourceAwsDynamoDbKinesisStreamingDestination() *schema.Resource {
return &schema.Resource{
CreateContext: resourceAwsDynamoDbKinesisStreamingDestinationCreate,
ReadContext: resourceAwsDynamoDbKinesisStreamingDestinationRead,
DeleteContext: resourceAwsDynamoDbKinesisStreamingDestinationDelete,
CreateWithoutTimeout: resourceAwsDynamoDbKinesisStreamingDestinationCreate,
ReadWithoutTimeout: resourceAwsDynamoDbKinesisStreamingDestinationRead,
DeleteWithoutTimeout: resourceAwsDynamoDbKinesisStreamingDestinationDelete,

Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},

Schema: map[string]*schema.Schema{
"stream_arn": {
Expand Down Expand Up @@ -51,11 +55,15 @@ func resourceAwsDynamoDbKinesisStreamingDestinationCreate(ctx context.Context, d
output, err := conn.EnableKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return diag.FromErr(fmt.Errorf("error enabling DynamoDB Kinesis streaming destination for stream (%s) and table (%s): %w", streamArn, tableName, err))
return diag.FromErr(fmt.Errorf("error enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %w", streamArn, tableName, err))
}

if output == nil {
return diag.FromErr(fmt.Errorf("error enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output", streamArn, tableName))
}

if err := waiter.KinesisStreamingDestinationActive(ctx, conn, streamArn, tableName); err != nil {
return diag.FromErr(fmt.Errorf("error waiting for Kinesis Streaming Destination to become active: %w", err))
if err := waiter.DynamoDBKinesisStreamingDestinationActive(ctx, conn, streamArn, tableName); err != nil {
return diag.FromErr(fmt.Errorf("error waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be active: %w", streamArn, tableName, err))
}

d.SetId(fmt.Sprintf("%s,%s", aws.StringValue(output.TableName), aws.StringValue(output.StreamArn)))
Expand All @@ -72,23 +80,23 @@ func resourceAwsDynamoDbKinesisStreamingDestinationRead(ctx context.Context, d *
return diag.FromErr(err)
}

output, err := finder.KinesisDataStreamDestination(ctx, conn, streamArn, tableName)
output, err := finder.DynamoDBKinesisDataStreamDestination(ctx, conn, streamArn, tableName)

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
log.Printf("[WARN] Kinesis Data Stream Destination (%s) not found for DynamoDB table (%s), removing from state", streamArn, tableName)
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
d.SetId("")
return nil
}

if err != nil {
return diag.FromErr(fmt.Errorf("error retrieving Kinesis Streaming Destination for DynamoDB table (%s): %w", tableName, err))
return diag.FromErr(fmt.Errorf("error retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): %w", streamArn, tableName, err))
}

if output == nil {
if output == nil || aws.StringValue(output.DestinationStatus) == dynamodb.DestinationStatusDisabled {
if d.IsNewResource() {
return diag.FromErr(fmt.Errorf("error retrieving Kinesis Streaming Destination for DynamoDB table (%s): empty output", tableName))
return diag.FromErr(fmt.Errorf("error retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output after creation", streamArn, tableName))
}
log.Printf("[WARN] Kinesis Data Stream Destination (%s) not found for DynamoDB table (%s), removing from state", streamArn, tableName)
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
d.SetId("")
return nil
}
Expand Down Expand Up @@ -116,11 +124,11 @@ func resourceAwsDynamoDbKinesisStreamingDestinationDelete(ctx context.Context, d
_, err = conn.DisableKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return diag.FromErr(fmt.Errorf("error disabling Kinesis Streaming Destination (%s) for DynamoDB table (%s): %w", streamArn, tableName, err))
return diag.FromErr(fmt.Errorf("error disabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %w", streamArn, tableName, err))
}

if err := waiter.KinesisStreamingDestinationDisabled(ctx, conn, streamArn, tableName); err != nil {
return diag.FromErr(fmt.Errorf("error waiting for Kinesis Streaming Destination for DynamoDB table (%s) to be disabled: %w", tableName, err))
if err := waiter.DynamoDBKinesisStreamingDestinationDisabled(ctx, conn, streamArn, tableName); err != nil {
return diag.FromErr(fmt.Errorf("error waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be disabled: %w", streamArn, tableName, err))
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestAccAwsDynamoDbKinesisStreamingDestination_basic(t *testing.T) {

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ErrorCheck: testAccErrorCheck(t, dynamodb.EndpointsID),
ProviderFactories: testAccProviderFactories,
CheckDestroy: testAccCheckAWSDynamoDbKinesisStreamingDestinationDestroy,
Steps: []resource.TestStep{
Expand All @@ -46,6 +47,7 @@ func TestAccAwsDynamoDbKinesisStreamingDestination_disappears(t *testing.T) {

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ErrorCheck: testAccErrorCheck(t, dynamodb.EndpointsID),
ProviderFactories: testAccProviderFactories,
CheckDestroy: testAccCheckAWSDynamoDbKinesisStreamingDestinationDestroy,
Steps: []resource.TestStep{
Expand All @@ -69,6 +71,7 @@ func TestAccAwsDynamoDbKinesisStreamingDestination_disappears_DynamoDbTable(t *t

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
ErrorCheck: testAccErrorCheck(t, dynamodb.EndpointsID),
ProviderFactories: testAccProviderFactories,
CheckDestroy: testAccCheckAWSDynamoDbKinesisStreamingDestinationDestroy,
Steps: []resource.TestStep{
Expand Down Expand Up @@ -129,7 +132,7 @@ func testAccCheckDynamoDbKinesisStreamingDestinationExists(resourceName string)

conn := testAccProvider.Meta().(*AWSClient).dynamodbconn

output, err := finder.KinesisDataStreamDestination(context.Background(), conn, streamArn, tableName)
output, err := finder.DynamoDBKinesisDataStreamDestination(context.Background(), conn, streamArn, tableName)

if err != nil {
return err
Expand Down Expand Up @@ -157,7 +160,7 @@ func testAccCheckAWSDynamoDbKinesisStreamingDestinationDestroy(s *terraform.Stat
return err
}

output, err := finder.KinesisDataStreamDestination(context.Background(), conn, streamArn, tableName)
output, err := finder.DynamoDBKinesisDataStreamDestination(context.Background(), conn, streamArn, tableName)

if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
continue
Expand Down
36 changes: 25 additions & 11 deletions website/docs/r/dynamodb_kinesis_streaming_destination.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ subcategory: "DynamoDB"
layout: "aws"
page_title: "AWS: aws_dynamodb_kinesis_streaming_destination"
description: |-
Configures a Kinesis streaming destination for item level changes to a DynamoDB table
Enables a Kinesis streaming destination for a DynamoDB table
---

# Resource: aws_dynamodb_kinesis_streaming_destination

Configures a [Kinesis streaming destination](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) for item level changes to a DynamoDB table.
Enables a [Kinesis streaming destination](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html) for data replication of a DynamoDB table.

## Example Usage

```hcl
resource "aws_dynamodb_table" "orders" {
```terraform
resource "aws_dynamodb_table" "example" {
name = "orders"
hash_key = "id"
Expand All @@ -23,22 +23,36 @@ resource "aws_dynamodb_table" "orders" {
}
}
resource "aws_kinesis_stream" "order_item_changes" {
resource "aws_kinesis_stream" "example" {
name = "order_item_changes"
shard_count = 1
}
resource "aws_dynamodb_kinesis_streaming_destination" "order_changes" {
table_name = aws_dynamodb_table.orders.name
stream_arn = aws_kinesis_stream.order_item_changes.arn
resource "aws_dynamodb_kinesis_streaming_destination" "example" {
stream_arn = aws_kinesis_stream.example.arn
table_name = aws_dynamodb_table.example.name
}
```

## Argument Reference

The following arguments are supported:

* `table_name` - (Required) The name of the DynamoDB table to capture changes from. There
* `stream_arn` - (Required) The ARN for a Kinesis data stream. This must exist in the same account and region as the DynamoDB table.

* `table_name` - (Required) The name of the DynamoDB table. There
can only be one Kinesis streaming destination for a given DynamoDB table.
* `stream_arn` - (Required) The arn of the Kinesis stream to capture changes into. This
must exist in the same account and region as the DynamoDB table.

## Attributes Reference

In addition to all arguments above, the following attributes are exported:

* `id` - The `table_name` and `stream_arn` separated by a comma (`,`).

## Import

DynamoDB Kinesis Streaming Destinations can be imported using the `table_name` and `stream_arn` separated by `,`, e.g.

```
$ terraform import aws_dynamodb_kinesis_streaming_destination.example example,arn:aws:kinesis:us-east-1:111122223333:exampleStreamName
```

0 comments on commit 7c0d29a

Please sign in to comment.