Skip to content

Commit

Permalink
Merge pull request #4406 from microamp/dms-source-endpoint-mongodb
Browse files Browse the repository at this point in the history
 Add support for MongoDB as a DMS source endpoint
  • Loading branch information
bflad authored May 2, 2018
2 parents cbc11fc + e1e0c92 commit 2ebca8e
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 39 deletions.
198 changes: 161 additions & 37 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package aws

import (
"fmt"
"log"
"strings"
"time"
Expand Down Expand Up @@ -71,6 +72,7 @@ func resourceAwsDmsEndpoint() *schema.Resource {
"redshift",
"sybase",
"sqlserver",
"mongodb",
}, false),
},
"extra_connection_attributes": {
Expand Down Expand Up @@ -117,6 +119,52 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Type: schema.TypeString,
Optional: true,
},
// With default values as per https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.MongoDB.html
"mongodb_settings": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
if old == "1" && new == "0" {
return true
}
return false
},
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"auth_type": {
Type: schema.TypeString,
Optional: true,
Default: "PASSWORD",
},
"auth_mechanism": {
Type: schema.TypeString,
Optional: true,
Default: "DEFAULT",
},
"nesting_level": {
Type: schema.TypeString,
Optional: true,
Default: "NONE",
},
"extract_doc_id": {
Type: schema.TypeString,
Optional: true,
Default: "false",
},
"docs_to_investigate": {
Type: schema.TypeString,
Optional: true,
Default: "1000",
},
"auth_source": {
Type: schema.TypeString,
Optional: true,
Default: "admin",
},
},
},
},
},
}
}
Expand All @@ -131,12 +179,28 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
Tags: dmsTagsFromMap(d.Get("tags").(map[string]interface{})),
}

switch d.Get("engine_name").(string) {
// if dynamodb then add required params
if d.Get("engine_name").(string) == "dynamodb" {
case "dynamodb":
request.DynamoDbSettings = &dms.DynamoDbSettings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
}
} else {
case "mongodb":
request.MongoDbSettings = &dms.MongoDbSettings{
Username: aws.String(d.Get("username").(string)),
Password: aws.String(d.Get("password").(string)),
ServerName: aws.String(d.Get("server_name").(string)),
Port: aws.Int64(int64(d.Get("port").(int))),
DatabaseName: aws.String(d.Get("database_name").(string)),

AuthType: aws.String(d.Get("mongodb_settings.0.auth_type").(string)),
AuthMechanism: aws.String(d.Get("mongodb_settings.0.auth_mechanism").(string)),
NestingLevel: aws.String(d.Get("mongodb_settings.0.nesting_level").(string)),
ExtractDocId: aws.String(d.Get("mongodb_settings.0.extract_doc_id").(string)),
DocsToInvestigate: aws.String(d.Get("mongodb_settings.0.docs_to_investigate").(string)),
AuthSource: aws.String(d.Get("mongodb_settings.0.auth_source").(string)),
}
default:
request.Password = aws.String(d.Get("password").(string))
request.Port = aws.Int64(int64(d.Get("port").(int)))
request.ServerName = aws.String(d.Get("server_name").(string))
Expand Down Expand Up @@ -215,9 +279,7 @@ func resourceAwsDmsEndpointRead(d *schema.ResourceData, meta interface{}) error
if err != nil {
return err
}
d.Set("tags", dmsTagsToMap(tagsResp.TagList))

return nil
return d.Set("tags", dmsTagsToMap(tagsResp.TagList))
}

func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) error {
Expand All @@ -233,11 +295,6 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
hasChanges = true
}

if d.HasChange("database_name") {
request.DatabaseName = aws.String(d.Get("database_name").(string))
hasChanges = true
}

if d.HasChange("service_access_role") {
request.DynamoDbSettings = &dms.DynamoDbSettings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
Expand All @@ -260,38 +317,75 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
hasChanges = true
}

if d.HasChange("password") {
request.Password = aws.String(d.Get("password").(string))
hasChanges = true
}

if d.HasChange("port") {
request.Port = aws.Int64(int64(d.Get("port").(int)))
hasChanges = true
}

if d.HasChange("server_name") {
request.ServerName = aws.String(d.Get("server_name").(string))
hasChanges = true
}

if d.HasChange("ssl_mode") {
request.SslMode = aws.String(d.Get("ssl_mode").(string))
hasChanges = true
}

if d.HasChange("username") {
request.Username = aws.String(d.Get("username").(string))
hasChanges = true
}

if d.HasChange("tags") {
err := dmsSetTags(d.Get("endpoint_arn").(string), d, meta)
if err != nil {
return err
}
}

switch d.Get("engine_name").(string) {
case "mongodb":
if d.HasChange("username") ||
d.HasChange("password") ||
d.HasChange("server_name") ||
d.HasChange("port") ||
d.HasChange("database_name") ||
d.HasChange("mongodb_settings.0.auth_type") ||
d.HasChange("mongodb_settings.0.auth_mechanism") ||
d.HasChange("mongodb_settings.0.nesting_level") ||
d.HasChange("mongodb_settings.0.extract_doc_id") ||
d.HasChange("mongodb_settings.0.docs_to_investigate") ||
d.HasChange("mongodb_settings.0.auth_source") {
request.MongoDbSettings = &dms.MongoDbSettings{
Username: aws.String(d.Get("username").(string)),
Password: aws.String(d.Get("password").(string)),
ServerName: aws.String(d.Get("server_name").(string)),
Port: aws.Int64(int64(d.Get("port").(int))),
DatabaseName: aws.String(d.Get("database_name").(string)),

AuthType: aws.String(d.Get("mongodb_settings.0.auth_type").(string)),
AuthMechanism: aws.String(d.Get("mongodb_settings.0.auth_mechanism").(string)),
NestingLevel: aws.String(d.Get("mongodb_settings.0.nesting_level").(string)),
ExtractDocId: aws.String(d.Get("mongodb_settings.0.extract_doc_id").(string)),
DocsToInvestigate: aws.String(d.Get("mongodb_settings.0.docs_to_investigate").(string)),
AuthSource: aws.String(d.Get("mongodb_settings.0.auth_source").(string)),
}
request.EngineName = aws.String(d.Get("engine_name").(string)) // Must be included (should be 'mongodb')
hasChanges = true
}
default:
if d.HasChange("database_name") {
request.DatabaseName = aws.String(d.Get("database_name").(string))
hasChanges = true
}

if d.HasChange("password") {
request.Password = aws.String(d.Get("password").(string))
hasChanges = true
}

if d.HasChange("port") {
request.Port = aws.Int64(int64(d.Get("port").(int)))
hasChanges = true
}

if d.HasChange("server_name") {
request.ServerName = aws.String(d.Get("server_name").(string))
hasChanges = true
}

if d.HasChange("username") {
request.Username = aws.String(d.Get("username").(string))
hasChanges = true
}
}

if hasChanges {
log.Println("[DEBUG] DMS update endpoint:", request)

Expand All @@ -316,11 +410,7 @@ func resourceAwsDmsEndpointDelete(d *schema.ResourceData, meta interface{}) erro
log.Printf("[DEBUG] DMS delete endpoint: %#v", request)

_, err := conn.DeleteEndpoint(request)
if err != nil {
return err
}

return nil
return err
}

func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) error {
Expand All @@ -333,13 +423,30 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi
d.Set("endpoint_type", strings.ToLower(*endpoint.EndpointType))
d.Set("engine_name", endpoint.EngineName)

if *endpoint.EngineName == "dynamodb" {
switch *endpoint.EngineName {
case "dynamodb":
if endpoint.DynamoDbSettings != nil {
d.Set("service_access_role", endpoint.DynamoDbSettings.ServiceAccessRoleArn)
} else {
d.Set("service_access_role", "")
}
} else {
case "mongodb":
if endpoint.MongoDbSettings != nil {
d.Set("username", endpoint.MongoDbSettings.Username)
d.Set("server_name", endpoint.MongoDbSettings.ServerName)
d.Set("port", endpoint.MongoDbSettings.Port)
d.Set("database_name", endpoint.MongoDbSettings.DatabaseName)

if err := d.Set("mongodb_settings", flattenDmsMongoDbSettings(endpoint.MongoDbSettings)); err != nil {
return fmt.Errorf("Error setting mongodb_settings for DMS: %s", err)
}
} else {
d.Set("username", endpoint.Username)
d.Set("server_name", endpoint.ServerName)
d.Set("port", endpoint.Port)
d.Set("database_name", endpoint.DatabaseName)
}
default:
d.Set("database_name", endpoint.DatabaseName)
d.Set("extra_connection_attributes", endpoint.ExtraConnectionAttributes)
d.Set("port", endpoint.Port)
Expand All @@ -352,3 +459,20 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi

return nil
}

func flattenDmsMongoDbSettings(settings *dms.MongoDbSettings) []map[string]interface{} {
if settings == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"auth_type": aws.StringValue(settings.AuthType),
"auth_mechanism": aws.StringValue(settings.AuthMechanism),
"nesting_level": aws.StringValue(settings.NestingLevel),
"extract_doc_id": aws.StringValue(settings.ExtractDocId),
"docs_to_investigate": aws.StringValue(settings.DocsToInvestigate),
"auth_source": aws.StringValue(settings.AuthSource),
}

return []map[string]interface{}{m}
}
101 changes: 101 additions & 0 deletions aws/resource_aws_dms_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,49 @@ func TestAccAWSDmsEndpointDynamoDb(t *testing.T) {
})
}

func TestAccAWSDmsEndpointMongoDb(t *testing.T) {
resourceName := "aws_dms_endpoint.dms_endpoint"
randId := acctest.RandString(8) + "-mongodb"

resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: dmsEndpointDestroy,
Steps: []resource.TestStep{
{
Config: dmsEndpointMongoDbConfig(randId),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttrSet(resourceName, "endpoint_arn"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerifyIgnore: []string{"password"},
},
{
Config: dmsEndpointMongoDbConfigUpdate(randId),
Check: resource.ComposeTestCheckFunc(
checkDmsEndpointExists(resourceName),
resource.TestCheckResourceAttr(resourceName, "server_name", "tftest-new-server_name"),
resource.TestCheckResourceAttr(resourceName, "port", "27018"),
resource.TestCheckResourceAttr(resourceName, "username", "tftest-new-username"),
resource.TestCheckResourceAttr(resourceName, "password", "tftest-new-password"),
resource.TestCheckResourceAttr(resourceName, "database_name", "tftest-new-database_name"),
resource.TestCheckResourceAttr(resourceName, "ssl_mode", "require"),
resource.TestCheckResourceAttr(resourceName, "extra_connection_attributes", "key=value;"),
resource.TestCheckResourceAttr(resourceName, "mongodb_settings.#", "1"),
resource.TestCheckResourceAttr(resourceName, "mongodb_settings.0.auth_mechanism", "SCRAM_SHA_1"),
resource.TestCheckResourceAttr(resourceName, "mongodb_settings.0.nesting_level", "ONE"),
resource.TestCheckResourceAttr(resourceName, "mongodb_settings.0.extract_doc_id", "true"),
resource.TestCheckResourceAttr(resourceName, "mongodb_settings.0.docs_to_investigate", "1001"),
),
},
},
})
}

func dmsEndpointDestroy(s *terraform.State) error {
for _, rs := range s.RootModule().Resources {
if rs.Type != "aws_dms_endpoint" {
Expand Down Expand Up @@ -296,3 +339,61 @@ EOF
}
`, randId)
}

func dmsEndpointMongoDbConfig(randId string) string {
return fmt.Sprintf(`
resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "source"
engine_name = "mongodb"
server_name = "tftest"
port = 27017
username = "tftest"
password = "tftest"
database_name = "tftest"
ssl_mode = "none"
extra_connection_attributes = ""
tags {
Name = "tf-test-dms-endpoint-%[1]s"
Update = "to-update"
Remove = "to-remove"
}
mongodb_settings {
auth_type = "PASSWORD"
auth_mechanism = "DEFAULT"
nesting_level = "NONE"
extract_doc_id = "false"
docs_to_investigate = "1000"
auth_source = "admin"
}
}
`, randId)
}

func dmsEndpointMongoDbConfigUpdate(randId string) string {
return fmt.Sprintf(`
resource "aws_dms_endpoint" "dms_endpoint" {
endpoint_id = "tf-test-dms-endpoint-%[1]s"
endpoint_type = "source"
engine_name = "mongodb"
server_name = "tftest-new-server_name"
port = 27018
username = "tftest-new-username"
password = "tftest-new-password"
database_name = "tftest-new-database_name"
ssl_mode = "require"
extra_connection_attributes = "key=value;"
tags {
Name = "tf-test-dms-endpoint-%[1]s"
Update = "updated"
Add = "added"
}
mongodb_settings {
auth_mechanism = "SCRAM_SHA_1"
nesting_level = "ONE"
extract_doc_id = "true"
docs_to_investigate = "1001"
}
}
`, randId)
}
Loading

0 comments on commit 2ebca8e

Please sign in to comment.