Skip to content

Commit

Permalink
mydumper: generate tables needed to dump for mydumper automatically (p…
Browse files Browse the repository at this point in the history
…ingcap#310)

Automatically generate the list of tables for mydumper to dump when the tables to dump are not given explicitly.
  • Loading branch information
lichunzhu authored Oct 18, 2019
1 parent 26b90b3 commit e512948
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 27 deletions.
2 changes: 1 addition & 1 deletion dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v2"
)
Expand Down
4 changes: 3 additions & 1 deletion dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"io/ioutil"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -161,7 +162,8 @@ func (t *testServer) TestTaskAutoResume(c *C) {
// start task
cli := t.createClient(c, fmt.Sprintf("127.0.0.1:%d", port))
subtaskCfgBytes, err := ioutil.ReadFile("./subtask.toml")
_, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: string(subtaskCfgBytes)})
// strings.Replace uncomments extra-args here so that mydumper does not connect to the DB to generate the --tables-list argument, which would cause this test to fail
_, err = cli.StartSubTask(context.Background(), &pb.StartSubTaskRequest{Task: strings.Replace(string(subtaskCfgBytes), "#extra-args", "extra-args", 1)})
c.Assert(err, IsNil)

// check task in paused state
Expand Down
17 changes: 13 additions & 4 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,14 @@ func NewMydumper(cfg *config.SubTaskConfig) *Mydumper {
cfg: cfg,
logger: log.With(zap.String("task", cfg.Name), zap.String("unit", "dump")),
}
m.args = m.constructArgs()
return m
}

// Init implements Unit.Init
func (m *Mydumper) Init() error {
return nil // always return nil
var err error
m.args, err = m.constructArgs()
return err
}

// Process implements Unit.Process
Expand Down Expand Up @@ -233,7 +234,7 @@ func (m *Mydumper) IsFreshTask() (bool, error) {
}

// constructArgs constructs arguments for exec.Command
func (m *Mydumper) constructArgs() []string {
func (m *Mydumper) constructArgs() ([]string, error) {
cfg := m.cfg
db := cfg.From

Expand Down Expand Up @@ -263,11 +264,19 @@ func (m *Mydumper) constructArgs() []string {
if len(extraArgs) > 0 {
ret = append(ret, ParseArgLikeBash(extraArgs)...)
}
if needToGenerateDoTables(extraArgs) {
m.logger.Info("Tables needed to dump are not given, now we will start to generate table list that mydumper needs to dump through black-white list from given fromDB")
doTables, err := fetchMyDumperDoTables(cfg)
if err != nil {
return nil, err
}
ret = append(ret, "--tables-list", doTables)
}

m.logger.Info("create mydumper", zap.Strings("argument", ret))

ret = append(ret, "--password", db.Password)
return ret
return ret, nil
}

// logArgs constructs arguments for log from SubTaskConfig
Expand Down
71 changes: 65 additions & 6 deletions mydumper/mydumper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@
package mydumper

import (
"database/sql"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/pkg/conn"
)

var _ = Suite(&testMydumperSuite{})
Expand All @@ -29,7 +33,9 @@ func TestSuite(t *testing.T) {
}

type testMydumperSuite struct {
cfg *config.SubTaskConfig
cfg *config.SubTaskConfig
origApplyNewBaseDB func(config config.DBConfig) (*conn.BaseDB, error)
origFetchTargetDoTables func(*sql.DB, *filter.Filter, *router.Table) (map[string][]*filter.Table, error)
}

func (m *testMydumperSuite) SetUpSuite(c *C) {
Expand All @@ -50,15 +56,68 @@ func (m *testMydumperSuite) SetUpSuite(c *C) {
Dir: "./dumped_data",
},
}

m.origApplyNewBaseDB = applyNewBaseDB
m.origFetchTargetDoTables = fetchTargetDoTables
applyNewBaseDB = func(config config.DBConfig) (*conn.BaseDB, error) {
return &conn.BaseDB{}, nil
}
fetchTargetDoTables = func(db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) {
mapper := make(map[string][]*filter.Table)
mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{
Schema: "mockDatabase",
Name: "mockTable1",
})
mapper["mockDatabase"] = append(mapper["mockDatabase"], &filter.Table{
Schema: "mockDatabase",
Name: "mockTable2",
})
return mapper, nil
}
}

func (m *testMydumperSuite) TestArgs(c *C) {
// TearDownSuite restores the package-level hooks applyNewBaseDB and
// fetchTargetDoTables to the values saved on the suite.
func (m *testMydumperSuite) TearDownSuite(c *C) {
	applyNewBaseDB, fetchTargetDoTables = m.origApplyNewBaseDB, m.origFetchTargetDoTables
}

func generateArgsAndCompare(c *C, m *testMydumperSuite, expectedExtraArgs, extraArgs string) {
expected := strings.Fields("--host 127.0.0.1 --port 3306 --user root " +
"--outputdir ./dumped_data --threads 4 --chunk-filesize 64 --skip-tz-utc " +
"--regex ^(?!(mysql|information_schema|performance_schema)) " +
"--password 123")
m.cfg.MydumperConfig.ExtraArgs = "--regex '^(?!(mysql|information_schema|performance_schema))'"
expectedExtraArgs + " --password 123")
m.cfg.MydumperConfig.ExtraArgs = extraArgs

mydumper := NewMydumper(m.cfg)
args := mydumper.constructArgs()
args, err := mydumper.constructArgs()
c.Assert(err, IsNil)
c.Assert(args, DeepEquals, expected)
}

// testThroughGivenArgs runs generateArgsAndCompare for a single option arg
// with value index. The value is wrapped in single quotes in the extra args
// handed to constructArgs, while the expected argument list carries it unquoted.
func testThroughGivenArgs(c *C, m *testMydumperSuite, arg, index string) {
	extraArgs := strings.Join([]string{arg, "'" + index + "'"}, " ")
	expectedExtraArgs := strings.Join([]string{arg, index}, " ")
	generateArgsAndCompare(c, m, expectedExtraArgs, extraArgs)
}

// TestShouldNotGenerateExtraArgs checks that no --tables-list is generated
// when the extra args already contain a regex, a table list or a database,
// in either the short or the long spelling of the option.
func (m *testMydumperSuite) TestShouldNotGenerateExtraArgs(c *C) {
	cases := []struct {
		flags []string // short and long spelling of the same option
		value string   // option value shared by both spellings
	}{
		{[]string{"-x", "--regex"}, "^(?!(mysql|information_schema|performance_schema))"},
		{[]string{"-T", "--tables-list"}, "testDatabase.testTable"},
		{[]string{"-B", "--database"}, "testDatabase"},
	}
	for _, tc := range cases {
		for _, flag := range tc.flags {
			testThroughGivenArgs(c, m, flag, tc.value)
		}
	}
}

// TestShouldGenerateExtraArgs checks that --tables-list is appended, built
// from the mocked table set, when the extra args contain none of the
// -T/-B/-x options.
func (m *testMydumperSuite) TestShouldGenerateExtraArgs(c *C) {
	const generated = "--tables-list mockDatabase.mockTable1,mockDatabase.mockTable2"

	// no extra args at all
	generateArgsAndCompare(c, m, generated, "")

	// extra args present, but none of them is a -T/-B/-x option
	const extra = "--statement-size=100"
	generateArgsAndCompare(c, m, extra+" "+generated, extra)
}
57 changes: 57 additions & 0 deletions mydumper/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@

package mydumper

import (
	"sort"
	"strings"

	"github.com/pingcap/dm/dm/config"
	"github.com/pingcap/dm/pkg/conn"
	"github.com/pingcap/dm/pkg/utils"

	"github.com/pingcap/tidb-tools/pkg/filter"
	router "github.com/pingcap/tidb-tools/pkg/table-router"
)

// applyNewBaseDB returns a *conn.BaseDB for the given DBConfig. It is a
// package-level variable (rather than a direct call) so that tests can
// replace it with a mock.
var applyNewBaseDB = conn.DefaultDBProvider.Apply

// fetchTargetDoTables returns the source tables, grouped in a map, for the
// given black-white filter and table router. It is a package-level variable
// so that tests can replace it with a mock.
var fetchTargetDoTables = utils.FetchTargetDoTables

// ParseArgLikeBash parses list arguments like bash, which helps us to run
// executable command via os/exec more likely running from bash
func ParseArgLikeBash(args []string) []string {
Expand All @@ -37,3 +51,46 @@ func trimOutQuotes(arg string) string {
}
return arg
}

// fetchMyDumperDoTables builds the value for mydumper's --tables-list argument.
//
// It obtains a connection for cfg.From, builds the black-white filter from
// cfg.BWList and the table router from cfg.RouteRules, and passes them to
// fetchTargetDoTables to get the tables that need to be dumped. The result is a
// comma-separated list of "schema.table" names. The names are sorted so that
// the output does not depend on map iteration order.
//
// Any error from connecting, building the router or fetching the tables is
// returned unchanged together with an empty string.
func fetchMyDumperDoTables(cfg *config.SubTaskConfig) (string, error) {
	fromDB, err := applyNewBaseDB(cfg.From)
	if err != nil {
		return "", err
	}
	defer fromDB.Close()
	bw := filter.New(cfg.CaseSensitive, cfg.BWList)
	r, err := router.NewTableRouter(cfg.CaseSensitive, cfg.RouteRules)
	if err != nil {
		return "", err
	}
	sourceTables, err := fetchTargetDoTables(fromDB.DB, bw, r)
	if err != nil {
		return "", err
	}
	var filteredTables []string
	// TODO: mydumper fails while dumping tables whose names contain special
	// characters such as ' or `. Once that is fixed in mydumper, table.Schema
	// and table.Name should be quoted here.
	for _, tables := range sourceTables {
		for _, table := range tables {
			filteredTables = append(filteredTables, table.Schema+"."+table.Name)
		}
	}
	// sourceTables is a map, so the order collected above is random; sort to
	// make the generated argument (and the logged command line) reproducible.
	sort.Strings(filteredTables)
	return strings.Join(filteredTables, ","), nil
}

// needToGenerateDoTables reports whether the list of tables to dump has to be
// generated automatically.
//
// It returns false as soon as args contains an option by which the user
// already selects the databases/tables to dump: -B/--database,
// -T/--tables-list or -x/--regex. Long options are recognized both as a
// standalone token ("--regex", "^foo") and in the "--option=value" form
// ("--regex=^foo"). If none of these options is present, including when args
// is empty, it returns true.
func needToGenerateDoTables(args []string) bool {
	for _, arg := range args {
		name := arg
		// A long option may carry its value after '='; compare the name only.
		if strings.HasPrefix(arg, "--") {
			if i := strings.IndexByte(arg, '='); i >= 0 {
				name = arg[:i]
			}
		}
		switch name {
		case "-B", "--database", "-T", "--tables-list", "-x", "--regex":
			return false
		}
	}
	return true
}
2 changes: 1 addition & 1 deletion pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion tests/all_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B all_mode"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/compatibility/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B compatibility"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/dmctl_basic/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^dmctl.*'"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/http_apis/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B http_apis --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/incremental_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B incremental_mode"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/initial_unit/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B initial_unit --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/load_interrupt/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B load_interrupt --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/online_ddl/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B online_ddl"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/print_status/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B print_status --statement-size=5000"
extra-args: "--statement-size=5000"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/relay_interrupt/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
extra-args: "-B relay_interrupt --statement-size=100"
extra-args: "--statement-size=100"

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/safe_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B safe_mode_test"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sequence_safe_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "-B sequence_safe_mode_test"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sequence_sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^sharding_seq.*'"
extra-args: ""

loaders:
global:
Expand Down
2 changes: 1 addition & 1 deletion tests/sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ mydumpers:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: "--regex '^sharding.*'"
extra-args: ""

loaders:
global:
Expand Down
1 change: 1 addition & 0 deletions tests/start_task/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mydumpers:
threads: 4
chunk-filesize: 0
skip-tz-utc: true
# The start_task integration test adds a failpoint to the function fetchTables, which would cause mydumper to fail to start, and dm-worker is not rebuilt during restart, so extra-args is given explicitly here
extra-args: "-B start_task --statement-size=100"

loaders:
Expand Down

0 comments on commit e512948

Please sign in to comment.