diff --git a/cdc/api/v1/validator_test.go b/cdc/api/v1/validator_test.go index 7f6a7f654c2..42ea3913eef 100644 --- a/cdc/api/v1/validator_test.go +++ b/cdc/api/v1/validator_test.go @@ -39,7 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { // test no change error changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"} oldInfo.SinkURI = "blackhole://" - oldInfo.Config.Sink.TxnAtomicity = "table" + oldInfo.Config.Sink.TxnAtomicity = "none" newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo) require.NotNil(t, err) require.Regexp(t, ".*changefeed config is the same with the old one.*", err) diff --git a/cdc/api/v2/api_helpers_test.go b/cdc/api/v2/api_helpers_test.go index 34e9b34bfb7..70f2ba2bdb3 100644 --- a/cdc/api/v2/api_helpers_test.go +++ b/cdc/api/v2/api_helpers_test.go @@ -126,7 +126,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) { newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0) require.Nil(t, err) // startTs can not be updated - require.Equal(t, "table", string(newCfInfo.Config.Sink.TxnAtomicity)) + require.Equal(t, "none", string(newCfInfo.Config.Sink.TxnAtomicity)) newCfInfo.Config.Sink.TxnAtomicity = "" require.Equal(t, uint64(0), newCfInfo.StartTs) require.Equal(t, uint64(10), newCfInfo.TargetTs) diff --git a/cdc/sink/mysql/mysql_params.go b/cdc/sink/mysql/mysql_params.go index a92de3af802..388fc2b31fe 100644 --- a/cdc/sink/mysql/mysql_params.go +++ b/cdc/sink/mysql/mysql_params.go @@ -49,9 +49,10 @@ const ( defaultReadTimeout = "2m" defaultWriteTimeout = "2m" defaultDialTimeout = "2m" - defaultSafeMode = true - defaultTxnIsolationRC = "READ-COMMITTED" - defaultCharacterSet = "utf8mb4" + // Note(dongmen): defaultSafeMode is set to false since v6.4.0. + defaultSafeMode = false + defaultTxnIsolationRC = "READ-COMMITTED" + defaultCharacterSet = "utf8mb4" ) var ( diff --git a/cdc/sink/mysql/mysql_params_test.go b/cdc/sink/mysql/mysql_params_test.go index 7bccf7aca35..13de90e106e 100644 --- a/cdc/sink/mysql/mysql_params_test.go +++ b/cdc/sink/mysql/mysql_params_test.go @@ -203,12 +203,12 @@ func TestParseSinkURIToParams(t *testing.T) { expected.maxTxnRow = 20 expected.batchReplaceEnabled = true expected.batchReplaceSize = 50 - expected.safeMode = true + expected.safeMode = false expected.timezone = `"UTC"` expected.changefeedID = model.DefaultChangeFeedID("cf-id") expected.tidbTxnMode = "pessimistic" uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" + - "&batch-replace-enable=true&batch-replace-size=50&safe-mode=true" + + "&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" + "&tidb-txn-mode=pessimistic" uri, err := url.Parse(uriStr) require.Nil(t, err) diff --git a/cdc/sink/mysql/mysql_test.go b/cdc/sink/mysql/mysql_test.go index f26450f422c..26352f4d468 100644 --- a/cdc/sink/mysql/mysql_test.go +++ b/cdc/sink/mysql/mysql_test.go @@ -1102,12 +1102,12 @@ func TestNewMySQLSinkExecDML(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). + mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) mock.ExpectCommit() mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)"). + mock.ExpectExec("INSERT INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) mock.ExpectCommit() diff --git a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go index 1200dd2e160..4bafefb660a 100644 --- a/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go +++ b/cdc/sinkv2/eventsink/txn/mysql/mysql_test.go @@ -260,7 +260,7 @@ func TestNewMySQLBackendExecDML(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). + mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)"). WithArgs(1, "test", 2, "test"). WillReturnResult(sqlmock.NewResult(2, 2)) mock.ExpectCommit() @@ -772,7 +772,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.Nil(t, err) mock.ExpectBegin() - mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)").WillDelayFor(1 * time.Second). + mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)").WillDelayFor(1 * time.Second). WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable}) mock.ExpectClose() return db, nil diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go index 92680216e26..67f4389228a 100644 --- a/cmd/kafka-consumer/main.go +++ b/cmd/kafka-consumer/main.go @@ -330,7 +330,7 @@ func main() { } }() - <-consumer.ready // Await till the consumer has been set up + <-consumer.ready // wait till the consumer has been set up log.Info("TiCDC consumer up and running!...") sigterm := make(chan os.Signal, 1) diff --git a/pkg/applier/redo_test.go b/pkg/applier/redo_test.go index ecae2dbb6d0..8f8550b3469 100644 --- a/pkg/applier/redo_test.go +++ b/pkg/applier/redo_test.go @@ -234,7 +234,7 @@ func TestApplyDMLs(t *testing.T) { close(ddlEventCh) cfg := &RedoApplierConfig{ - SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore", + SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore&safe-mode=true", } ap := NewRedoApplier(cfg) err := ap.Apply(ctx) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 5e09c0e9bcd..c4d24097c90 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -42,8 +42,11 @@ const ( // tableTxnAtomicity means atomicity of single table transactions is guaranteed. tableTxnAtomicity AtomicityLevel = "table" - defaultMqTxnAtomicity = noneTxnAtomicity - defaultMysqlTxnAtomicity = tableTxnAtomicity + defaultMqTxnAtomicity = noneTxnAtomicity + // Note(dongmen): We change this default value to `noneTxnAtomicity` in v6.4.0. + // TODO(dongmen): If everything goes well, we can remove this default value in v6.5.0, + // and keep a defaultTxnAtomicity is enough. + defaultMysqlTxnAtomicity = noneTxnAtomicity ) const ( diff --git a/pkg/config/sink_test.go b/pkg/config/sink_test.go index ce3e9ecec7b..fcfc2e21982 100644 --- a/pkg/config/sink_test.go +++ b/pkg/config/sink_test.go @@ -91,7 +91,7 @@ func TestValidateApplyParameter(t *testing.T) { { sinkURI: "mysql://normal:123456@127.0.0.1:3306", expectedErr: "", - expectedLevel: tableTxnAtomicity, + expectedLevel: noneTxnAtomicity, }, { sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table", diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 5a6ef02c6de..2b3e1e2e37e 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -52,9 +52,10 @@ const ( defaultReadTimeout = "2m" defaultWriteTimeout = "2m" defaultDialTimeout = "2m" - defaultSafeMode = true - defaultTxnIsolationRC = "READ-COMMITTED" - defaultCharacterSet = "utf8mb4" + // Note(dongmen): defaultSafeMode is set to false since v6.4.0. + defaultSafeMode = false + defaultTxnIsolationRC = "READ-COMMITTED" + defaultCharacterSet = "utf8mb4" // BackoffBaseDelay indicates the base delay time for retrying. BackoffBaseDelay = 500 * time.Millisecond diff --git a/pkg/sink/mysql/config_test.go b/pkg/sink/mysql/config_test.go index f5b1feb0450..68264363213 100644 --- a/pkg/sink/mysql/config_test.go +++ b/pkg/sink/mysql/config_test.go @@ -173,12 +173,12 @@ func TestApplySinkURIParamsToConfig(t *testing.T) { expected.MaxTxnRow = 20 expected.BatchReplaceEnabled = true expected.BatchReplaceSize = 50 - expected.SafeMode = true + expected.SafeMode = false expected.Timezone = `"UTC"` expected.tidbTxnMode = "pessimistic" expected.EnableOldValue = true uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" + - "&batch-replace-enable=true&batch-replace-size=50&safe-mode=true" + + "&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" + "&tidb-txn-mode=pessimistic" uri, err := url.Parse(uriStr) require.Nil(t, err) diff --git a/tests/integration_tests/_utils/run_kafka_consumer b/tests/integration_tests/_utils/run_kafka_consumer index fc97cf77e20..29ec63d5e4b 100755 --- a/tests/integration_tests/_utils/run_kafka_consumer +++ b/tests/integration_tests/_utils/run_kafka_consumer @@ -18,9 +18,9 @@ cd $workdir # some consumer may require `consumer_replica_config`, set it separately if [ "$consumer_replica_config" != "" ]; then echo "consumer replica config found: $consumer_replica_config" - cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/ --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 & + cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 & else - cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/ >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 & + cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 & fi cd $pwd