Skip to content

Commit

Permalink
f-aws_msk_replicator:Argument for Replication starting position
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhil-goenka committed Apr 17, 2024
1 parent 9316322 commit 2302056
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
43 changes: 43 additions & 0 deletions internal/service/kafka/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,21 @@ func resourceReplicator() *schema.Resource {
Optional: true,
Default: true,
},
"starting_position": {
Type: schema.TypeList,
Optional: true,
Computed: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"type": {
Type: schema.TypeString,
Optional: true,
ValidateDiagFunc: enum.Validate[types.ReplicationStartingPositionType](),
},
},
},
},
"topics_to_exclude": {
Type: schema.TypeSet,
Optional: true,
Expand Down Expand Up @@ -555,6 +570,10 @@ func flattenTopicReplication(apiObject *types.TopicReplication) map[string]inter
tfMap["topics_to_exclude"] = flex.FlattenStringValueSet(v)
}

if v := apiObject.StartingPosition; v != nil {
tfMap["starting_position"] = []interface{}{flattenStartingPosition(v)}
}

if aws.ToBool(apiObject.CopyTopicConfigurations) {
tfMap["copy_topic_configurations"] = apiObject.CopyTopicConfigurations
}
Expand All @@ -570,6 +589,16 @@ func flattenTopicReplication(apiObject *types.TopicReplication) map[string]inter
return tfMap
}

func flattenStartingPosition(apiObject *types.ReplicationStartingPosition) map[string]interface{} {
tfMap := map[string]interface{}{}

if v := apiObject.Type; v != "" {
tfMap["type"] = v
}

return tfMap
}

func flattenKafkaClusterDescriptions(apiObjects []types.KafkaClusterDescription) []interface{} { // nosemgrep:ci.kafka-in-func-name
if len(apiObjects) == 0 {
return nil
Expand Down Expand Up @@ -757,6 +786,10 @@ func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplicatio
apiObject.TopicsToExclude = flex.ExpandStringValueSet(v)
}

if v, ok := tfMap["starting_position"].([]interface{}); ok {
apiObject.StartingPosition = expandStartingPosition(v[0].(map[string]interface{}))
}

if v, ok := tfMap["copy_topic_configurations"].(bool); ok {
apiObject.CopyTopicConfigurations = aws.Bool(v)
}
Expand All @@ -772,6 +805,16 @@ func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplicatio
return apiObject
}

func expandStartingPosition(tfMap map[string]interface{}) *types.ReplicationStartingPosition {
apiObject := &types.ReplicationStartingPosition{}

if v, ok := tfMap["type"].(string); ok {
apiObject.Type = types.ReplicationStartingPositionType(v)
}

return apiObject
}

func expandKafkaClusters(tfList []interface{}) []types.KafkaCluster { // nosemgrep:ci.kafka-in-func-name
if len(tfList) == 0 {
return nil
Expand Down
1 change: 1 addition & 0 deletions internal/service/kafka/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestAccKafkaReplicator_basic(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.security_groups_ids.#", "1"),
resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.target_compression_type", "NONE"),
resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_replicate.#", "1"),
resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.#", "1"),
resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", "1"),
),
},
Expand Down

0 comments on commit 2302056

Please sign in to comment.