Skip to content

Commit

Permalink
dm-syncer: add config for task name (pingcap#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Feb 27, 2020
1 parent 115cecc commit 31bc6d0
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 20 deletions.
36 changes: 21 additions & 15 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
type commonConfig struct {
*flag.FlagSet `json:"-"`

// task name
Name string
printVersion bool
ConfigFile string
ServerID int
Expand All @@ -54,11 +56,11 @@ type commonConfig struct {
EnableANSIQuotes bool
TimezoneStr string

OldConfigFormat bool
SyncerConfigFormat bool
}

func (c *commonConfig) newConfigFromOldConfig(args []string) (*config.SubTaskConfig, error) {
cfg := &oldConfig{
func (c *commonConfig) newConfigFromSyncerConfig(args []string) (*config.SubTaskConfig, error) {
cfg := &syncerConfig{
printVersion: c.printVersion,
ConfigFile: c.ConfigFile,
ServerID: c.ServerID,
Expand All @@ -80,9 +82,10 @@ func (c *commonConfig) newConfigFromOldConfig(args []string) (*config.SubTaskCon
cfg.FlagSet = flag.NewFlagSet("dm-syncer", flag.ContinueOnError)
fs := cfg.FlagSet

var OldConfigFormat bool
var SyncerConfigFormat bool

fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.IntVar(&cfg.ServerID, "server-id", 101, "MySQL slave server ID")
fs.StringVar(&cfg.Flavor, "flavor", mysql.MySQLFlavor, "use flavor for different MySQL source versions; support \"mysql\", \"mariadb\" now; if you replicate from mariadb, please set it to \"mariadb\"")
Expand All @@ -99,7 +102,7 @@ func (c *commonConfig) newConfigFromOldConfig(args []string) (*config.SubTaskCon
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.BoolVar(&cfg.EnableANSIQuotes, "enable-ansi-quotes", false, "enable ANSI_QUOTES sql_mode")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&OldConfigFormat, "old-config-format", false, "read old config format")
fs.BoolVar(&SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

if err := fs.Parse(args); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -129,8 +132,8 @@ func (c *commonConfig) parse(args []string) (*config.SubTaskConfig, error) {
return nil, flag.ErrHelp
}

if c.OldConfigFormat {
return c.newConfigFromOldConfig(args)
if c.SyncerConfigFormat {
return c.newConfigFromSyncerConfig(args)
}

return c.newSubTaskConfig(args)
Expand All @@ -142,11 +145,12 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
cfg.SetFlagSet(flag.NewFlagSet("dm-syncer", flag.ContinueOnError))
fs := cfg.GetFlagSet()

var oldConfigFormat bool
var syncerConfigFormat bool
var printVersion bool
var serverID uint

fs.BoolVar(&printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.UintVar(&serverID, "server-id", 101, "MySQL slave server ID")
fs.StringVar(&cfg.Flavor, "flavor", mysql.MySQLFlavor, "use flavor for different MySQL source versions; support \"mysql\", \"mariadb\" now; if you replicate from mariadb, please set it to \"mariadb\"")
Expand All @@ -163,7 +167,7 @@ func (c *commonConfig) newSubTaskConfig(args []string) (*config.SubTaskConfig, e
fs.BoolVar(&cfg.EnableANSIQuotes, "enable-ansi-quotes", false, "enable ANSI_QUOTES sql_mode")
fs.StringVar(&cfg.Timezone, "timezone", "", "target database timezone location string")
fs.StringVar(&cfg.Name, "cp-table-prefix", "dm-syncer", "the prefix of the checkpoint table name")
fs.BoolVar(&oldConfigFormat, "old-config-format", false, "read old config format")
fs.BoolVar(&syncerConfigFormat, "syncer-config-format", false, "read syncer config format")

cfg.ServerID = uint32(serverID)

Expand All @@ -185,6 +189,7 @@ func newCommonConfig() *commonConfig {
fs := cfg.FlagSet

fs.BoolVar(&cfg.printVersion, "V", false, "prints version and exit")
fs.StringVar(&cfg.Name, "name", "", "the task name")
fs.StringVar(&cfg.ConfigFile, "config", "", "path to config file")
fs.IntVar(&cfg.ServerID, "server-id", 101, "MySQL slave server ID")
fs.StringVar(&cfg.Flavor, "flavor", mysql.MySQLFlavor, "use flavor for different MySQL source versions; support \"mysql\", \"mariadb\" now; if you replicate from mariadb, please set it to \"mariadb\"")
Expand All @@ -201,15 +206,16 @@ func newCommonConfig() *commonConfig {
fs.IntVar(&cfg.MaxRetry, "max-retry", 100, "maxinum retry when network interruption")
fs.BoolVar(&cfg.EnableANSIQuotes, "enable-ansi-quotes", false, "enable ANSI_QUOTES sql_mode")
fs.StringVar(&cfg.TimezoneStr, "timezone", "", "target database timezone location string")
fs.BoolVar(&cfg.OldConfigFormat, "old-config-format", false, "read old config format")
fs.BoolVar(&cfg.SyncerConfigFormat, "syncer-config-format", false, "read syncer config format")

return cfg
}

// oldConfig is the old format of syncer tools, eventually it will be converted to new SubTaskConfig format.
type oldConfig struct {
// syncerConfig is the format of syncer tools, eventually it will be converted to new SubTaskConfig format.
type syncerConfig struct {
*flag.FlagSet `json:"-"`

Name string `toml:"name" json:"name"`
LogLevel string `toml:"log-level" json:"log-level"`
LogFile string `toml:"log-file" json:"log-file"`
LogRotate string `toml:"log-rotate" json:"log-rotate"`
Expand Down Expand Up @@ -302,14 +308,14 @@ func loadMetaFile(metaFile string) (*config.Meta, error) {
return meta, nil
}

func (oc *oldConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
func (oc *syncerConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
meta, err := loadMetaFile(oc.Meta)
if err != nil {
return nil, errors.Trace(err)
}
newTask := &config.SubTaskConfig{
Name: "dm-syncer",
SourceID: "dm-syncer-from-old-config",
Name: oc.Name,
SourceID: fmt.Sprintf("%s_source", oc.Name),
Mode: config.ModeIncrement,
Meta: meta,

Expand Down
6 changes: 4 additions & 2 deletions tests/_utils/run_dm_syncer
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ mkdir -p $workdir/log

format=""
meta=""
name=""

if [ "$#" -ge 4 ]; then
if [ "$#" -ge 5 ]; then
meta="--meta=$3"
format=$4
name="--name=$5"
fi

cd $workdir
echo "$binary --log-file="$workdir/log/dm-syncer.log" --config="$config" $meta $format >> $workdir/log/stdout.log 2>&1 &"
$binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.syncer.out" DEVEL \
-L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $meta $format >> $workdir/log/stdout.log 2>&1 &
-L=Debug --log-file="$workdir/log/dm-syncer.log" --config="$config" $name $meta $format >> $workdir/log/stdout.log 2>&1 &
cd $PWD

2 changes: 2 additions & 0 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ function cleanup_process() {
pkill -hup dm-worker.test 2>/dev/null || true
pkill -hup dm-master.test 2>/dev/null || true
pkill -hup dm-tracer.test 2>/dev/null || true
pkill -hup dm-syncer.test 2>/dev/null || true

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
wait_process_exit dm-tracer.test
wait_process_exit dm-syncer.test
}

if [ "$RESET_MASTER" = true ]; then
Expand Down
4 changes: 2 additions & 2 deletions tests/dm_syncer/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function run() {
cat $cur/conf/dm-syncer-2.toml > $WORK_DIR/dm-syncer-2.toml
cat $cur/conf/old_meta_file > $WORK_DIR/old_meta_file
# $worker1_run_source_1 > 0 means source1 is operated to worker1
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1")
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
if [ $worker1_run_source_1 -gt 0 ]
then
name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ')
Expand All @@ -70,7 +70,7 @@ function run() {
sleep 2
run_dm_syncer $WORK_DIR/syncer1 $WORK_DIR/dm-syncer-1.toml
meta_file=$WORK_DIR/old_meta_file
run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --old-config-format
run_dm_syncer $WORK_DIR/syncer2 $WORK_DIR/dm-syncer-2.toml $meta_file --syncer-config-format syncer2

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}
Expand Down
2 changes: 1 addition & 1 deletion tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ function run() {
dmctl_stop_task $TASK_NAME

# $worker1_run_source_1 > 0 means source1 is operated to worker1
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1")
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
if [ $worker1_run_source_1 -gt 0 ]
then
name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2}'|tr -d ' ')
Expand Down

0 comments on commit 31bc6d0

Please sign in to comment.