diff --git a/.changelog/19297.txt b/.changelog/19297.txt new file mode 100644 index 00000000000..a1e5c3a8a06 --- /dev/null +++ b/.changelog/19297.txt @@ -0,0 +1,11 @@ +```release-note:new-resource +aws_qldb_stream +``` + +```release-note:enhancement +data-source/aws_qldb_ledger: Add `kms_key` and `tags` attributes +``` + +```release-note:enhancement +resource/aws_qldb_ledger: Add `kms_key` argument +``` \ No newline at end of file diff --git a/internal/provider/provider.go b/internal/provider/provider.go index df93e4b5c5a..4b925a4f364 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -1681,6 +1681,7 @@ func Provider() *schema.Provider { "aws_pinpoint_sms_channel": pinpoint.ResourceSMSChannel(), "aws_qldb_ledger": qldb.ResourceLedger(), + "aws_qldb_stream": qldb.ResourceStream(), "aws_quicksight_data_source": quicksight.ResourceDataSource(), "aws_quicksight_group": quicksight.ResourceGroup(), diff --git a/internal/service/qldb/ledger.go b/internal/service/qldb/ledger.go index 29a9b79c2ae..57eb4005e27 100644 --- a/internal/service/qldb/ledger.go +++ b/internal/service/qldb/ledger.go @@ -1,7 +1,7 @@ package qldb import ( - "fmt" + "context" "log" "regexp" "time" @@ -9,10 +9,12 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/qldb" "github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/hashicorp/terraform-provider-aws/internal/conns" + "github.com/hashicorp/terraform-provider-aws/internal/create" tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" "github.com/hashicorp/terraform-provider-aws/internal/tfresource" "github.com/hashicorp/terraform-provider-aws/internal/verify" @@ -20,10 +22,11 @@ import ( func ResourceLedger() *schema.Resource { return &schema.Resource{ - Create: resourceLedgerCreate, - Read: resourceLedgerRead, - Update: resourceLedgerUpdate, - Delete: resourceLedgerDelete, + CreateWithoutTimeout: resourceLedgerCreate, + ReadWithoutTimeout: resourceLedgerRead, + UpdateWithoutTimeout: resourceLedgerUpdate, + DeleteWithoutTimeout: resourceLedgerDelete, + Importer: &schema.ResourceImporter{ State: schema.ImportStatePassthrough, }, @@ -33,7 +36,20 @@ func ResourceLedger() *schema.Resource { Type: schema.TypeString, Computed: true, }, - + "deletion_protection": { + Type: schema.TypeBool, + Optional: true, + Default: true, + }, + "kms_key": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateFunc: validation.Any( + validation.StringInSlice([]string{"AWS_OWNED_KMS_KEY"}, false), + verify.ValidARN, + ), + }, "name": { Type: schema.TypeString, Optional: true, @@ -44,21 +60,12 @@ func ResourceLedger() *schema.Resource { validation.StringMatch(regexp.MustCompile(`^[A-Za-z0-9_-]+`), "must contain only alphanumeric characters, underscores, and hyphens"), ), }, - "permissions_mode": { Type: schema.TypeString, Required: true, ValidateFunc: validation.StringInSlice(qldb.PermissionsMode_Values(), false), }, - - "deletion_protection": { - Type: schema.TypeBool, - Optional: true, - Default: true, - }, - - "tags": tftags.TagsSchema(), - + "tags": tftags.TagsSchema(), "tags_all": tftags.TagsSchemaComputed(), }, @@ -66,236 +73,236 @@ func ResourceLedger() *schema.Resource { } } -func resourceLedgerCreate(d *schema.ResourceData, meta 
interface{}) error { +func resourceLedgerCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { conn := meta.(*conns.AWSClient).QLDBConn defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{}))) - var name string - if v, ok := d.GetOk("name"); ok { - name = v.(string) - } else { - name = resource.PrefixedUniqueId("tf") - } - - if err := d.Set("name", name); err != nil { - return fmt.Errorf("error setting name: %s", err) - } - - // Create the QLDB Ledger - createOpts := &qldb.CreateLedgerInput{ - Name: aws.String(d.Get("name").(string)), - PermissionsMode: aws.String(d.Get("permissions_mode").(string)), + name := create.Name(d.Get("name").(string), "tf") + input := &qldb.CreateLedgerInput{ DeletionProtection: aws.Bool(d.Get("deletion_protection").(bool)), + Name: aws.String(name), + PermissionsMode: aws.String(d.Get("permissions_mode").(string)), Tags: Tags(tags.IgnoreAWS()), } - log.Printf("[DEBUG] QLDB Ledger create config: %#v", *createOpts) - qldbResp, err := conn.CreateLedger(createOpts) - if err != nil { - return fmt.Errorf("Error creating QLDB Ledger: %s", err) + if v, ok := d.GetOk("kms_key"); ok { + input.KmsKey = aws.String(v.(string)) } - // Set QLDB ledger name - d.SetId(aws.StringValue(qldbResp.Name)) - - log.Printf("[INFO] QLDB Ledger name: %s", d.Id()) + log.Printf("[DEBUG] Creating QLDB Ledger: %s", input) + output, err := conn.CreateLedgerWithContext(ctx, input) - stateConf := &resource.StateChangeConf{ - Pending: []string{qldb.LedgerStateCreating}, - Target: []string{qldb.LedgerStateActive}, - Refresh: qldbLedgerRefreshStatusFunc(conn, d.Id()), - Timeout: 8 * time.Minute, - MinTimeout: 3 * time.Second, + if err != nil { + return diag.Errorf("creating QLDB Ledger (%s): %s", name, err) } - _, err = stateConf.WaitForState() - if err != nil { - return fmt.Errorf("Error waiting for QLDB Ledger status to be \"%s\": %s", qldb.LedgerStateActive, err) + d.SetId(aws.StringValue(output.Name)) + + if _, err := waitLedgerCreated(ctx, conn, d.Id()); err != nil { + return diag.Errorf("waiting for QLDB Ledger (%s) create: %s", d.Id(), err) } - // Update our attributes and return - return resourceLedgerRead(d, meta) + return resourceLedgerRead(ctx, d, meta) } -func resourceLedgerRead(d *schema.ResourceData, meta interface{}) error { +func resourceLedgerRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { conn := meta.(*conns.AWSClient).QLDBConn defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig - // Refresh the QLDB state - input := &qldb.DescribeLedgerInput{ - Name: aws.String(d.Id()), - } - - qldbLedger, err := conn.DescribeLedger(input) + ledger, err := FindLedgerByName(ctx, conn, d.Id()) - if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { - log.Printf("[WARN] QLDB Ledger (%s) not found, removing from state", d.Id()) + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] QLDB Ledger %s not found, removing from state", d.Id()) d.SetId("") return nil } if err != nil { - return fmt.Errorf("error describing QLDB Ledger (%s): %s", d.Id(), err) - } - - // QLDB stuff - if err := d.Set("name", qldbLedger.Name); err != nil { - return fmt.Errorf("error setting name: %s", err) - } - - if err := d.Set("permissions_mode", qldbLedger.PermissionsMode); err != nil { - return fmt.Errorf("error setting permissions mode: %s", err) - } 
- - if err := d.Set("deletion_protection", qldbLedger.DeletionProtection); err != nil { - return fmt.Errorf("error setting deletion protection: %s", err) + return diag.Errorf("reading QLDB Ledger (%s): %s", d.Id(), err) } - // ARN - if err := d.Set("arn", qldbLedger.Arn); err != nil { - return fmt.Errorf("error setting ARN: %s", err) + d.Set("arn", ledger.Arn) + d.Set("deletion_protection", ledger.DeletionProtection) + if ledger.EncryptionDescription != nil { + d.Set("kms_key", ledger.EncryptionDescription.KmsKeyArn) + } else { + d.Set("kms_key", nil) } + d.Set("name", ledger.Name) + d.Set("permissions_mode", ledger.PermissionsMode) - // Tags - log.Printf("[INFO] Fetching tags for %s", d.Id()) tags, err := ListTags(conn, d.Get("arn").(string)) + if err != nil { - return fmt.Errorf("Error listing tags for QLDB Ledger: %s", err) + return diag.Errorf("listing tags for QLDB Ledger (%s): %s", d.Id(), err) } tags = tags.IgnoreAWS().IgnoreConfig(ignoreTagsConfig) //lintignore:AWSR002 if err := d.Set("tags", tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil { - return fmt.Errorf("error setting tags: %w", err) + return diag.Errorf("setting tags: %s", err) } if err := d.Set("tags_all", tags.Map()); err != nil { - return fmt.Errorf("error setting tags_all: %w", err) + return diag.Errorf("setting tags_all: %s", err) } return nil } -func resourceLedgerUpdate(d *schema.ResourceData, meta interface{}) error { +func resourceLedgerUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { conn := meta.(*conns.AWSClient).QLDBConn if d.HasChange("permissions_mode") { - updateOpts := &qldb.UpdateLedgerPermissionsModeInput{ + input := &qldb.UpdateLedgerPermissionsModeInput{ Name: aws.String(d.Id()), PermissionsMode: aws.String(d.Get("permissions_mode").(string)), } - if _, err := conn.UpdateLedgerPermissionsMode(updateOpts); err != nil { - return fmt.Errorf("error updating permissions mode: %s", err) + + log.Printf("[INFO] Updating QLDB Ledger permissions mode: %s", input) + if _, err := conn.UpdateLedgerPermissionsModeWithContext(ctx, input); err != nil { + return diag.Errorf("updating QLDB Ledger (%s) permissions mode: %s", d.Id(), err) } } - if d.HasChange("deletion_protection") { - val := d.Get("deletion_protection").(bool) - modifyOpts := &qldb.UpdateLedgerInput{ + if d.HasChanges("deletion_protection", "kms_key") { + input := &qldb.UpdateLedgerInput{ + DeletionProtection: aws.Bool(d.Get("deletion_protection").(bool)), Name: aws.String(d.Id()), - DeletionProtection: aws.Bool(val), } - log.Printf( - "[INFO] Modifying deletion_protection QLDB attribute for %s: %#v", - d.Id(), modifyOpts) - if _, err := conn.UpdateLedger(modifyOpts); err != nil { - return err + if d.HasChange("kms_key") { + input.KmsKey = aws.String(d.Get("kms_key").(string)) + } + + log.Printf("[INFO] Updating QLDB Ledger: %s", input) + if _, err := conn.UpdateLedgerWithContext(ctx, input); err != nil { + return diag.Errorf("updating QLDB Ledger (%s): %s", d.Id(), err) } } if d.HasChange("tags_all") { o, n := d.GetChange("tags_all") + if err := UpdateTags(conn, d.Get("arn").(string), o, n); err != nil { - return fmt.Errorf("error updating tags: %s", err) + return diag.Errorf("updating tags: %s", err) } } - return resourceLedgerRead(d, meta) + return resourceLedgerRead(ctx, d, meta) } -func resourceLedgerDelete(d *schema.ResourceData, meta interface{}) error { +func resourceLedgerDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { conn := 
meta.(*conns.AWSClient).QLDBConn - deleteLedgerOpts := &qldb.DeleteLedgerInput{ + + input := &qldb.DeleteLedgerInput{ Name: aws.String(d.Id()), } + log.Printf("[INFO] Deleting QLDB Ledger: %s", d.Id()) + _, err := tfresource.RetryWhenAWSErrCodeEqualsContext(ctx, 5*time.Minute, + func() (interface{}, error) { + return conn.DeleteLedgerWithContext(ctx, input) + }, qldb.ErrCodeResourceInUseException) - err := resource.Retry(5*time.Minute, func() *resource.RetryError { - _, err := conn.DeleteLedger(deleteLedgerOpts) + if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { + return nil + } - if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceInUseException) { - return resource.RetryableError(err) - } + if err != nil { + return diag.Errorf("deleting QLDB Ledger (%s): %s", d.Id(), err) + } - if err != nil { - return resource.NonRetryableError(err) - } + if _, err := waitLedgerDeleted(ctx, conn, d.Id()); err != nil { + return diag.Errorf("waiting for QLDB Ledger (%s) delete: %s", d.Id(), err) + } - return nil - }) + return nil +} - if tfresource.TimedOut(err) { - _, err = conn.DeleteLedger(deleteLedgerOpts) +func FindLedgerByName(ctx context.Context, conn *qldb.QLDB, name string) (*qldb.DescribeLedgerOutput, error) { + input := &qldb.DescribeLedgerInput{ + Name: aws.String(name), } + output, err := conn.DescribeLedgerWithContext(ctx, input) + if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { - return nil + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } } if err != nil { - return fmt.Errorf("error deleting QLDB Ledger (%s): %s", d.Id(), err) + return nil, err } - if err := WaitForLedgerDeletion(conn, d.Id()); err != nil { - return fmt.Errorf("error waiting for QLDB Ledger (%s) deletion: %s", d.Id(), err) + if output == nil { + return nil, tfresource.NewEmptyResultError(input) } - return nil + if state := aws.StringValue(output.State); state == qldb.LedgerStateDeleted { + return nil, &resource.NotFoundError{ + Message: state, + LastRequest: input, + } + } + + return output, nil } -func qldbLedgerRefreshStatusFunc(conn *qldb.QLDB, ledger string) resource.StateRefreshFunc { +func statusLedgerState(ctx context.Context, conn *qldb.QLDB, name string) resource.StateRefreshFunc { return func() (interface{}, string, error) { - input := &qldb.DescribeLedgerInput{ - Name: aws.String(ledger), + output, err := FindLedgerByName(ctx, conn, name) + + if tfresource.NotFound(err) { + return nil, "", nil } - resp, err := conn.DescribeLedger(input) + if err != nil { - return nil, "failed", err + return nil, "", err } - return resp, aws.StringValue(resp.State), nil + + return output, aws.StringValue(output.State), nil } } -func WaitForLedgerDeletion(conn *qldb.QLDB, ledgerName string) error { - stateConf := resource.StateChangeConf{ - Pending: []string{qldb.LedgerStateCreating, - qldb.LedgerStateActive, - qldb.LedgerStateDeleting}, - Target: []string{""}, - Timeout: 5 * time.Minute, - MinTimeout: 1 * time.Second, - Refresh: func() (interface{}, string, error) { - resp, err := conn.DescribeLedger(&qldb.DescribeLedgerInput{ - Name: aws.String(ledgerName), - }) +func waitLedgerCreated(ctx context.Context, conn *qldb.QLDB, name string) (*qldb.DescribeLedgerOutput, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{qldb.LedgerStateCreating}, + Target: []string{qldb.LedgerStateActive}, + Refresh: statusLedgerState(ctx, conn, name), + Timeout: 8 * time.Minute, + MinTimeout: 3 * time.Second, + } - if tfawserr.ErrCodeEquals(err, 
qldb.ErrCodeResourceNotFoundException) { - return 1, "", nil - } + outputRaw, err := stateConf.WaitForStateContext(ctx) - if err != nil { - return nil, qldb.ErrCodeResourceInUseException, err - } + if output, ok := outputRaw.(*qldb.DescribeLedgerOutput); ok { + return output, err + } - return resp, aws.StringValue(resp.State), nil - }, + return nil, err +} + +func waitLedgerDeleted(ctx context.Context, conn *qldb.QLDB, name string) (*qldb.DescribeLedgerOutput, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{qldb.LedgerStateActive, qldb.LedgerStateDeleting}, + Target: []string{}, + Refresh: statusLedgerState(ctx, conn, name), + Timeout: 5 * time.Minute, + MinTimeout: 1 * time.Second, } - _, err := stateConf.WaitForState() + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*qldb.DescribeLedgerOutput); ok { + return output, err + } - return err + return nil, err } diff --git a/internal/service/qldb/ledger_data_source.go b/internal/service/qldb/ledger_data_source.go index 50b4ea29cfa..468f93e9787 100644 --- a/internal/service/qldb/ledger_data_source.go +++ b/internal/service/qldb/ledger_data_source.go @@ -1,26 +1,34 @@ package qldb import ( - "fmt" - "log" + "context" "regexp" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/qldb" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/hashicorp/terraform-provider-aws/internal/conns" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" ) func DataSourceLedger() *schema.Resource { return &schema.Resource{ - Read: dataSourceLedgerRead, + ReadWithoutTimeout: dataSourceLedgerRead, + Schema: map[string]*schema.Schema{ "arn": { Type: schema.TypeString, Computed: true, }, - + "deletion_protection": { + Type: schema.TypeBool, + Computed: true, + }, + "kms_key": { + Type: schema.TypeString, + Computed: true, + }, "name": { Type: schema.TypeString, Required: true, @@ -29,40 +37,46 @@ func DataSourceLedger() *schema.Resource { validation.StringMatch(regexp.MustCompile(`^[A-Za-z0-9_-]+`), "must contain only alphanumeric characters, underscores, and hyphens"), ), }, - "permissions_mode": { Type: schema.TypeString, Computed: true, }, - - "deletion_protection": { - Type: schema.TypeBool, - Computed: true, - }, + "tags": tftags.TagsSchemaComputed(), }, } } -func dataSourceLedgerRead(d *schema.ResourceData, meta interface{}) error { +func dataSourceLedgerRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { conn := meta.(*conns.AWSClient).QLDBConn + ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig - target := d.Get("name") + name := d.Get("name").(string) + ledger, err := FindLedgerByName(ctx, conn, name) - req := &qldb.DescribeLedgerInput{ - Name: aws.String(target.(string)), + if err != nil { + return diag.Errorf("reading QLDB Ledger (%s): %s", name, err) + } + + d.SetId(aws.StringValue(ledger.Name)) + d.Set("arn", ledger.Arn) + d.Set("deletion_protection", ledger.DeletionProtection) + if ledger.EncryptionDescription != nil { + d.Set("kms_key", ledger.EncryptionDescription.KmsKeyArn) + } else { + d.Set("kms_key", nil) } + d.Set("name", ledger.Name) + d.Set("permissions_mode", ledger.PermissionsMode) - log.Printf("[DEBUG] Reading QLDB Ledger: %s", req) - resp, err := conn.DescribeLedger(req) + tags, err := ListTags(conn, d.Get("arn").(string)) if err != nil { - return fmt.Errorf("Error 
describing ledger: %w", err) + return diag.Errorf("listing tags for QLDB Ledger (%s): %s", d.Id(), err) } - d.SetId(aws.StringValue(resp.Name)) - d.Set("arn", resp.Arn) - d.Set("deletion_protection", resp.DeletionProtection) - d.Set("permissions_mode", resp.PermissionsMode) + if err := d.Set("tags", tags.IgnoreAWS().IgnoreConfig(ignoreTagsConfig).Map()); err != nil { + return diag.Errorf("setting tags: %s", err) + } return nil } diff --git a/internal/service/qldb/ledger_data_source_test.go b/internal/service/qldb/ledger_data_source_test.go index 6341af8c09e..a054a02bcda 100644 --- a/internal/service/qldb/ledger_data_source_test.go +++ b/internal/service/qldb/ledger_data_source_test.go @@ -11,7 +11,9 @@ import ( ) func TestAccQLDBLedgerDataSource_basic(t *testing.T) { - rName := fmt.Sprintf("tf-acc-test-%s", sdkacctest.RandString(7)) // QLDB name cannot be longer than 32 characters + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_ledger.test" + datasourceName := "data.aws_qldb_ledger.test" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, @@ -21,10 +23,12 @@ func TestAccQLDBLedgerDataSource_basic(t *testing.T) { { Config: testAccLedgerDataSourceConfig(rName), Check: resource.ComposeTestCheckFunc( - resource.TestCheckResourceAttrPair("data.aws_qldb_ledger.by_name", "arn", "aws_qldb_ledger.tf_test", "arn"), - resource.TestCheckResourceAttrPair("data.aws_qldb_ledger.by_name", "deletion_protection", "aws_qldb_ledger.tf_test", "deletion_protection"), - resource.TestCheckResourceAttrPair("data.aws_qldb_ledger.by_name", "name", "aws_qldb_ledger.tf_test", "name"), - resource.TestCheckResourceAttrPair("data.aws_qldb_ledger.by_name", "permissions_mode", "aws_qldb_ledger.tf_test", "permissions_mode"), + resource.TestCheckResourceAttrPair(datasourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(datasourceName, "deletion_protection", resourceName, "deletion_protection"), + resource.TestCheckResourceAttrPair(datasourceName, "kms_key", resourceName, "kms_key"), + resource.TestCheckResourceAttrPair(datasourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(datasourceName, "permissions_mode", resourceName, "permissions_mode"), + resource.TestCheckResourceAttrPair(datasourceName, "tags.%", resourceName, "tags.%"), ), }, }, @@ -33,26 +37,18 @@ func TestAccQLDBLedgerDataSource_basic(t *testing.T) { func testAccLedgerDataSourceConfig(rName string) string { return fmt.Sprintf(` -resource "aws_qldb_ledger" "tf_wrong1" { - name = "%[1]s1" +resource "aws_qldb_ledger" "test" { + name = %[1]q permissions_mode = "STANDARD" deletion_protection = false -} -resource "aws_qldb_ledger" "tf_test" { - name = "%[1]s2" - permissions_mode = "STANDARD" - deletion_protection = false -} - -resource "aws_qldb_ledger" "tf_wrong2" { - name = "%[1]s3" - permissions_mode = "STANDARD" - deletion_protection = false + tags = { + Env = "test" + } } -data "aws_qldb_ledger" "by_name" { - name = aws_qldb_ledger.tf_test.name +data "aws_qldb_ledger" "test" { + name = aws_qldb_ledger.test.id } `, rName) } diff --git a/internal/service/qldb/ledger_test.go b/internal/service/qldb/ledger_test.go index a377c2dd3c8..75c96d6ccc3 100644 --- a/internal/service/qldb/ledger_test.go +++ b/internal/service/qldb/ledger_test.go @@ -1,24 +1,24 @@ package qldb_test import ( + "context" "fmt" "regexp" "testing" - "github.com/aws/aws-sdk-go/aws" 
"github.com/aws/aws-sdk-go/service/qldb" - "github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr" sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" "github.com/hashicorp/terraform-provider-aws/internal/acctest" "github.com/hashicorp/terraform-provider-aws/internal/conns" + tfqldb "github.com/hashicorp/terraform-provider-aws/internal/service/qldb" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" ) func TestAccQLDBLedger_basic(t *testing.T) { - var qldbCluster qldb.DescribeLedgerOutput - rInt := sdkacctest.RandInt() + var v qldb.DescribeLedgerOutput + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) resourceName := "aws_qldb_ledger.test" resource.ParallelTest(t, resource.TestCase{ @@ -28,13 +28,14 @@ func TestAccQLDBLedger_basic(t *testing.T) { CheckDestroy: testAccCheckLedgerDestroy, Steps: []resource.TestStep{ { - Config: testAccLedgerConfig_basic(rInt), - Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &qldbCluster), + Config: testAccLedgerConfig(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "qldb", regexp.MustCompile(`ledger/.+`)), - resource.TestMatchResourceAttr(resourceName, "name", regexp.MustCompile("test-ledger-[0-9]+")), - resource.TestCheckResourceAttr(resourceName, "permissions_mode", "ALLOW_ALL"), resource.TestCheckResourceAttr(resourceName, "deletion_protection", "false"), + resource.TestCheckResourceAttr(resourceName, "kms_key", ""), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttr(resourceName, "permissions_mode", "ALLOW_ALL"), resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), ), }, @@ -47,9 +48,58 @@ func TestAccQLDBLedger_basic(t *testing.T) { }) } +func TestAccQLDBLedger_disappears(t *testing.T) { + var v qldb.DescribeLedgerOutput + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_ledger.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckLedgerDestroy, + Steps: []resource.TestStep{ + { + Config: testAccLedgerConfig(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + acctest.CheckResourceDisappears(acctest.Provider, tfqldb.ResourceLedger(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccQLDBLedger_nameGenerated(t *testing.T) { + var v qldb.DescribeLedgerOutput + resourceName := "aws_qldb_ledger.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckLedgerDestroy, + Steps: []resource.TestStep{ + { + Config: testAccLedgerNameGeneratedConfig(), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestMatchResourceAttr(resourceName, "name", regexp.MustCompile(`tf\d+`)), + ), + }, + { + ResourceName: resourceName, + ImportState: 
true, + ImportStateVerify: true, + }, + }, + }) +} + func TestAccQLDBLedger_update(t *testing.T) { - var qldbCluster qldb.DescribeLedgerOutput - rInt := sdkacctest.RandInt() + var v qldb.DescribeLedgerOutput + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) resourceName := "aws_qldb_ledger.test" resource.ParallelTest(t, resource.TestCase{ @@ -59,21 +109,55 @@ func TestAccQLDBLedger_update(t *testing.T) { CheckDestroy: testAccCheckLedgerDestroy, Steps: []resource.TestStep{ { - Config: testAccLedgerConfig_basic(rInt), + Config: testAccLedgerConfig(rName), Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &qldbCluster), + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "deletion_protection", "false"), resource.TestCheckResourceAttr(resourceName, "permissions_mode", "ALLOW_ALL"), ), }, { - Config: testAccLedgerConfig_update(rInt), + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccLedgerUpdatedConfig(rName), Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &qldbCluster), - acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "qldb", regexp.MustCompile(`ledger/.+`)), - resource.TestMatchResourceAttr(resourceName, "name", regexp.MustCompile("test-ledger-[0-9]+")), + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "deletion_protection", "true"), resource.TestCheckResourceAttr(resourceName, "permissions_mode", "STANDARD"), + ), + }, + { + Config: testAccLedgerConfig(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), resource.TestCheckResourceAttr(resourceName, "deletion_protection", "false"), - resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + resource.TestCheckResourceAttr(resourceName, "permissions_mode", "ALLOW_ALL"), + ), + }, + }, + }) +} + +func TestAccQLDBLedger_kmsKey(t *testing.T) { + var v qldb.DescribeLedgerOutput + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_ledger.test" + kmsKeyResourceName := "aws_kms_key.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckLedgerDestroy, + Steps: []resource.TestStep{ + { + Config: testAccLedgerKMSKeyConfig(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttrPair(resourceName, "kms_key", kmsKeyResourceName, "arn"), ), }, { @@ -81,43 +165,81 @@ func TestAccQLDBLedger_update(t *testing.T) { ImportState: true, ImportStateVerify: true, }, + { + Config: testAccLedgerKMSKeyUpdatedConfig(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "kms_key", "AWS_OWNED_KMS_KEY"), + ), + }, }, }) } -func testAccCheckLedgerDestroy(s *terraform.State) error { - return testAccCheckLedgerDestroyWithProvider(s, acctest.Provider) +func TestAccQLDBLedger_tags(t *testing.T) { + var v qldb.DescribeLedgerOutput + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_ledger.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: 
acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckLedgerDestroy, + Steps: []resource.TestStep{ + { + Config: testAccLedgerConfigTags1(rName, "key1", "value1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + { + Config: testAccLedgerConfigTags2(rName, "key1", "value1updated", "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccLedgerConfigTags1(rName, "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckLedgerExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) } -func testAccCheckLedgerDestroyWithProvider(s *terraform.State, provider *schema.Provider) error { - conn := provider.Meta().(*conns.AWSClient).QLDBConn +func testAccCheckLedgerDestroy(s *terraform.State) error { + conn := acctest.Provider.Meta().(*conns.AWSClient).QLDBConn for _, rs := range s.RootModule().Resources { if rs.Type != "aws_qldb_ledger" { continue } - // Try to find the Group - var err error - resp, err := conn.DescribeLedger( - &qldb.DescribeLedgerInput{ - Name: aws.String(rs.Primary.ID), - }) - - if err == nil { - if len(aws.StringValue(resp.Name)) != 0 && aws.StringValue(resp.Name) == rs.Primary.ID { - return fmt.Errorf("QLDB Ledger %s still exists", rs.Primary.ID) - } - } + _, err := tfqldb.FindLedgerByName(context.TODO(), conn, rs.Primary.ID) - // Return nil if the cluster is already destroyed - if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { + if tfresource.NotFound(err) { continue } if err != nil { return err } + + return fmt.Errorf("QLDB Ledger %s still exists", rs.Primary.ID) } return nil @@ -135,89 +257,81 @@ func testAccCheckLedgerExists(n string, v *qldb.DescribeLedgerOutput) resource.T } conn := acctest.Provider.Meta().(*conns.AWSClient).QLDBConn - resp, err := conn.DescribeLedger(&qldb.DescribeLedgerInput{ - Name: aws.String(rs.Primary.ID), - }) + + output, err := tfqldb.FindLedgerByName(context.TODO(), conn, rs.Primary.ID) if err != nil { return err } - if *resp.Name == rs.Primary.ID { - *v = *resp - return nil - } + *v = *output - return fmt.Errorf("QLDB Ledger (%s) not found", rs.Primary.ID) + return nil } } -func testAccLedgerConfig_basic(n int) string { +func testAccLedgerConfig(rName string) string { return fmt.Sprintf(` resource "aws_qldb_ledger" "test" { - name = "test-ledger-%d" + name = %[1]q permissions_mode = "ALLOW_ALL" deletion_protection = false } -`, n) +`, rName) } -func testAccLedgerConfig_update(n int) string { +func testAccLedgerUpdatedConfig(rName string) string { return fmt.Sprintf(` resource "aws_qldb_ledger" "test" { - name = "test-ledger-%d" + name = %[1]q permissions_mode = "STANDARD" + deletion_protection = true +} +`, rName) +} + +func testAccLedgerNameGeneratedConfig() string { + return ` +resource "aws_qldb_ledger" "test" { + permissions_mode = "ALLOW_ALL" deletion_protection = false } -`, n) +` } 
-func TestAccQLDBLedger_tags(t *testing.T) { - var cluster1, cluster2, cluster3 qldb.DescribeLedgerOutput - rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) - resourceName := "aws_qldb_ledger.test" +func testAccLedgerKMSKeyConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_kms_key" "test" { + description = %[1]q + deletion_window_in_days = 7 +} - resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, - ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), - Providers: acctest.Providers, - CheckDestroy: testAccCheckLedgerDestroy, - Steps: []resource.TestStep{ - { - Config: testAccLedgerTags1Config(rName, "key1", "value1"), - Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &cluster1), - resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), - resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), - ), - }, - { - ResourceName: resourceName, - ImportState: true, - ImportStateVerify: true, - }, - { - Config: testAccLedgerTags2Config(rName, "key1", "value1updated", "key2", "value2"), - Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &cluster2), - resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), - resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), - resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), - ), - }, - { - Config: testAccLedgerTags1Config(rName, "key2", "value2"), - Check: resource.ComposeTestCheckFunc( - testAccCheckLedgerExists(resourceName, &cluster3), - resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), - resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), - ), - }, - }, - }) +resource "aws_qldb_ledger" "test" { + name = %[1]q + permissions_mode = "ALLOW_ALL" + deletion_protection = false + kms_key = aws_kms_key.test.arn +} +`, rName) +} + +func testAccLedgerKMSKeyUpdatedConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_kms_key" "test" { + description = %[1]q + deletion_window_in_days = 7 +} + +resource "aws_qldb_ledger" "test" { + name = %[1]q + permissions_mode = "ALLOW_ALL" + deletion_protection = false + kms_key = "AWS_OWNED_KMS_KEY" +} +`, rName) } -func testAccLedgerTags1Config(rName, tagKey1, tagValue1 string) string { +func testAccLedgerConfigTags1(rName, tagKey1, tagValue1 string) string { return fmt.Sprintf(` resource "aws_qldb_ledger" "test" { name = %[1]q @@ -231,7 +345,7 @@ resource "aws_qldb_ledger" "test" { `, rName, tagKey1, tagValue1) } -func testAccLedgerTags2Config(rName, tagKey1, tagValue1, tagKey2, tagValue2 string) string { +func testAccLedgerConfigTags2(rName, tagKey1, tagValue1, tagKey2, tagValue2 string) string { return fmt.Sprintf(` resource "aws_qldb_ledger" "test" { name = %[1]q diff --git a/internal/service/qldb/stream.go b/internal/service/qldb/stream.go new file mode 100644 index 00000000000..7e6df1fda26 --- /dev/null +++ b/internal/service/qldb/stream.go @@ -0,0 +1,401 @@ +package qldb + +import ( + "context" + "errors" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/qldb" + "github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/diag" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" + 
"github.com/hashicorp/terraform-provider-aws/internal/conns" + tftags "github.com/hashicorp/terraform-provider-aws/internal/tags" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" + "github.com/hashicorp/terraform-provider-aws/internal/verify" +) + +func ResourceStream() *schema.Resource { + return &schema.Resource{ + CreateWithoutTimeout: resourceStreamCreate, + ReadWithoutTimeout: resourceStreamRead, + UpdateWithoutTimeout: resourceStreamUpdate, + DeleteWithoutTimeout: resourceStreamDelete, + + Schema: map[string]*schema.Schema{ + "arn": { + Type: schema.TypeString, + Computed: true, + }, + "exclusive_end_time": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + ValidateFunc: validation.IsRFC3339Time, + }, + "inclusive_start_time": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.IsRFC3339Time, + }, + "kinesis_configuration": { + Type: schema.TypeList, + Required: true, + ForceNew: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "aggregation_enabled": { + Type: schema.TypeBool, + Optional: true, + Default: true, + ForceNew: true, + }, + "stream_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: verify.ValidARN, + }, + }, + }, + }, + "ledger_name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.All( + validation.StringLenBetween(1, 32), + ), + }, + "role_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: verify.ValidARN, + }, + "stream_name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.All( + validation.StringLenBetween(1, 32), + ), + }, + "tags": tftags.TagsSchema(), + "tags_all": tftags.TagsSchemaComputed(), + }, + + CustomizeDiff: verify.SetTagsDiff, + } +} + +func resourceStreamCreate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).QLDBConn + defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig + tags := defaultTagsConfig.MergeTags(tftags.New(d.Get("tags").(map[string]interface{}))) + + ledgerName := d.Get("ledger_name").(string) + name := d.Get("stream_name").(string) + input := &qldb.StreamJournalToKinesisInput{ + LedgerName: aws.String(ledgerName), + RoleArn: aws.String(d.Get("role_arn").(string)), + StreamName: aws.String(name), + Tags: Tags(tags.IgnoreAWS()), + } + + if v, ok := d.GetOk("exclusive_end_time"); ok { + v, _ := time.Parse(time.RFC3339, v.(string)) + input.ExclusiveEndTime = aws.Time(v) + } + + if v, ok := d.GetOk("inclusive_start_time"); ok { + v, _ := time.Parse(time.RFC3339, v.(string)) + input.InclusiveStartTime = aws.Time(v) + } + + if v, ok := d.GetOk("kinesis_configuration"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil { + input.KinesisConfiguration = expandKinesisConfiguration(v.([]interface{})[0].(map[string]interface{})) + } + + log.Printf("[DEBUG] Creating QLDB Stream: %s", input) + output, err := conn.StreamJournalToKinesisWithContext(ctx, input) + + if err != nil { + return diag.Errorf("creating QLDB Stream (%s): %s", name, err) + } + + d.SetId(aws.StringValue(output.StreamId)) + + if _, err := waitStreamCreated(ctx, conn, ledgerName, d.Id()); err != nil { + return diag.Errorf("waiting for QLDB Stream (%s) create: %s", d.Id(), err) + } + + return resourceStreamRead(ctx, d, meta) +} + +func resourceStreamRead(ctx context.Context, d *schema.ResourceData, meta interface{}) 
diag.Diagnostics { + conn := meta.(*conns.AWSClient).QLDBConn + defaultTagsConfig := meta.(*conns.AWSClient).DefaultTagsConfig + ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig + + ledgerName := d.Get("ledger_name").(string) + stream, err := FindStream(ctx, conn, ledgerName, d.Id()) + + if !d.IsNewResource() && tfresource.NotFound(err) { + log.Printf("[WARN] QLDB Stream %s not found, removing from state", d.Id()) + d.SetId("") + return nil + } + + if err != nil { + return diag.Errorf("reading QLDB Stream (%s): %s", d.Id(), err) + } + + d.Set("arn", stream.Arn) + if stream.ExclusiveEndTime != nil { + d.Set("exclusive_end_time", aws.TimeValue(stream.ExclusiveEndTime).Format(time.RFC3339)) + } else { + d.Set("exclusive_end_time", nil) + } + if stream.InclusiveStartTime != nil { + d.Set("inclusive_start_time", aws.TimeValue(stream.InclusiveStartTime).Format(time.RFC3339)) + } else { + d.Set("inclusive_start_time", nil) + } + if stream.KinesisConfiguration != nil { + if err := d.Set("kinesis_configuration", []interface{}{flattenKinesisConfiguration(stream.KinesisConfiguration)}); err != nil { + return diag.Errorf("setting kinesis_configuration: %s", err) + } + } else { + d.Set("kinesis_configuration", nil) + } + d.Set("ledger_name", stream.LedgerName) + d.Set("role_arn", stream.RoleArn) + d.Set("stream_name", stream.StreamName) + + tags, err := ListTags(conn, d.Get("arn").(string)) + + if err != nil { + return diag.Errorf("listing tags for QLDB Stream (%s): %s", d.Id(), err) + } + + tags = tags.IgnoreAWS().IgnoreConfig(ignoreTagsConfig) + + //lintignore:AWSR002 + if err := d.Set("tags", tags.RemoveDefaultConfig(defaultTagsConfig).Map()); err != nil { + return diag.Errorf("setting tags: %s", err) + } + + if err := d.Set("tags_all", tags.Map()); err != nil { + return diag.Errorf("setting tags_all: %s", err) + } + + return nil +} + +func resourceStreamUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).QLDBConn + + if d.HasChange("tags") { + o, n := d.GetChange("tags") + + if err := UpdateTags(conn, d.Get("arn").(string), o, n); err != nil { + return diag.Errorf("updating tags: %s", err) + } + } + + return resourceStreamRead(ctx, d, meta) +} + +func resourceStreamDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics { + conn := meta.(*conns.AWSClient).QLDBConn + + ledgerName := d.Get("ledger_name").(string) + input := &qldb.CancelJournalKinesisStreamInput{ + LedgerName: aws.String(ledgerName), + StreamId: aws.String(d.Id()), + } + + log.Printf("[INFO] Deleting QLDB Stream: %s", d.Id()) + _, err := tfresource.RetryWhenAWSErrCodeEqualsContext(ctx, 5*time.Minute, + func() (interface{}, error) { + return conn.CancelJournalKinesisStreamWithContext(ctx, input) + }, qldb.ErrCodeResourceInUseException) + + if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { + return nil + } + + if err != nil { + return diag.Errorf("deleting QLDB Stream (%s): %s", d.Id(), err) + } + + if _, err := waitStreamDeleted(ctx, conn, ledgerName, d.Id()); err != nil { + return diag.Errorf("waiting for QLDB Stream (%s) delete: %s", d.Id(), err) + } + + return nil +} + +func FindStream(ctx context.Context, conn *qldb.QLDB, ledgerName, streamID string) (*qldb.JournalKinesisStreamDescription, error) { + input := &qldb.DescribeJournalKinesisStreamInput{ + LedgerName: aws.String(ledgerName), + StreamId: aws.String(streamID), + } + + output, err := findJournalKinesisStream(ctx, conn, input) + + if err != 
nil { + return nil, err + } + + // See https://docs.aws.amazon.com/qldb/latest/developerguide/streams.create.html#streams.create.states. + switch status := aws.StringValue(output.Status); status { + case qldb.StreamStatusCompleted, qldb.StreamStatusCanceled, qldb.StreamStatusFailed: + return nil, &resource.NotFoundError{ + Message: status, + LastRequest: input, + } + } + + return output, nil +} + +func findJournalKinesisStream(ctx context.Context, conn *qldb.QLDB, input *qldb.DescribeJournalKinesisStreamInput) (*qldb.JournalKinesisStreamDescription, error) { + output, err := conn.DescribeJournalKinesisStreamWithContext(ctx, input) + + if tfawserr.ErrCodeEquals(err, qldb.ErrCodeResourceNotFoundException) { + return nil, &resource.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + if output == nil || output.Stream == nil { + return nil, tfresource.NewEmptyResultError(input) + } + + return output.Stream, nil +} + +func statusStreamCreated(ctx context.Context, conn *qldb.QLDB, ledgerName, streamID string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + // Don't call FindStream as it maps useful statuses to NotFoundError. + output, err := findJournalKinesisStream(ctx, conn, &qldb.DescribeJournalKinesisStreamInput{ + LedgerName: aws.String(ledgerName), + StreamId: aws.String(streamID), + }) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, aws.StringValue(output.Status), nil + } +} + +func waitStreamCreated(ctx context.Context, conn *qldb.QLDB, ledgerName, streamID string) (*qldb.JournalKinesisStreamDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{qldb.StreamStatusImpaired}, + Target: []string{qldb.StreamStatusActive}, + Refresh: statusStreamCreated(ctx, conn, ledgerName, streamID), + Timeout: 8 * time.Minute, + MinTimeout: 3 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*qldb.JournalKinesisStreamDescription); ok { + tfresource.SetLastError(err, errors.New(aws.StringValue(output.ErrorCause))) + + return output, err + } + + return nil, err +} + +func statusStreamDeleted(ctx context.Context, conn *qldb.QLDB, ledgerName, streamID string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + output, err := FindStream(ctx, conn, ledgerName, streamID) + + if tfresource.NotFound(err) { + return nil, "", nil + } + + if err != nil { + return nil, "", err + } + + return output, aws.StringValue(output.Status), nil + } +} + +func waitStreamDeleted(ctx context.Context, conn *qldb.QLDB, ledgerName, streamID string) (*qldb.JournalKinesisStreamDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{qldb.StreamStatusActive, qldb.StreamStatusImpaired}, + Target: []string{}, + Refresh: statusStreamDeleted(ctx, conn, ledgerName, streamID), + Timeout: 5 * time.Minute, + MinTimeout: 1 * time.Second, + } + + outputRaw, err := stateConf.WaitForStateContext(ctx) + + if output, ok := outputRaw.(*qldb.JournalKinesisStreamDescription); ok { + tfresource.SetLastError(err, errors.New(aws.StringValue(output.ErrorCause))) + + return output, err + } + + return nil, err +} + +func expandKinesisConfiguration(tfMap map[string]interface{}) *qldb.KinesisConfiguration { + if tfMap == nil { + return nil + } + + apiObject := &qldb.KinesisConfiguration{} + + if v, ok := tfMap["aggregation_enabled"].(bool); ok { + 
apiObject.AggregationEnabled = aws.Bool(v) + } + + if v, ok := tfMap["stream_arn"].(string); ok && v != "" { + apiObject.StreamArn = aws.String(v) + } + + return apiObject +} + +func flattenKinesisConfiguration(apiObject *qldb.KinesisConfiguration) map[string]interface{} { + if apiObject == nil { + return nil + } + + tfMap := map[string]interface{}{} + + if v := apiObject.AggregationEnabled; v != nil { + tfMap["aggregation_enabled"] = aws.BoolValue(v) + } + + if v := apiObject.StreamArn; v != nil { + tfMap["stream_arn"] = aws.StringValue(v) + } + + return tfMap +} diff --git a/internal/service/qldb/stream_test.go b/internal/service/qldb/stream_test.go new file mode 100644 index 00000000000..8d6dfce917b --- /dev/null +++ b/internal/service/qldb/stream_test.go @@ -0,0 +1,305 @@ +package qldb_test + +import ( + "context" + "fmt" + "regexp" + "testing" + + "github.com/aws/aws-sdk-go/service/qldb" + sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/hashicorp/terraform-provider-aws/internal/acctest" + "github.com/hashicorp/terraform-provider-aws/internal/conns" + tfqldb "github.com/hashicorp/terraform-provider-aws/internal/service/qldb" + "github.com/hashicorp/terraform-provider-aws/internal/tfresource" +) + +func TestAccQLDBStream_basic(t *testing.T) { + var v qldb.JournalKinesisStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccStreamConfig(rName), + Check: resource.ComposeAggregateTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + acctest.MatchResourceAttrRegionalARN(resourceName, "arn", "qldb", regexp.MustCompile(`stream/.+`)), + resource.TestCheckResourceAttr(resourceName, "exclusive_end_time", ""), + resource.TestCheckResourceAttrSet(resourceName, "inclusive_start_time"), + resource.TestCheckResourceAttr(resourceName, "kinesis_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "kinesis_configuration.0.aggregation_enabled", "true"), + resource.TestCheckResourceAttrSet(resourceName, "kinesis_configuration.0.stream_arn"), + resource.TestCheckResourceAttrSet(resourceName, "ledger_name"), + resource.TestCheckResourceAttrSet(resourceName, "role_arn"), + resource.TestCheckResourceAttr(resourceName, "stream_name", rName), + resource.TestCheckResourceAttr(resourceName, "tags.%", "0"), + ), + }, + }, + }) +} + +func TestAccQLDBStream_disappears(t *testing.T) { + var v qldb.JournalKinesisStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccStreamConfig(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + acctest.CheckResourceDisappears(acctest.Provider, tfqldb.ResourceStream(), 
resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccQLDBStream_tags(t *testing.T) { + var v qldb.JournalKinesisStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccStreamConfigTags1(rName, "key1", "value1"), + Check: resource.ComposeTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1"), + ), + }, + { + Config: testAccStreamConfigTags2(rName, "key1", "value1updated", "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "2"), + resource.TestCheckResourceAttr(resourceName, "tags.key1", "value1updated"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + { + Config: testAccStreamConfigTags1(rName, "key2", "value2"), + Check: resource.ComposeTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + resource.TestCheckResourceAttr(resourceName, "tags.%", "1"), + resource.TestCheckResourceAttr(resourceName, "tags.key2", "value2"), + ), + }, + }, + }) +} + +func TestAccQLDBStream_withEndTime(t *testing.T) { + var v qldb.JournalKinesisStreamDescription + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + resourceName := "aws_qldb_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(t); acctest.PreCheckPartitionHasService(qldb.EndpointsID, t) }, + ErrorCheck: acctest.ErrorCheck(t, qldb.EndpointsID), + Providers: acctest.Providers, + CheckDestroy: testAccCheckStreamDestroy, + Steps: []resource.TestStep{ + { + Config: testAccStreamEndTimeConfig(rName), + Check: resource.ComposeTestCheckFunc( + testAccCheckStreamExists(resourceName, &v), + resource.TestCheckResourceAttrSet(resourceName, "exclusive_end_time"), + resource.TestCheckResourceAttrSet(resourceName, "inclusive_start_time"), + resource.TestCheckResourceAttr(resourceName, "kinesis_configuration.#", "1"), + resource.TestCheckResourceAttr(resourceName, "kinesis_configuration.0.aggregation_enabled", "false"), + resource.TestCheckResourceAttrSet(resourceName, "kinesis_configuration.0.stream_arn"), + ), + }, + }, + }) +} + +func testAccCheckStreamExists(n string, v *qldb.JournalKinesisStreamDescription) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[n] + if !ok { + return fmt.Errorf("Not found: %s", n) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No QLDB Stream ID is set") + } + + conn := acctest.Provider.Meta().(*conns.AWSClient).QLDBConn + + output, err := tfqldb.FindStream(context.TODO(), conn, rs.Primary.Attributes["ledger_name"], rs.Primary.ID) + + if err != nil { + return err + } + + *v = *output + + return nil + } +} + +func testAccCheckStreamDestroy(s *terraform.State) error { + conn := acctest.Provider.Meta().(*conns.AWSClient).QLDBConn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_qldb_stream" { + continue + } + + _, err := tfqldb.FindStream(context.TODO(), conn, 
rs.Primary.Attributes["ledger_name"], rs.Primary.ID) + + if tfresource.NotFound(err) { + continue + } + + if err != nil { + return err + } + + return fmt.Errorf("QLDB Stream %s still exists", rs.Primary.ID) + } + + return nil +} + +func testAccStreamBaseConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_qldb_ledger" "test" { + name = %[1]q + permissions_mode = "ALLOW_ALL" + deletion_protection = false +} + +resource "aws_kinesis_stream" "test" { + name = %[1]q + shard_count = 1 + retention_period = 24 +} + +resource "aws_iam_role" "test" { + name = %[1]q + + assume_role_policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = "sts:AssumeRole" + Effect = "Allow" + Sid = "" + Principal = { + Service = "qldb.amazonaws.com" + } + }] + }) + + inline_policy { + name = "test-qldb-policy" + policy = jsonencode({ + Version = "2012-10-17" + Statement = [{ + Action = [ + "kinesis:PutRecord*", + "kinesis:DescribeStream", + "kinesis:ListShards", + ] + Effect = "Allow" + Resource = aws_kinesis_stream.test.arn + }] + }) + } +} +`, rName) +} + +func testAccStreamConfig(rName string) string { + return acctest.ConfigCompose(testAccStreamBaseConfig(rName), fmt.Sprintf(` +resource "aws_qldb_stream" "test" { + stream_name = %[1]q + ledger_name = aws_qldb_ledger.test.id + inclusive_start_time = "2021-01-01T00:00:00Z" + role_arn = aws_iam_role.test.arn + + kinesis_configuration { + stream_arn = aws_kinesis_stream.test.arn + } +} +`, rName)) +} + +func testAccStreamEndTimeConfig(rName string) string { + return acctest.ConfigCompose(testAccStreamBaseConfig(rName), fmt.Sprintf(` +resource "aws_qldb_stream" "test" { + stream_name = %[1]q + ledger_name = aws_qldb_ledger.test.id + exclusive_end_time = "2021-12-31T23:59:59Z" + inclusive_start_time = "2021-01-01T00:00:00Z" + role_arn = aws_iam_role.test.arn + + kinesis_configuration { + aggregation_enabled = false + stream_arn = aws_kinesis_stream.test.arn + } +} +`, rName)) +} + +func testAccStreamConfigTags1(rName, tagKey1, tagValue1 string) string { + return acctest.ConfigCompose(testAccStreamBaseConfig(rName), fmt.Sprintf(` +resource "aws_qldb_stream" "test" { + stream_name = %[1]q + ledger_name = aws_qldb_ledger.test.id + inclusive_start_time = "2021-01-01T00:00:00Z" + role_arn = aws_iam_role.test.arn + + kinesis_configuration { + stream_arn = aws_kinesis_stream.test.arn + } + + tags = { + %[2]q = %[3]q + } +} +`, rName, tagKey1, tagValue1)) +} + +func testAccStreamConfigTags2(rName, tagKey1, tagValue1, tagKey2, tagValue2 string) string { + return acctest.ConfigCompose(testAccStreamBaseConfig(rName), fmt.Sprintf(` +resource "aws_qldb_stream" "test" { + stream_name = %[1]q + ledger_name = aws_qldb_ledger.test.id + inclusive_start_time = "2021-01-01T00:00:00Z" + role_arn = aws_iam_role.test.arn + + kinesis_configuration { + stream_arn = aws_kinesis_stream.test.arn + } + + tags = { + %[2]q = %[3]q + %[4]q = %[5]q + } +} +`, rName, tagKey1, tagValue1, tagKey2, tagValue2)) +} diff --git a/internal/service/qldb/sweep.go b/internal/service/qldb/sweep.go index eecf12e427c..2bfab20bcbf 100644 --- a/internal/service/qldb/sweep.go +++ b/internal/service/qldb/sweep.go @@ -9,6 +9,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/qldb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-provider-aws/internal/conns" "github.com/hashicorp/terraform-provider-aws/internal/sweep" @@ -18,46 +19,124 @@ func init() { 
resource.AddTestSweepers("aws_qldb_ledger", &resource.Sweeper{ Name: "aws_qldb_ledger", F: sweepLedgers, + Dependencies: []string{ + "aws_qldb_stream", + }, }) + + resource.AddTestSweepers("aws_qldb_stream", &resource.Sweeper{ + Name: "aws_qldb_stream", + F: sweepStreams, + }) + } func sweepLedgers(region string) error { client, err := sweep.SharedRegionalSweepClient(region) - if err != nil { - return fmt.Errorf("error getting client: %s", err) + return fmt.Errorf("error getting client: %w", err) } - conn := client.(*conns.AWSClient).QLDBConn input := &qldb.ListLedgersInput{} - page, err := conn.ListLedgers(input) + sweepResources := make([]*sweep.SweepResource, 0) - if err != nil { - if sweep.SkipSweepError(err) { - log.Printf("[WARN] Skipping QLDB Ledger sweep for %s: %s", region, err) - return nil + err = conn.ListLedgersPages(input, func(page *qldb.ListLedgersOutput, lastPage bool) bool { + if page == nil { + return !lastPage } - return fmt.Errorf("Error listing QLDB Ledgers: %s", err) - } - for _, item := range page.Ledgers { - input := &qldb.DeleteLedgerInput{ - Name: item.Name, + for _, v := range page.Ledgers { + r := ResourceLedger() + d := r.Data(nil) + d.SetId(aws.StringValue(v.Name)) + + sweepResources = append(sweepResources, sweep.NewSweepResource(r, d, client)) } - name := aws.StringValue(item.Name) - log.Printf("[INFO] Deleting QLDB Ledger: %s", name) - _, err = conn.DeleteLedger(input) + return !lastPage + }) + + if sweep.SkipSweepError(err) { + log.Printf("[WARN] Skipping QLDB Ledger sweep for %s: %s", region, err) + return nil + } + + if err != nil { + return fmt.Errorf("error listing QLDB Ledgers (%s): %w", region, err) + } + + err = sweep.SweepOrchestrator(sweepResources) + + if err != nil { + return fmt.Errorf("error sweeping QLDB Ledgers (%s): %w", region, err) + } + + return nil +} + +func sweepStreams(region string) error { + client, err := sweep.SharedRegionalSweepClient(region) + if err != nil { + return fmt.Errorf("error getting client: %w", err) + } + conn := client.(*conns.AWSClient).QLDBConn + input := &qldb.ListLedgersInput{} + var sweeperErrs *multierror.Error + sweepResources := make([]*sweep.SweepResource, 0) - if err != nil { - log.Printf("[ERROR] Failed to delete QLDB Ledger %s: %s", name, err) - continue + err = conn.ListLedgersPages(input, func(page *qldb.ListLedgersOutput, lastPage bool) bool { + if page == nil { + return !lastPage } - if err := WaitForLedgerDeletion(conn, name); err != nil { - log.Printf("[ERROR] Error waiting for QLDB Ledger (%s) deletion: %s", name, err) + for _, v := range page.Ledgers { + input := &qldb.ListJournalKinesisStreamsForLedgerInput{ + LedgerName: v.Name, + } + + err := conn.ListJournalKinesisStreamsForLedgerPages(input, func(page *qldb.ListJournalKinesisStreamsForLedgerOutput, lastPage bool) bool { + if page == nil { + return !lastPage + } + + for _, v := range page.Streams { + r := ResourceStream() + d := r.Data(nil) + d.SetId(aws.StringValue(v.StreamId)) + d.Set("ledger_name", v.LedgerName) + + sweepResources = append(sweepResources, sweep.NewSweepResource(r, d, client)) + } + + return !lastPage + }) + + if sweep.SkipSweepError(err) { + continue + } + + if err != nil { + sweeperErrs = multierror.Append(sweeperErrs, fmt.Errorf("error listing QLDB Streams (%s): %w", region, err)) + } } + + return !lastPage + }) + + if sweep.SkipSweepError(err) { + log.Printf("[WARN] Skipping QLDB Stream sweep for %s: %s", region, err) + return sweeperErrs.ErrorOrNil() // In case we have completed some pages, but had errors } - return 
nil + if err != nil { + sweeperErrs = multierror.Append(sweeperErrs, fmt.Errorf("error listing QLDB Ledgers (%s): %w", region, err)) + } + + err = sweep.SweepOrchestrator(sweepResources) + + if err != nil { + sweeperErrs = multierror.Append(sweeperErrs, fmt.Errorf("error sweeping QLDB Streams (%s): %w", region, err)) + } + + return sweeperErrs.ErrorOrNil() } diff --git a/website/docs/r/qldb_ledger.html.markdown b/website/docs/r/qldb_ledger.html.markdown index 73f7734c0ce..c9ce38b9e78 100644 --- a/website/docs/r/qldb_ledger.html.markdown +++ b/website/docs/r/qldb_ledger.html.markdown @@ -25,9 +25,10 @@ resource "aws_qldb_ledger" "sample-ledger" { The following arguments are supported: +* `deletion_protection` - (Optional) The deletion protection for the QLDB Ledger instance. By default it is `true`. To delete this resource via Terraform, this value must be configured to `false` and applied first before attempting deletion. +* `kms_key` - (Optional) The key in AWS Key Management Service (AWS KMS) to use for encryption of data at rest in the ledger. For more information, see the [AWS documentation](https://docs.aws.amazon.com/qldb/latest/developerguide/encryption-at-rest.html). Valid values are `"AWS_OWNED_KMS_KEY"` to use an AWS KMS key that is owned and managed by AWS on your behalf, or the ARN of a valid symmetric customer managed KMS key. * `name` - (Optional) The friendly name for the QLDB Ledger instance. By default generated by Terraform. * `permissions_mode` - (Required) The permissions mode for the QLDB ledger instance. Specify either `ALLOW_ALL` or `STANDARD`. -* `deletion_protection` - (Optional) The deletion protection for the QLDB Ledger instance. By default it is `true`. To delete this resource via Terraform, this value must be configured to `false` and applied first before attempting deletion. * `tags` - (Optional) Key-value map of resource tags. If configured with a provider [`default_tags` configuration block](https://www.terraform.io/docs/providers/aws/index.html#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level. ## Attributes Reference diff --git a/website/docs/r/qldb_stream.html.markdown b/website/docs/r/qldb_stream.html.markdown new file mode 100644 index 00000000000..c60a48bb5c9 --- /dev/null +++ b/website/docs/r/qldb_stream.html.markdown @@ -0,0 +1,58 @@ +--- +subcategory: "QLDB (Quantum Ledger Database)" +layout: "aws" +page_title: "AWS: aws_qldb_stream" +description: |- + Provides a QLDB Stream resource. +--- + +# Resource: aws_qldb_stream + +Provides an AWS Quantum Ledger Database (QLDB) Stream resource. + +## Example Usage + +```terraform +resource "aws_qldb_stream" "example" { + ledger_name = "existing-ledger-name" + stream_name = "sample-ledger-stream" + role_arn = "arn:aws:iam::xxxxxxxxxxxx:role/sample-role" + inclusive_start_time = "2021-01-01T00:00:00Z" + + kinesis_configuration { + aggregation_enabled = false + stream_arn = "arn:aws:kinesis:us-east-1:xxxxxxxxxxxx:stream/example-kinesis-stream" + } + + tags = { + "example" = "tag" + } +} +``` + +## Argument Reference + +The following arguments are supported: + +* `exclusive_end_time` - (Optional) The exclusive date and time that specifies when the stream ends. If you don't define this parameter, the stream runs indefinitely until you cancel it. It must be in ISO 8601 date and time format and in Universal Coordinated Time (UTC). For example: `"2019-06-13T21:36:34Z"`.
+* `inclusive_start_time` - (Required) The inclusive start date and time from which to start streaming journal data. This parameter must be in ISO 8601 date and time format and in Universal Coordinated Time (UTC). For example: `"2019-06-13T21:36:34Z"`. This cannot be in the future and must be before `exclusive_end_time`. If you provide a value that is before the ledger's `CreationDateTime`, QLDB effectively defaults it to the ledger's `CreationDateTime`. +* `kinesis_configuration` - (Required) The configuration settings of the Kinesis Data Streams destination for your stream request. Documented below. +* `ledger_name` - (Required) The name of the QLDB ledger. +* `role_arn` - (Required) The Amazon Resource Name (ARN) of the IAM role that grants QLDB permissions for a journal stream to write data records to a Kinesis Data Streams resource. +* `stream_name` - (Required) The name that you want to assign to the QLDB journal stream. User-defined names can help identify and indicate the purpose of a stream. Your stream name must be unique among other active streams for a given ledger. Stream names have the same naming constraints as ledger names, as defined in the [Amazon QLDB Developer Guide](https://docs.aws.amazon.com/qldb/latest/developerguide/limits.html#limits.naming). +* `tags` - (Optional) Key-value map of resource tags. If configured with a provider [`default_tags` configuration block](https://www.terraform.io/docs/providers/aws/index.html#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level. + +### kinesis_configuration + +The `kinesis_configuration` block supports the following arguments: + +* `aggregation_enabled` - (Optional) Enables QLDB to publish multiple data records in a single Kinesis Data Streams record, increasing the number of records sent per API call. Default: `true`. +* `stream_arn` - (Required) The Amazon Resource Name (ARN) of the Kinesis Data Streams resource. + +## Attributes Reference + +In addition to all arguments above, the following attributes are exported: + +* `id` - The ID of the QLDB Stream. +* `arn` - The ARN of the QLDB Stream. +* `tags_all` - A map of tags assigned to the resource, including those inherited from the provider [`default_tags` configuration block](https://www.terraform.io/docs/providers/aws/index.html#default_tags-configuration-block).
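A possible end-to-end sketch combining the new `kms_key` argument on `aws_qldb_ledger` with an `aws_qldb_stream` and the supporting infrastructure the stream references. The `aws_kms_key`, `aws_kinesis_stream`, `aws_iam_role`, and `aws_iam_role_policy` resources and their names are assumptions for illustration only, and the IAM policy is a rough minimum rather than a vetted least-privilege policy; nothing here is prescribed by this change.

```terraform
# Customer managed key for encryption at rest on the ledger (assumed;
# omitting kms_key, or setting it to "AWS_OWNED_KMS_KEY", falls back to
# the AWS-owned key).
resource "aws_kms_key" "example" {
  description             = "QLDB ledger encryption key"
  deletion_window_in_days = 7
}

resource "aws_qldb_ledger" "example" {
  name                = "example-ledger"
  permissions_mode    = "STANDARD"
  deletion_protection = false
  kms_key             = aws_kms_key.example.arn
}

# Kinesis data stream that receives the journal records (assumed).
resource "aws_kinesis_stream" "example" {
  name        = "example-qldb-journal-stream"
  shard_count = 1
}

# Role that QLDB assumes to write journal data to the Kinesis stream.
resource "aws_iam_role" "example" {
  name = "example-qldb-stream-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Action    = "sts:AssumeRole"
      Principal = { Service = "qldb.amazonaws.com" }
    }]
  })
}

# Illustrative write permissions on the destination stream.
resource "aws_iam_role_policy" "example" {
  name = "example-qldb-stream-policy"
  role = aws_iam_role.example.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Action   = ["kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStream", "kinesis:ListShards"]
      Resource = aws_kinesis_stream.example.arn
    }]
  })
}

resource "aws_qldb_stream" "example" {
  ledger_name          = aws_qldb_ledger.example.name
  stream_name          = "example-ledger-stream"
  role_arn             = aws_iam_role.example.arn
  inclusive_start_time = "2021-01-01T00:00:00Z"

  kinesis_configuration {
    aggregation_enabled = true
    stream_arn          = aws_kinesis_stream.example.arn
  }
}
```

Referencing `aws_qldb_ledger.example.name` for `ledger_name` (rather than a literal string) keeps the implicit dependency, so the stream is created only after the ledger is active.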