Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dms/endpoint: Pause replication tasks for updates #34316

Merged
merged 10 commits into from
Nov 9, 2023
3 changes: 3 additions & 0 deletions .changelog/34316.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_endpoint: Add `pause_replication_tasks`, which when set to `true`, pauses associated running replication tasks, regardless if they are managed by Terraform, prior to modifying the endpoint (only tasks paused by the resource will be restarted after the modification completes)
```
117 changes: 116 additions & 1 deletion internal/service/dms/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func ResourceEndpoint() *schema.Resource {
Sensitive: true,
ConflictsWith: []string{"secrets_manager_access_role_arn", "secrets_manager_arn"},
},
"pause_replication_tasks": {
Type: schema.TypeBool,
Optional: true,
},
"port": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -1304,11 +1308,25 @@ func resourceEndpointUpdate(ctx context.Context, d *schema.ResourceData, meta in
}
}

_, err := conn.ModifyEndpointWithContext(ctx, input)
var tasks []*dms.ReplicationTask
if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) {
var err error
tasks, err = stopEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string))
if err != nil {
return sdkdiag.AppendErrorf(diags, "pausing replication tasks before updating DMS Endpoint (%s): %s", d.Id(), err)
}
}

_, err := conn.ModifyEndpointWithContext(ctx, input)
if err != nil {
return sdkdiag.AppendErrorf(diags, "updating DMS Endpoint (%s): %s", d.Id(), err)
}

if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) && len(tasks) > 0 {
if err := startEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string), tasks); err != nil {
return sdkdiag.AppendErrorf(diags, "starting replication tasks after updating DMS Endpoint (%s): %s", d.Id(), err)
}
}
}

return append(diags, resourceEndpointRead(ctx, d, meta)...)
Expand Down Expand Up @@ -1581,6 +1599,103 @@ func resourceEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) er
return nil
}

func steadyEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) error {
tasks, err := FindReplicationTasksByEndpointARN(ctx, conn, arn)
if err != nil {
return err
}

for _, task := range tasks {
rtID := aws.StringValue(task.ReplicationTaskIdentifier)
switch aws.StringValue(task.Status) {
case replicationTaskStatusRunning, replicationTaskStatusFailed, replicationTaskStatusReady, replicationTaskStatusStopped:
continue
case replicationTaskStatusCreating, replicationTaskStatusDeleting, replicationTaskStatusModifying, replicationTaskStatusStopping, replicationTaskStatusStarting:
if err := waitReplicationTaskSteady(ctx, conn, rtID); err != nil {
return err
}
}
}

return nil
}

func stopEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) ([]*dms.ReplicationTask, error) {
if err := steadyEndpointReplicationTasks(ctx, conn, arn); err != nil {
return nil, err
}

tasks, err := FindReplicationTasksByEndpointARN(ctx, conn, arn)
if err != nil {
return nil, err
}

var stoppedTasks []*dms.ReplicationTask
for _, task := range tasks {
rtID := aws.StringValue(task.ReplicationTaskIdentifier)
switch aws.StringValue(task.Status) {
case replicationTaskStatusRunning:
err := stopReplicationTask(ctx, rtID, conn)
if tfawserr.ErrCodeEquals(err, dms.ErrCodeInvalidResourceStateFault) {
continue
}

if err != nil {
return stoppedTasks, err
}
stoppedTasks = append(stoppedTasks, task)
default:
continue
}
}

return stoppedTasks, nil
}

func startEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string, tasks []*dms.ReplicationTask) error {
if len(tasks) == 0 {
return nil
}

if err := steadyEndpointReplicationTasks(ctx, conn, arn); err != nil {
return err
}

for _, task := range tasks {
_, err := conn.TestConnectionWithContext(ctx, &dms.TestConnectionInput{
EndpointArn: aws.String(arn),
ReplicationInstanceArn: task.ReplicationInstanceArn,
})

if tfawserr.ErrMessageContains(err, dms.ErrCodeInvalidResourceStateFault, "already being tested") {
continue
}

if err != nil {
return fmt.Errorf("testing connection: %w", err)
}

err = conn.WaitUntilTestConnectionSucceedsWithContext(ctx, &dms.DescribeConnectionsInput{
Filters: []*dms.Filter{
{
Name: aws.String("endpoint-arn"),
Values: []*string{aws.String(arn)},
},
},
})

if err != nil {
return fmt.Errorf("waiting until test connection succeeds: %w", err)
}

if err := startReplicationTask(ctx, conn, aws.StringValue(task.ReplicationTaskIdentifier)); err != nil {
return fmt.Errorf("starting replication task: %w", err)
}
}

return nil
}

func flattenOpenSearchSettings(settings *dms.ElasticsearchSettings) []map[string]interface{} {
if settings == nil {
return []map[string]interface{}{}
Expand Down
239 changes: 239 additions & 0 deletions internal/service/dms/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,43 @@ func TestAccDMSEndpoint_Redshift_SSEKMSKeyId(t *testing.T) {
})
}

func TestAccDMSEndpoint_pauseReplicationTasks(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
endpointNameSource := "aws_dms_endpoint.source"
endpointNameTarget := "aws_dms_endpoint.target"
replicationTaskName := "aws_dms_replication_task.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckReplicationTaskDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccEndpointConfig_pauseReplicationTasks(rName, "3306"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEndpointExists(ctx, endpointNameSource),
testAccCheckEndpointExists(ctx, endpointNameTarget),
testAccCheckReplicationTaskExists(ctx, replicationTaskName),
resource.TestCheckResourceAttr(endpointNameTarget, "port", "3306"),
resource.TestCheckResourceAttr(replicationTaskName, "status", "running"),
),
},
{
Config: testAccEndpointConfig_pauseReplicationTasks(rName, "3307"),
Check: resource.ComposeTestCheckFunc(
testAccCheckEndpointExists(ctx, endpointNameSource),
testAccCheckEndpointExists(ctx, endpointNameTarget),
testAccCheckReplicationTaskExists(ctx, replicationTaskName),
resource.TestCheckResourceAttr(endpointNameTarget, "port", "3307"),
resource.TestCheckResourceAttr(replicationTaskName, "status", "running"),
),
},
},
})
}

// testAccCheckResourceAttrRegionalHostname ensures the Terraform state exactly matches a formatted DNS hostname with region and partition DNS suffix
func testAccCheckResourceAttrRegionalHostname(resourceName, attributeName, serviceName, hostnamePrefix string) resource.TestCheckFunc {
return func(s *terraform.State) error {
Expand Down Expand Up @@ -4471,3 +4508,205 @@ resource "aws_kms_key" "test" {
}
`, rName))
}

func testAccEndpointConfig_pauseReplicationTasks(rName, port string) string {
return acctest.ConfigCompose(
acctest.ConfigAvailableAZsNoOptIn(),
fmt.Sprintf(`
data "aws_partition" "current" {}

data "aws_region" "current" {}

resource "aws_vpc" "test" {
cidr_block = "10.1.0.0/16"

tags = {
Name = %[1]q
}
}

resource "aws_subnet" "test1" {
cidr_block = "10.1.1.0/24"
availability_zone = data.aws_availability_zones.available.names[0]
vpc_id = aws_vpc.test.id

tags = {
Name = %[1]q
}
}

resource "aws_subnet" "test2" {
cidr_block = "10.1.2.0/24"
availability_zone = data.aws_availability_zones.available.names[1]
vpc_id = aws_vpc.test.id

tags = {
Name = "%[1]s-2"
}
}

resource "aws_security_group" "test" {
vpc_id = aws_vpc.test.id

ingress {
protocol = -1
self = true
from_port = 0
to_port = 0
}

egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}

resource "aws_db_subnet_group" "test" {
name = %[1]q
subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id]

tags = {
Name = %[1]q
}
}

data "aws_rds_engine_version" "default" {
engine = "aurora-mysql"
}

data "aws_rds_orderable_db_instance" "test" {
engine = data.aws_rds_engine_version.default.engine
engine_version = data.aws_rds_engine_version.default.version
preferred_instance_classes = ["db.t3.small", "db.t3.medium", "db.t3.large"]
}

resource "aws_rds_cluster_parameter_group" "test" {
name = "%[1]s-pg-cluster"
family = data.aws_rds_engine_version.default.parameter_group_family
description = "DMS cluster parameter group"

parameter {
name = "binlog_format"
value = "ROW"
apply_method = "pending-reboot"
}

parameter {
name = "binlog_row_image"
value = "Full"
apply_method = "pending-reboot"
}

parameter {
name = "binlog_checksum"
value = "NONE"
apply_method = "pending-reboot"
}
}

resource "aws_rds_cluster" "source" {
cluster_identifier = "%[1]s-aurora-cluster-source"
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.test.name
}

resource "aws_rds_cluster_instance" "source" {
identifier = "%[1]s-source-primary"
cluster_identifier = aws_rds_cluster.source.id
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
instance_class = data.aws_rds_orderable_db_instance.test.instance_class
db_subnet_group_name = aws_db_subnet_group.test.name
}

resource "aws_rds_cluster" "target" {
cluster_identifier = "%[1]s-aurora-cluster-target"
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
}

resource "aws_rds_cluster_instance" "target" {
identifier = "%[1]s-target-primary"
cluster_identifier = aws_rds_cluster.target.id
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
instance_class = data.aws_rds_orderable_db_instance.test.instance_class
db_subnet_group_name = aws_db_subnet_group.test.name
}

resource "aws_dms_endpoint" "source" {
database_name = "tftest"
endpoint_id = "%[1]s-source"
endpoint_type = "source"
engine_name = "aurora"
password = "mustbeeightcharaters"
pause_replication_tasks = true
port = %[2]s
server_name = aws_rds_cluster.source.endpoint
username = "tftest"
}

resource "aws_dms_endpoint" "target" {
database_name = "tftest"
endpoint_id = "%[1]s-target"
endpoint_type = "target"
engine_name = "aurora"
password = "mustbeeightcharaters"
pause_replication_tasks = true
port = %[2]s
server_name = aws_rds_cluster.target.endpoint
username = "tftest"
}

resource "aws_dms_replication_subnet_group" "test" {
replication_subnet_group_id = %[1]q
replication_subnet_group_description = "terraform test for replication subnet group"
subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id]
}

resource "aws_dms_replication_instance" "test" {
allocated_storage = 5
auto_minor_version_upgrade = true
replication_instance_class = "dms.c4.large"
replication_instance_id = %[1]q
preferred_maintenance_window = "sun:00:30-sun:02:30"
publicly_accessible = false
replication_subnet_group_id = aws_dms_replication_subnet_group.test.replication_subnet_group_id
vpc_security_group_ids = [aws_security_group.test.id]
}

resource "aws_dms_replication_task" "test" {
migration_type = "full-load-and-cdc"
replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn
replication_task_id = %[1]q
replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"EventErrorPolicy\":\"IGNORE\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}"
source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"testrule\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"

start_replication_task = true

tags = {
Name = %[1]q
}

target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn

depends_on = [aws_rds_cluster_instance.source, aws_rds_cluster_instance.target]
}
`, rName, port))
}
Loading
Loading