From 2302056671c9cb4676a48998d4fe16164ded24d2 Mon Sep 17 00:00:00 2001 From: nikhil Date: Wed, 17 Apr 2024 21:53:57 +0100 Subject: [PATCH] f-aws_msk_replicator:Argument for Replication starting position --- internal/service/kafka/replicator.go | 43 +++++++++++++++++++++++ internal/service/kafka/replicator_test.go | 1 + 2 files changed, 44 insertions(+) diff --git a/internal/service/kafka/replicator.go b/internal/service/kafka/replicator.go index d6ae8bb18fc8..2f03c703f2fc 100644 --- a/internal/service/kafka/replicator.go +++ b/internal/service/kafka/replicator.go @@ -159,6 +159,21 @@ func resourceReplicator() *schema.Resource { Optional: true, Default: true, }, + "starting_position": { + Type: schema.TypeList, + Optional: true, + Computed: true, + MaxItems: 1, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "type": { + Type: schema.TypeString, + Optional: true, + ValidateDiagFunc: enum.Validate[types.ReplicationStartingPositionType](), + }, + }, + }, + }, "topics_to_exclude": { Type: schema.TypeSet, Optional: true, @@ -555,6 +570,10 @@ func flattenTopicReplication(apiObject *types.TopicReplication) map[string]inter tfMap["topics_to_exclude"] = flex.FlattenStringValueSet(v) } + if v := apiObject.StartingPosition; v != nil { + tfMap["starting_position"] = []interface{}{flattenStartingPosition(v)} + } + if aws.ToBool(apiObject.CopyTopicConfigurations) { tfMap["copy_topic_configurations"] = apiObject.CopyTopicConfigurations } @@ -570,6 +589,16 @@ func flattenTopicReplication(apiObject *types.TopicReplication) map[string]inter return tfMap } +func flattenStartingPosition(apiObject *types.ReplicationStartingPosition) map[string]interface{} { + tfMap := map[string]interface{}{} + + if v := apiObject.Type; v != "" { + tfMap["type"] = v + } + + return tfMap +} + func flattenKafkaClusterDescriptions(apiObjects []types.KafkaClusterDescription) []interface{} { // nosemgrep:ci.kafka-in-func-name if len(apiObjects) == 0 { return nil @@ -757,6 +786,10 @@ func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplicatio apiObject.TopicsToExclude = flex.ExpandStringValueSet(v) } + if v, ok := tfMap["starting_position"].([]interface{}); ok { + apiObject.StartingPosition = expandStartingPosition(v[0].(map[string]interface{})) + } + if v, ok := tfMap["copy_topic_configurations"].(bool); ok { apiObject.CopyTopicConfigurations = aws.Bool(v) } @@ -772,6 +805,16 @@ func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplicatio return apiObject } +func expandStartingPosition(tfMap map[string]interface{}) *types.ReplicationStartingPosition { + apiObject := &types.ReplicationStartingPosition{} + + if v, ok := tfMap["type"].(string); ok { + apiObject.Type = types.ReplicationStartingPositionType(v) + } + + return apiObject +} + func expandKafkaClusters(tfList []interface{}) []types.KafkaCluster { // nosemgrep:ci.kafka-in-func-name if len(tfList) == 0 { return nil diff --git a/internal/service/kafka/replicator_test.go b/internal/service/kafka/replicator_test.go index aac652b2e35a..e939c747be90 100644 --- a/internal/service/kafka/replicator_test.go +++ b/internal/service/kafka/replicator_test.go @@ -55,6 +55,7 @@ func TestAccKafkaReplicator_basic(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.security_groups_ids.#", "1"), resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.target_compression_type", "NONE"), resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_replicate.#", "1"), + resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.#", "1"), resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", "1"), ), },