Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for additional kinesis_settings to dms_endpoint #20084

Merged
merged 11 commits into from
Oct 18, 2021
3 changes: 3 additions & 0 deletions .changelog/20084.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_endpoint: Add `include_transaction_details`, `include_partition_value`, `partition_include_schema_table`, `include_table_alter_operations`, `include_control_details` and `include_null_and_empty` arguments to `kinesis_settings` configuration block
```
147 changes: 122 additions & 25 deletions internal/service/dms/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,43 @@ func ResourceEndpoint() *schema.Resource {
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"include_control_details": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_null_and_empty": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_partition_value": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_table_alter_operations": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"include_transaction_details": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"message_format": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Default: dms.MessageFormatValueJson,
ValidateFunc: validation.StringInSlice(dms.MessageFormatValue_Values(), false),
},
"partition_include_schema_table": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},
"service_access_role_arn": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -431,11 +462,7 @@ func resourceEndpointCreate(d *schema.ResourceData, meta interface{}) error {
case engineNameKafka:
request.KafkaSettings = expandDmsKafkaSettings(d.Get("kafka_settings").([]interface{})[0].(map[string]interface{}))
case engineNameKinesis:
request.KinesisSettings = &dms.KinesisSettings{
MessageFormat: aws.String(d.Get("kinesis_settings.0.message_format").(string)),
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
}
request.KinesisSettings = expandDmsKinesisSettings(d.Get("kinesis_settings").([]interface{})[0].(map[string]interface{}))
case engineNameMongodb:
request.MongoDbSettings = &dms.MongoDbSettings{
Username: aws.String(d.Get("username").(string)),
Expand Down Expand Up @@ -656,16 +683,8 @@ func resourceEndpointUpdate(d *schema.ResourceData, meta interface{}) error {
hasChanges = true
}
case engineNameKinesis:
if d.HasChanges(
"kinesis_settings.0.service_access_role_arn",
"kinesis_settings.0.stream_arn") {
// Intentionally omitting MessageFormat, because it's rejected on ModifyEndpoint calls.
// "An error occurred (InvalidParameterValueException) when calling the ModifyEndpoint
// operation: Message format cannot be modified for kinesis endpoints."
request.KinesisSettings = &dms.KinesisSettings{
ServiceAccessRoleArn: aws.String(d.Get("kinesis_settings.0.service_access_role_arn").(string)),
StreamArn: aws.String(d.Get("kinesis_settings.0.stream_arn").(string)),
}
if d.HasChanges("kinesis_settings") {
request.KinesisSettings = expandDmsKinesisSettings(d.Get("kinesis_settings").([]interface{})[0].(map[string]interface{}))
request.EngineName = aws.String(engineName)
hasChanges = true
}
Expand Down Expand Up @@ -852,8 +871,8 @@ func resourceEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) er
d.Set("kafka_settings", nil)
}
case "kinesis":
if err := d.Set("kinesis_settings", flattenDmsKinesisSettings(endpoint.KinesisSettings)); err != nil {
return fmt.Errorf("Error setting kinesis_settings for DMS: %s", err)
if err := d.Set("kinesis_settings", []interface{}{flattenDmsKinesisSettings(endpoint.KinesisSettings)}); err != nil {
return fmt.Errorf("error setting kinesis_settings: %w", err)
}
case "mongodb":
if endpoint.MongoDbSettings != nil {
Expand Down Expand Up @@ -1067,18 +1086,96 @@ func flattenDmsKafkaSettings(apiObject *dms.KafkaSettings) map[string]interface{
return tfMap
}

func flattenDmsKinesisSettings(settings *dms.KinesisSettings) []map[string]interface{} {
if settings == nil {
return []map[string]interface{}{}
func expandDmsKinesisSettings(tfMap map[string]interface{}) *dms.KinesisSettings {
if tfMap == nil {
return nil
}

m := map[string]interface{}{
"message_format": aws.StringValue(settings.MessageFormat),
"service_access_role_arn": aws.StringValue(settings.ServiceAccessRoleArn),
"stream_arn": aws.StringValue(settings.StreamArn),
apiObject := &dms.KinesisSettings{}

if v, ok := tfMap["include_control_details"].(bool); ok {
apiObject.IncludeControlDetails = aws.Bool(v)
}

return []map[string]interface{}{m}
if v, ok := tfMap["include_null_and_empty"].(bool); ok {
apiObject.IncludeNullAndEmpty = aws.Bool(v)
}

if v, ok := tfMap["include_partition_value"].(bool); ok {
apiObject.IncludePartitionValue = aws.Bool(v)
}

if v, ok := tfMap["include_table_alter_operations"].(bool); ok {
apiObject.IncludeTableAlterOperations = aws.Bool(v)
}

if v, ok := tfMap["include_transaction_details"].(bool); ok {
apiObject.IncludeTransactionDetails = aws.Bool(v)
}

if v, ok := tfMap["message_format"].(string); ok && v != "" {
apiObject.MessageFormat = aws.String(v)
}

if v, ok := tfMap["partition_include_schema_table"].(bool); ok {
apiObject.PartitionIncludeSchemaTable = aws.Bool(v)
}

if v, ok := tfMap["service_access_role_arn"].(string); ok && v != "" {
apiObject.ServiceAccessRoleArn = aws.String(v)
}

if v, ok := tfMap["stream_arn"].(string); ok && v != "" {
apiObject.StreamArn = aws.String(v)
}

return apiObject
}

func flattenDmsKinesisSettings(apiObject *dms.KinesisSettings) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.IncludeControlDetails; v != nil {
tfMap["include_control_details"] = aws.BoolValue(v)
}

if v := apiObject.IncludeNullAndEmpty; v != nil {
tfMap["include_null_and_empty"] = aws.BoolValue(v)
}

if v := apiObject.IncludePartitionValue; v != nil {
tfMap["include_partition_value"] = aws.BoolValue(v)
}

if v := apiObject.IncludeTableAlterOperations; v != nil {
tfMap["include_table_alter_operations"] = aws.BoolValue(v)
}

if v := apiObject.IncludeTransactionDetails; v != nil {
tfMap["include_transaction_details"] = aws.BoolValue(v)
}

if v := apiObject.MessageFormat; v != nil {
tfMap["message_format"] = aws.StringValue(v)
}

if v := apiObject.PartitionIncludeSchemaTable; v != nil {
tfMap["partition_include_schema_table"] = aws.BoolValue(v)
}

if v := apiObject.ServiceAccessRoleArn; v != nil {
tfMap["service_access_role_arn"] = aws.StringValue(v)
}

if v := apiObject.StreamArn; v != nil {
tfMap["stream_arn"] = aws.StringValue(v)
}

return tfMap
}

func flattenDmsMongoDbSettings(settings *dms.MongoDbSettings) []map[string]interface{} {
Expand Down
Loading