Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

performance: use OPTIMISTIC transaction as the default (#1079) #1107

Merged
merged 19 commits into from
Sep 27, 2020
Merged
35 changes: 31 additions & 4 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"

"github.com/dustin/go-humanize"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/coreos/go-semver/semver"
"github.com/dustin/go-humanize"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
)
Expand All @@ -41,8 +42,10 @@ const (

// shard DDL mode.
const (
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
ShardPessimistic = "pessimistic"
ShardOptimistic = "optimistic"
tidbTxnMode = "tidb_txn_mode"
tidbTxnOptimistic = "optimistic"
)

// default config item values
Expand All @@ -66,6 +69,15 @@ var (
defaultBatch = 100
defaultQueueSize = 1024 // do not give too large default value to avoid OOM
defaultCheckpointFlushInterval = 30 // in seconds

// TargetDBConfig
defaultSessionCfg = []struct {
key string
val string
minVersion *semver.Version
}{
{tidbTxnMode, tidbTxnOptimistic, semver.New("3.0.0")},
}
)

// Meta represents binlog's meta pos
Expand Down Expand Up @@ -707,3 +719,18 @@ func checkDuplicateString(ruleNames []string) []string {
}
return dupeArray
}

// AdjustTargetDBSessionCfg adjust session cfg of TiDB
func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version *semver.Version) {
lowerMap := make(map[string]string, len(dbConfig.Session))
for k, v := range dbConfig.Session {
lowerMap[strings.ToLower(k)] = v
}
// all cfg in defaultSessionCfg should be lower case
for _, cfg := range defaultSessionCfg {
if _, ok := lowerMap[cfg.key]; !ok && !version.LessThan(*cfg.minVersion) {
lowerMap[cfg.key] = cfg.val
}
}
dbConfig.Session = lowerMap
}
50 changes: 44 additions & 6 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"path"
"sort"

"github.com/pingcap/dm/pkg/terror"

. "github.com/pingcap/check"
"github.com/pingcap/dm/pkg/terror"
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/coreos/go-semver/semver"
)

func (t *testConfig) TestInvalidTaskConfig(c *C) {
Expand Down Expand Up @@ -324,7 +325,10 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
heartbeatRI = 21
timezone = "Asia/Shanghai"
maxAllowedPacket = 10244201
session = map[string]string{
fromSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
toSession = map[string]string{
"sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES",
}
security = Security{
Expand Down Expand Up @@ -380,7 +384,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_1",
Password: "123",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand All @@ -390,7 +394,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_from_2",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: fromSession,
Security: &security,
RawDBCfg: &rawDBCfg,
}
Expand Down Expand Up @@ -421,7 +425,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
User: "user_to",
Password: "abc",
MaxAllowedPacket: &maxAllowedPacket,
Session: session,
Session: toSession,
Security: &security,
RawDBCfg: &rawDBCfg,
},
Expand Down Expand Up @@ -645,3 +649,37 @@ func (t *testConfig) TestMySQLInstance(c *C) {
c.Assert(m.VerifyAndAdjust(), IsNil)

}

func (t *testConfig) TestAdjustTargetDBConfig(c *C) {
testCases := []struct {
dbConfig DBConfig
result DBConfig
version *semver.Version
}{
{
DBConfig{},
DBConfig{Session: map[string]string{}},
semver.New("0.0.0"),
},
{
DBConfig{Session: map[string]string{"SQL_MODE": "ANSI_QUOTES"}},
DBConfig{Session: map[string]string{"sql_mode": "ANSI_QUOTES"}},
semver.New("2.0.7"),
},
{
DBConfig{},
DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic}},
semver.New("3.0.1"),
},
{
DBConfig{Session: map[string]string{"SQL_MODE": "", tidbTxnMode: "pessimistic"}},
DBConfig{Session: map[string]string{"sql_mode": "", tidbTxnMode: "pessimistic"}},
semver.New("4.0.0-beta.2"),
},
}

for _, tc := range testCases {
AdjustTargetDBSessionCfg(&tc.dbConfig, tc.version)
c.Assert(tc.dbConfig, DeepEquals, tc.result)
}
}
35 changes: 35 additions & 0 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,17 +1027,47 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro
return cfgs, err
}
if err = cfg.Adjust(fromDB.DB); err != nil {
fromDB.Close()
return cfgs, err
}
if _, err = cfg.Yaml(); err != nil {
fromDB.Close()
return cfgs, err
}

fromDB.Close()
cfgs[i] = cfg
}
return cfgs, nil
}

func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error {
cfg := *dbConfig
if len(cfg.Password) > 0 {
cfg.Password = utils.DecryptOrPlaintext(cfg.Password)
}

toDB, err := conn.DefaultDBProvider.Apply(cfg)
if err != nil {
return err
}
defer toDB.Close()

value, err := dbutil.ShowVersion(ctx, toDB.DB)
if err != nil {
return err
}

version, err := utils.ExtractTiDBVersion(value)
// Do not adjust if not TiDB
if err == nil {
config.AdjustTargetDBSessionCfg(dbConfig, version)
} else {
log.L().Warn("get tidb version", log.ShortError(err))
}
return nil
}

// OperateSource will create or update an upstream source.
func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest) (*pb.OperateSourceResponse, error) {
var (
Expand Down Expand Up @@ -1194,6 +1224,11 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

err = adjustTargetDB(ctx, cfg.TargetDB)
if err != nil {
return nil, nil, terror.WithClass(err, terror.ClassDMMaster)
}

sourceCfgs, err := s.getSourceConfigs(cfg.MySQLInstances)
if err != nil {
return nil, nil, err
Expand Down
87 changes: 78 additions & 9 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,12 @@ func (t *testMaster) TestCheckTask(c *check.C) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients)
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err := server.CheckTask(context.Background(), &pb.CheckTaskRequest{
Task: taskConfig,
})
Expand All @@ -373,6 +379,9 @@ func (t *testMaster) TestCheckTask(c *check.C) {
// simulate invalid password returned from scheduler, but config was supported plaintext mysql password, so cfg.SubTaskConfigs will success
ctx, cancel = context.WithCancel(context.Background())
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "invalid-encrypt-password", t.workerClients)
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.CheckTask(context.Background(), &pb.CheckTaskRequest{
Task: taskConfig,
})
Expand Down Expand Up @@ -406,6 +415,12 @@ func (t *testMaster) TestStartTask(c *check.C) {
}
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "",
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), req)
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.IsTrue)
Expand All @@ -420,6 +435,9 @@ func (t *testMaster) TestStartTask(c *check.C) {

// check start-task with an invalid source
invalidSource := "invalid-source"
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{
Task: taskConfig,
Sources: []string{invalidSource},
Expand All @@ -438,6 +456,9 @@ func (t *testMaster) TestStartTask(c *check.C) {
defer func() {
checker.CheckSyncConfigFunc = bakCheckSyncConfigFunc
}()
mock = t.initVersionDB(c)
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{
Task: taskConfig,
Sources: sources,
Expand All @@ -448,13 +469,41 @@ func (t *testMaster) TestStartTask(c *check.C) {
clearSchedulerEnv(c, cancel, &wg)
}

// db use for remove data
// verDB user for show version
type mockDBProvider struct {
db *sql.DB
verDB *sql.DB
db *sql.DB
}

// Apply will build BaseDB with DBConfig
// return db if verDB was closed
func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) {
return conn.NewBaseDB(d.db, func() {}), nil
if err := d.verDB.Ping(); err != nil {
return conn.NewBaseDB(d.db, func() {}), nil
}
return conn.NewBaseDB(d.verDB, func() {}), nil
}

func (t *testMaster) initVersionDB(c *check.C) sqlmock.Sqlmock {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok {
mdbp.verDB = db
} else {
conn.DefaultDBProvider = &mockDBProvider{verDB: db}
}
return mock
}

func (t *testMaster) initMockDB(c *check.C) sqlmock.Sqlmock {
db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok {
mdbp.db = db
} else {
conn.DefaultDBProvider = &mockDBProvider{db: db}
}
return mock
}

func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
Expand Down Expand Up @@ -503,12 +552,13 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
c.Assert(server.pessimist.Start(ctx, etcdTestCli), check.IsNil)
c.Assert(server.optimist.Start(ctx, etcdTestCli), check.IsNil)

db, mock, err := sqlmock.New()
c.Assert(err, check.IsNil)
conn.DefaultDBProvider = &mockDBProvider{db: db}
verMock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
mock := t.initMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand All @@ -523,6 +573,9 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
defer wg.Done()
time.Sleep(10 * time.Microsecond)
// start another same task at the same time, should get err
verMock2 := t.initVersionDB(c)
verMock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp1, err1 := server.StartTask(context.Background(), req)
c.Assert(err1, check.IsNil)
c.Assert(resp1.Result, check.IsFalse)
Expand Down Expand Up @@ -593,9 +646,10 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
err = server.optimist.Start(ctx, etcdTestCli)
c.Assert(err, check.IsNil)

db, mock, err = sqlmock.New()
c.Assert(err, check.IsNil)
conn.DefaultDBProvider = &mockDBProvider{db: db}
verMock = t.initVersionDB(c)
verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
mock = t.initMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
Expand All @@ -610,6 +664,9 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
defer wg.Done()
time.Sleep(10 * time.Microsecond)
// start another same task at the same time, should get err
vermock2 := t.initVersionDB(c)
vermock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp1, err1 := server.StartTask(context.Background(), req)
c.Assert(err1, check.IsNil)
c.Assert(resp1.Result, check.IsFalse)
Expand Down Expand Up @@ -692,6 +749,12 @@ func (t *testMaster) TestOperateTask(c *check.C) {
sourceResps := []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}, {Result: true, Source: sources[1]}}
server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "",
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, startReq, pauseReq, resumeReq, stopReq1, stopReq2))
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
stResp, err := server.StartTask(context.Background(), startReq)
c.Assert(err, check.IsNil)
c.Assert(stResp.Result, check.IsTrue)
Expand Down Expand Up @@ -1328,6 +1391,12 @@ func (t *testMaster) TestGetTaskCfg(c *check.C) {
makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req))

// start task
mock := t.initVersionDB(c)
defer func() {
conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
}()
mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}).
AddRow("version", "5.7.25-TiDB-v4.0.2"))
resp, err := server.StartTask(context.Background(), req)
c.Assert(err, check.IsNil)
c.Assert(resp.Result, check.IsTrue)
Expand Down
Loading