Skip to content

Commit

Permalink
sink, config (ticdc): enable transaction split and disable safeMode b…
Browse files Browse the repository at this point in the history
…y default (#7504)

close #7505
  • Loading branch information
asddongmen authored Nov 3, 2022
1 parent 02a7f72 commit f16eb3c
Show file tree
Hide file tree
Showing 13 changed files with 28 additions and 23 deletions.
2 changes: 1 addition & 1 deletion cdc/api/v1/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
// test no change error
changefeedConfig = model.ChangefeedConfig{SinkURI: "blackhole://"}
oldInfo.SinkURI = "blackhole://"
oldInfo.Config.Sink.TxnAtomicity = "table"
oldInfo.Config.Sink.TxnAtomicity = "none"
newInfo, err = VerifyUpdateChangefeedConfig(ctx, changefeedConfig, oldInfo)
require.NotNil(t, err)
require.Regexp(t, ".*changefeed config is the same with the old one.*", err)
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Nil(t, err)
// startTs can not be updated
require.Equal(t, "table", string(newCfInfo.Config.Sink.TxnAtomicity))
require.Equal(t, "none", string(newCfInfo.Config.Sink.TxnAtomicity))
newCfInfo.Config.Sink.TxnAtomicity = ""
require.Equal(t, uint64(0), newCfInfo.StartTs)
require.Equal(t, uint64(10), newCfInfo.TargetTs)
Expand Down
7 changes: 4 additions & 3 deletions cdc/sink/mysql/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ const (
defaultReadTimeout = "2m"
defaultWriteTimeout = "2m"
defaultDialTimeout = "2m"
defaultSafeMode = true
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"
// Note(dongmen): defaultSafeMode is set to false since v6.4.0.
defaultSafeMode = false
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mysql/mysql_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ func TestParseSinkURIToParams(t *testing.T) {
expected.maxTxnRow = 20
expected.batchReplaceEnabled = true
expected.batchReplaceSize = 50
expected.safeMode = true
expected.safeMode = false
expected.timezone = `"UTC"`
expected.changefeedID = model.DefaultChangeFeedID("cf-id")
expected.tidbTxnMode = "pessimistic"
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=true" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" +
"&tidb-txn-mode=pessimistic"
uri, err := url.Parse(uriStr)
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,12 +1102,12 @@ func TestNewMySQLSinkExecDML(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
WithArgs(1, "test", 2, "test").
WillReturnResult(sqlmock.NewResult(2, 2))
mock.ExpectCommit()
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)").
mock.ExpectExec("INSERT INTO `s1`.`t2`(`a`,`b`) VALUES (?,?),(?,?)").
WithArgs(1, "test", 2, "test").
WillReturnResult(sqlmock.NewResult(2, 2))
mock.ExpectCommit()
Expand Down
4 changes: 2 additions & 2 deletions cdc/sinkv2/eventsink/txn/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestNewMySQLBackendExecDML(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?),(?,?)").
WithArgs(1, "test", 2, "test").
WillReturnResult(sqlmock.NewResult(2, 2))
mock.ExpectCommit()
Expand Down Expand Up @@ -772,7 +772,7 @@ func TestMySQLSinkExecDMLError(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.Nil(t, err)
mock.ExpectBegin()
mock.ExpectExec("REPLACE INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)").WillDelayFor(1 * time.Second).
mock.ExpectExec("INSERT INTO `s1`.`t1`(`a`,`b`) VALUES (?,?)").WillDelayFor(1 * time.Second).
WillReturnError(&dmysql.MySQLError{Number: mysql.ErrNoSuchTable})
mock.ExpectClose()
return db, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func main() {
}
}()

<-consumer.ready // Await till the consumer has been set up
<-consumer.ready // wait till the consumer has been set up
log.Info("TiCDC consumer up and running!...")

sigterm := make(chan os.Signal, 1)
Expand Down
2 changes: 1 addition & 1 deletion pkg/applier/redo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestApplyDMLs(t *testing.T) {
close(ddlEventCh)

cfg := &RedoApplierConfig{
SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore",
SinkURI: "mysql://127.0.0.1:4000/?worker-count=1&max-txn-row=1&tidb_placement_mode=ignore&safe-mode=true",
}
ap := NewRedoApplier(cfg)
err := ap.Apply(ctx)
Expand Down
7 changes: 5 additions & 2 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ const (
// tableTxnAtomicity means atomicity of single table transactions is guaranteed.
tableTxnAtomicity AtomicityLevel = "table"

defaultMqTxnAtomicity = noneTxnAtomicity
defaultMysqlTxnAtomicity = tableTxnAtomicity
defaultMqTxnAtomicity = noneTxnAtomicity
// Note(dongmen): We change this default value to `noneTxnAtomicity` in v6.4.0.
// TODO(dongmen): If everything goes well, we can remove this default value in v6.5.0,
// and keep a defaultTxnAtomicity is enough.
defaultMysqlTxnAtomicity = noneTxnAtomicity
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestValidateApplyParameter(t *testing.T) {
{
sinkURI: "mysql://normal:123456@127.0.0.1:3306",
expectedErr: "",
expectedLevel: tableTxnAtomicity,
expectedLevel: noneTxnAtomicity,
},
{
sinkURI: "mysql://normal:123456@127.0.0.1:3306?transaction-atomicity=table",
Expand Down
7 changes: 4 additions & 3 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ const (
defaultReadTimeout = "2m"
defaultWriteTimeout = "2m"
defaultDialTimeout = "2m"
defaultSafeMode = true
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"
// Note(dongmen): defaultSafeMode is set to false since v6.4.0.
defaultSafeMode = false
defaultTxnIsolationRC = "READ-COMMITTED"
defaultCharacterSet = "utf8mb4"

// BackoffBaseDelay indicates the base delay time for retrying.
BackoffBaseDelay = 500 * time.Millisecond
Expand Down
4 changes: 2 additions & 2 deletions pkg/sink/mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func TestApplySinkURIParamsToConfig(t *testing.T) {
expected.MaxTxnRow = 20
expected.BatchReplaceEnabled = true
expected.BatchReplaceSize = 50
expected.SafeMode = true
expected.SafeMode = false
expected.Timezone = `"UTC"`
expected.tidbTxnMode = "pessimistic"
expected.EnableOldValue = true
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=true" +
"&batch-replace-enable=true&batch-replace-size=50&safe-mode=false" +
"&tidb-txn-mode=pessimistic"
uri, err := url.Parse(uriStr)
require.Nil(t, err)
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/_utils/run_kafka_consumer
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ cd $workdir
# some consumer may require `consumer_replica_config`, set it separately
if [ "$consumer_replica_config" != "" ]; then
echo "consumer replica config found: $consumer_replica_config"
cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/ --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true --config $consumer_replica_config >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
else
cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/ >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
cdc_kafka_consumer --log-file $workdir/cdc_kafka_consumer$log_suffix.log --log-level info --upstream-uri $sink_uri --downstream-uri mysql://root@127.0.0.1:3306/?safe-mode=true >>$workdir/cdc_kafka_consumer_stdout$log_suffix.log 2>&1 &
fi

cd $pwd

0 comments on commit f16eb3c

Please sign in to comment.