From 4bc970c79b8b74ddaccf1469c8ea2dd383cb841e Mon Sep 17 00:00:00 2001
From: Zixiong Liu
Date: Wed, 11 Nov 2020 16:39:39 +0800
Subject: [PATCH 01/27] WIP

---
 tests/move_table/conf/diff_config.toml |  27 +++
 tests/move_table/conf/workload         |  13 ++
 tests/move_table/main.go               | 225 +++++++++++++++++++++++++
 tests/move_table/run.sh                |  71 ++++++++
 4 files changed, 336 insertions(+)
 create mode 100644 tests/move_table/conf/diff_config.toml
 create mode 100644 tests/move_table/conf/workload
 create mode 100644 tests/move_table/main.go
 create mode 100644 tests/move_table/run.sh

diff --git a/tests/move_table/conf/diff_config.toml b/tests/move_table/conf/diff_config.toml
new file mode 100644
index 00000000000..9c31c91b2fd
--- /dev/null
+++ b/tests/move_table/conf/diff_config.toml
@@ -0,0 +1,27 @@
+# diff Configuration.
+
+log-level = "info"
+chunk-size = 10
+check-thread-count = 4
+sample-percent = 100
+use-rowid = false
+use-checksum = true
+fix-sql-file = "fix.sql"
+
+# tables need to check.
+[[check-tables]]
+ schema = "move_table"
+ tables = ["~usertable.*"]
+
+[[source-db]]
+ host = "127.0.0.1"
+ port = 4000
+ user = "root"
+ password = ""
+ instance-id = "source-1"
+
+[target-db]
+ host = "127.0.0.1"
+ port = 3306
+ user = "root"
+ password = ""
diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload
new file mode 100644
index 00000000000..6322c4a4405
--- /dev/null
+++ b/tests/move_table/conf/workload
@@ -0,0 +1,13 @@
+threadcount=10
+recordcount=150000
+operationcount=0
+workload=core
+
+readallfields=true
+
+readproportion=0
+updateproportion=0
+scanproportion=0
+insertproportion=0
+
+requestdistribution=uniform
diff --git a/tests/move_table/main.go b/tests/move_table/main.go
new file mode 100644
index 00000000000..e64448b4871
--- /dev/null
+++ b/tests/move_table/main.go
@@ -0,0 +1,225 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +// This is a program that drives the CDC cluster to move a table +package main + +import ( + "bytes" + "context" + "flag" + "fmt" + "net/http" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/kv" + "github.com/pingcap/ticdc/pkg/retry" + "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/pkg/logutil" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" +) + +var pd = flag.String("pd", "http://127.0.0.1:2379", "PD address and port") + +func main() { + flag.Parse() + log.Info("table mover started") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + + cluster, err := newCluster(ctx, *pd) + if err != nil { + log.Fatal("failed to create cluster info", zap.Error(err)) + } + + ticker := time.NewTicker(15 * time.Second) + for { + select { + case <-ctx.Done(): + log.Info("Exiting", zap.Error(ctx.Err())) + return + case <-ticker.C: + err := retry.Run(100*time.Millisecond, 20, func() error { + return cluster.refreshInfo(ctx) + }) + + if err != nil { + log.Warn("error refreshing cluster info", zap.Error(err)) + } + + if len(cluster.captures) <= 1 { + log.Warn("no enough captures", zap.Reflect("captures", cluster.captures)) + continue + } + + var ( + tableID int64 + sourceCapture string + targetCapture string + changefeed string + ) + for capture, tables := range cluster.captures { + if len(tables) == 0 { + continue + } + + tableID = tables[0].ID + sourceCapture = capture + changefeed = tables[0].changefeed + } + + if tableID == 0 { + log.Warn("no table", zap.Reflect("captures", cluster.captures)) + continue + } + + for capture := range cluster.captures { + if capture != sourceCapture { + targetCapture = sourceCapture + } + } + + if targetCapture == "" { + log.Fatal("no target, unexpected") + } + + err = moveTable(ctx, cluster.ownerAddr, changefeed, targetCapture, tableID) + if err != nil { + log.Warn("failed to move table", zap.Error(err)) + } + + log.Info("moved table successful", zap.Int64("tableID", tableID)) + } + } + +} + +type tableInfo struct { + ID int64 + changefeed string +} + +type cluster struct { + ownerAddr string + captures map[string][]*tableInfo + cdcEtcdCli kv.CDCEtcdClient +} + +func newCluster(ctx context.Context, pd string) (*cluster, error) { + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{pd}, + TLS: nil, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{ + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: time.Second, + Multiplier: 1.1, + Jitter: 0.1, + MaxDelay: 3 * time.Second, + }, + MinConnectTimeout: 3 * time.Second, + }), + }, + }) + + if err != nil { + return nil, errors.Trace(err) + } + + ret := &cluster{ + ownerAddr: "", + captures: nil, + cdcEtcdCli: kv.NewCDCEtcdClient(ctx, etcdCli), + } + + return ret, nil +} + +func (c *cluster) refreshInfo(ctx context.Context) error { + ownerID, err := c.cdcEtcdCli.GetOwnerID(ctx, kv.CaptureOwnerKey) + if err != nil { + return errors.Trace(err) + } + + captureInfo, err := c.cdcEtcdCli.GetCaptureInfo(ctx, ownerID) + if err != nil { + return errors.Trace(err) + } + + c.ownerAddr = captureInfo.AdvertiseAddr + + _, changefeeds, err := c.cdcEtcdCli.GetChangeFeeds(ctx) + if err != nil { + return errors.Trace(err) + } + if 
len(changefeeds) == 0 { + return errors.New("No changefeed") + } + + var changefeed string + for k := range changefeeds { + changefeed = k + break + } + + allTasks, err := c.cdcEtcdCli.GetAllTaskStatus(ctx, changefeed) + if err != nil { + return errors.Trace(err) + } + + c.captures = make(map[string][]*tableInfo) + for capture, taskInfo := range allTasks { + c.captures[capture] = make([]*tableInfo, len(taskInfo.Tables)) + for tableID := range taskInfo.Tables { + c.captures[capture] = append(c.captures[capture], &tableInfo{ + ID: tableID, + changefeed: changefeed, + }) + } + } + + return nil +} + +func moveTable(ctx context.Context, ownerAddr string, changefeed string, target string, tableID int64) error { + formStr := fmt.Sprintf("cf-id=%s&target-cp-id=%s&table-id=%d", changefeed, target, tableID) + rd := bytes.NewReader([]byte(formStr)) + req, err := http.NewRequestWithContext(ctx, "POST", ownerAddr+"/capture/owner/move_table", rd) + if err != nil { + return errors.Trace(err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Trace(err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return errors.New(resp.Status) + } + + return nil +} diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh new file mode 100644 index 00000000000..78f242f1ad0 --- /dev/null +++ b/tests/move_table/run.sh @@ -0,0 +1,71 @@ +#!/bin/bash + +set -e + +CUR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + + +function run() { + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) + run_sql "CREATE DATABASE move_table;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=move_table + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc1" --addr 127.0.0.1:8300 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc2" --addr 127.0.0.1:8301 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc3" --addr 127.0.0.1:8302 + + TOPIC_NAME="ticdc-move-table-test-$RANDOM" + case $SINK_TYPE in + kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";; + *) SINK_URI="mysql://root@127.0.0.1:3306/?max-txn-row=1";; + esac + + run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" + if [ "$SINK_TYPE" == "kafka" ]; then + run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" + fi + + sleep 5 + GO111MODULE=on go run main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & + + # Add a check table to reduce check time, or if we check data with sync diff + # directly, there maybe a lot of diff data at first because of the incremental scan + run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "move_table.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + run_sql "CREATE table move_table.check2(id int primary key);" 
${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "move_table.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=move_table + run_sql "CREATE table move_table.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "move_table.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + run_sql "create table move_table.USERTABLE2 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into move_table.USERTABLE2 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table move_table.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "move_table.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "move_table.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + + cleanup_process $CDC_BINARY +} + +trap stop_tidb_cluster EXIT +run $* +echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" From 75af669f0b9250fae5940b1f4d01a9b4bd5fff4a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 11 Nov 2020 16:57:09 +0800 Subject: [PATCH 02/27] fix integration test --- tests/move_table/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 78f242f1ad0..9173ca98858 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -35,7 +35,7 @@ function run() { fi sleep 5 - GO111MODULE=on go run main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & + GO111MODULE=on go run $CUR/main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan From 54d58c99f138b5b94301c01036a48b8ae6853f02 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Wed, 11 Nov 2020 17:12:17 +0800 Subject: [PATCH 03/27] fix integration test --- tests/move_table/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 9173ca98858..b124a7d3361 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -35,7 +35,9 @@ function run() { fi sleep 5 - GO111MODULE=on go run $CUR/main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & + cd $CUR + GO111MODULE=on go run main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & + cd $WORK_DIR # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan From 1dabb0b273f9ebfc0c2ca8fa68a6fc500b34d722 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 12 Nov 2020 14:48:36 +0800 Subject: [PATCH 04/27] fix integration test --- tests/move_table/main.go | 100 ++++++++++++++++++++++++++++----------- tests/move_table/run.sh | 18 +++---- 2 files changed, 83 insertions(+), 35 deletions(-) diff --git a/tests/move_table/main.go b/tests/move_table/main.go index e64448b4871..cf0fc6aa771 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -19,7 +19,9 @@ import ( "context" "flag" "fmt" + "io/ioutil" "net/http" + "strings" "time" "github.com/pingcap/errors" @@ -35,9 +37,14 
@@ import ( ) var pd = flag.String("pd", "http://127.0.0.1:2379", "PD address and port") +var logLevel = flag.String("log-level", "debug", "Set log level of the logger") func main() { flag.Parse() + if strings.ToLower(*logLevel) == "debug" { + log.SetLevel(zapcore.DebugLevel) + } + log.Info("table mover started") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() @@ -47,7 +54,7 @@ func main() { log.Fatal("failed to create cluster info", zap.Error(err)) } - ticker := time.NewTicker(15 * time.Second) + ticker := time.NewTicker(1 * time.Second) for { select { case <-ctx.Done(): @@ -62,35 +69,28 @@ func main() { log.Warn("error refreshing cluster info", zap.Error(err)) } + log.Info("task status", zap.Reflect("status", cluster.captures)) + if len(cluster.captures) <= 1 { log.Warn("no enough captures", zap.Reflect("captures", cluster.captures)) continue } - var ( - tableID int64 - sourceCapture string - targetCapture string - changefeed string - ) + var sourceCapture string + for capture, tables := range cluster.captures { if len(tables) == 0 { continue } - - tableID = tables[0].ID sourceCapture = capture - changefeed = tables[0].changefeed + break } - if tableID == 0 { - log.Warn("no table", zap.Reflect("captures", cluster.captures)) - continue - } + var targetCapture string - for capture := range cluster.captures { - if capture != sourceCapture { - targetCapture = sourceCapture + for candidateCapture := range cluster.captures { + if candidateCapture != sourceCapture { + targetCapture = candidateCapture } } @@ -98,20 +98,50 @@ func main() { log.Fatal("no target, unexpected") } - err = moveTable(ctx, cluster.ownerAddr, changefeed, targetCapture, tableID) - if err != nil { - log.Warn("failed to move table", zap.Error(err)) + // move all tables to another capture + for _, table := range cluster.captures[sourceCapture] { + err = moveTable(ctx, cluster.ownerAddr, table.Changefeed, targetCapture, table.ID) + if err != nil { + log.Warn("failed to move table", zap.Error(err)) + continue + } + + log.Info("moved table successful", zap.Int64("tableID", table.ID)) } - log.Info("moved table successful", zap.Int64("tableID", tableID)) + log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture)) + + for counter := 0; counter < 30; counter++ { + err := retry.Run(100*time.Millisecond, 5, func() error { + return cluster.refreshInfo(ctx) + }) + + if err != nil { + log.Warn("error refreshing cluster info", zap.Error(err)) + } + + tables, ok := cluster.captures[sourceCapture] + if !ok { + log.Warn("source capture is gone", zap.String("sourceCapture", sourceCapture)) + break + } + + if len(tables) == 0 { + log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture)) + break + } + + if counter != 30 { + log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture)) + time.Sleep(time.Second * 10) + } + } } } - } - type tableInfo struct { ID int64 - changefeed string + Changefeed string } type cluster struct { @@ -155,6 +185,8 @@ func newCluster(ctx context.Context, pd string) (*cluster, error) { cdcEtcdCli: kv.NewCDCEtcdClient(ctx, etcdCli), } + log.Info("new cluster initialized") + return ret, nil } @@ -164,11 +196,14 @@ func (c *cluster) refreshInfo(ctx context.Context) error { return errors.Trace(err) } + log.Debug("retrieved owner ID", zap.String("ownerID", ownerID)) + captureInfo, err := c.cdcEtcdCli.GetCaptureInfo(ctx, ownerID) if err != nil { return 
errors.Trace(err) } + log.Debug("retrieved owner addr", zap.String("ownerAddr", captureInfo.AdvertiseAddr)) c.ownerAddr = captureInfo.AdvertiseAddr _, changefeeds, err := c.cdcEtcdCli.GetChangeFeeds(ctx) @@ -179,6 +214,8 @@ func (c *cluster) refreshInfo(ctx context.Context) error { return errors.New("No changefeed") } + log.Debug("retrieved changefeeds", zap.Reflect("changefeeds", changefeeds)) + var changefeed string for k := range changefeeds { changefeed = k @@ -190,13 +227,15 @@ func (c *cluster) refreshInfo(ctx context.Context) error { return errors.Trace(err) } + log.Debug("retrieved all tasks", zap.Reflect("tasks", allTasks)) + c.captures = make(map[string][]*tableInfo) for capture, taskInfo := range allTasks { - c.captures[capture] = make([]*tableInfo, len(taskInfo.Tables)) + c.captures[capture] = make([]*tableInfo, 0, len(taskInfo.Tables)) for tableID := range taskInfo.Tables { c.captures[capture] = append(c.captures[capture], &tableInfo{ ID: tableID, - changefeed: changefeed, + Changefeed: changefeed, }) } } @@ -206,18 +245,25 @@ func (c *cluster) refreshInfo(ctx context.Context) error { func moveTable(ctx context.Context, ownerAddr string, changefeed string, target string, tableID int64) error { formStr := fmt.Sprintf("cf-id=%s&target-cp-id=%s&table-id=%d", changefeed, target, tableID) + log.Debug("preparing HTTP API call to owner", zap.String("formStr", formStr)) rd := bytes.NewReader([]byte(formStr)) - req, err := http.NewRequestWithContext(ctx, "POST", ownerAddr+"/capture/owner/move_table", rd) + req, err := http.NewRequestWithContext(ctx, "POST", "http://"+ownerAddr+"/capture/owner/move_table", rd) if err != nil { return errors.Trace(err) } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") resp, err := http.DefaultClient.Do(req) if err != nil { return errors.Trace(err) } if resp.StatusCode < 200 || resp.StatusCode >= 300 { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return errors.Trace(err) + } + log.Warn("http error", zap.ByteString("body", body)) return errors.New(resp.Status) } diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index b124a7d3361..98d1fd0395d 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -19,9 +19,7 @@ function run() { start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1) run_sql "CREATE DATABASE move_table;" ${UP_TIDB_HOST} ${UP_TIDB_PORT} go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=move_table - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc1" --addr 127.0.0.1:8300 - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc2" --addr 127.0.0.1:8301 - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "cdc3" --addr 127.0.0.1:8302 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "1" --addr 127.0.0.1:8300 TOPIC_NAME="ticdc-move-table-test-$RANDOM" case $SINK_TYPE in @@ -34,11 +32,6 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi - sleep 5 - cd $CUR - GO111MODULE=on go run main.go -pd http://$UP_PD_HOST_1:$UP_PD_PORT_1 2>&1 | tee $WORK_DIR/tester.log & - cd $WORK_DIR - # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table move_table.check1(id int 
primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -59,8 +52,17 @@ function run() { run_sql "create table move_table.USERTABLE2 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "insert into move_table.USERTABLE2 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table move_table.USERTABLE3 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into move_table.USERTABLE3 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "create table move_table.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + + sleep 10 check_table_exists "move_table.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 + + check_table_exists "move_table.USERTABLE3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists "move_table.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml From fe03dcba7e82f40966bb0d147696a7136d61e533 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 12 Nov 2020 15:13:43 +0800 Subject: [PATCH 05/27] fix integration test --- tests/move_table/conf/workload | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload index 6322c4a4405..c249663fe5b 100644 --- a/tests/move_table/conf/workload +++ b/tests/move_table/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=150000 +recordcount=50000 operationcount=0 workload=core From e736022ef990580a172d743ab7f448e1d5d0102a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 12 Nov 2020 15:45:02 +0800 Subject: [PATCH 06/27] fix integration test --- tests/move_table/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 98d1fd0395d..6ecb75c5a1a 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -62,7 +62,7 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 - check_table_exists "move_table.USERTABLE3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + check_table_exists "move_table.USERTABLE3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_table_exists "move_table.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml From 093856f99794e8ea9baac62db5fa7f681f23dad2 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Thu, 12 Nov 2020 16:34:11 +0800 Subject: [PATCH 07/27] fix integration test --- tests/move_table/conf/workload | 2 +- tests/move_table/run.sh | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload index c249663fe5b..632789648c4 100644 --- a/tests/move_table/conf/workload +++ b/tests/move_table/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=50000 +recordcount=90000 operationcount=0 workload=core diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 6ecb75c5a1a..adf501d00e2 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -54,6 +54,10 @@ function run() { run_sql "insert into move_table.USERTABLE2 select * from move_table.USERTABLE" ${UP_TIDB_HOST} 
${UP_TIDB_PORT} run_sql "create table move_table.USERTABLE3 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "insert into move_table.USERTABLE3 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table move_table.USERTABLE4 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into move_table.USERTABLE4 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "create table move_table.USERTABLE5 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "insert into move_table.USERTABLE5 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "create table move_table.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} sleep 10 @@ -63,6 +67,8 @@ function run() { run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 check_table_exists "move_table.USERTABLE3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + check_table_exists "move_table.USERTABLE4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 + check_table_exists "move_table.USERTABLE5" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 check_table_exists "move_table.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml From db2734edb91671c9aee6ced43c8779de9592d38c Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 13:44:20 +0800 Subject: [PATCH 08/27] fix startTs --- cdc/changefeed.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index 9b56969bed2..eb6a0060325 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -522,13 +522,15 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model case model.MoveTableStatusDeleted: // add table to target capture status, exist := cloneStatus(job.To) + replicaInfo := job.TableReplicaInfo.Clone() + replicaInfo.StartTs = c.status.CheckpointTs if !exist { // the target capture is not exist, add table to orphanTables. 
c.orphanTables[tableID] = job.TableReplicaInfo.StartTs log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job)) continue } - status.AddTable(tableID, job.TableReplicaInfo, job.TableReplicaInfo.StartTs) + status.AddTable(tableID, job.TableReplicaInfo, c.status.CheckpointTs) job.Status = model.MoveTableStatusFinished delete(c.moveTableJobs, tableID) log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job)) From b22104ae89fe97849c6051903f23ac2ac712cfac Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 14:14:40 +0800 Subject: [PATCH 09/27] add fatal for startTs < checkpoint --- cdc/processor.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/cdc/processor.go b/cdc/processor.go index e06d70fe42b..0c02078c00f 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -86,6 +86,7 @@ type processor struct { globalResolvedTs uint64 localResolvedTs uint64 checkpointTs uint64 + globalcheckpointTs uint64 flushCheckpointInterval time.Duration ddlPuller puller.Puller @@ -662,6 +663,7 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { defer globalResolvedTsNotifier.Close() updateStatus := func(changefeedStatus *model.ChangeFeedStatus) { + atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs) if lastResolvedTs == changefeedStatus.ResolvedTs && lastCheckPointTs == changefeedStatus.CheckpointTs { return @@ -913,6 +915,15 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo p.stateMu.Lock() defer p.stateMu.Unlock() + globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) + + if replicaInfo.StartTs < globalcheckpointTs { + log.Fatal("addTable: startTs < checkpoint", + zap.Int64("tableID", tableID), + zap.Uint64("checkpoint", globalcheckpointTs), + zap.Uint64("startTs", replicaInfo.StartTs)) + } + var tableName string err := retry.Run(time.Millisecond*5, 3, func() error { if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { From 1015ff1d320a3c786b7ccfc018403de129e15f20 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 15:02:37 +0800 Subject: [PATCH 10/27] Test --- cdc/changefeed.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/changefeed.go b/cdc/changefeed.go index eb6a0060325..d9d828f2148 100644 --- a/cdc/changefeed.go +++ b/cdc/changefeed.go @@ -526,11 +526,11 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model replicaInfo.StartTs = c.status.CheckpointTs if !exist { // the target capture is not exist, add table to orphanTables. 
- c.orphanTables[tableID] = job.TableReplicaInfo.StartTs + c.orphanTables[tableID] = replicaInfo.StartTs log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job)) continue } - status.AddTable(tableID, job.TableReplicaInfo, c.status.CheckpointTs) + status.AddTable(tableID, replicaInfo, c.status.CheckpointTs) job.Status = model.MoveTableStatusFinished delete(c.moveTableJobs, tableID) log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job)) From c9e173361956097fcd2b5d1ae57dd56d15b4338b Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 15:58:16 +0800 Subject: [PATCH 11/27] debug log --- cdc/owner.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdc/owner.go b/cdc/owner.go index ad7cd91d92d..54a24267b72 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -1176,6 +1176,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture captureIDs[captureID] = struct{}{} } + log.Debug("cleanUpStaleTasks", + zap.Reflect("statuses", statuses), + zap.Reflect("positions", positions), + zap.Reflect("workloads", workloads)) + for captureID := range captureIDs { if _, ok := active[captureID]; !ok { status, ok1 := statuses[captureID] From f837e100aa9ecaa148fead34b0ca81c4a21a941d Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 13 Nov 2020 16:49:35 +0800 Subject: [PATCH 12/27] move fatal --- cdc/processor.go | 19 ++++++++++--------- tests/move_table/main.go | 1 + 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index 0c02078c00f..023f4772fa1 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -915,15 +915,6 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo p.stateMu.Lock() defer p.stateMu.Unlock() - globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) - - if replicaInfo.StartTs < globalcheckpointTs { - log.Fatal("addTable: startTs < checkpoint", - zap.Int64("tableID", tableID), - zap.Uint64("checkpoint", globalcheckpointTs), - zap.Uint64("startTs", replicaInfo.StartTs)) - } - var tableName string err := retry.Run(time.Millisecond*5, 3, func() error { if name, ok := p.schemaStorage.GetLastSnapshot().GetTableNameByID(tableID); ok { @@ -946,6 +937,16 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo return } } + + globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) + + if replicaInfo.StartTs < globalcheckpointTs { + log.Fatal("addTable: startTs < checkpoint", + zap.Int64("tableID", tableID), + zap.Uint64("checkpoint", globalcheckpointTs), + zap.Uint64("startTs", replicaInfo.StartTs)) + } + globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs) log.Debug("Add table", zap.Int64("tableID", tableID), zap.String("name", tableName), diff --git a/tests/move_table/main.go b/tests/move_table/main.go index cf0fc6aa771..83a556f509b 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -139,6 +139,7 @@ func main() { } } } + type tableInfo struct { ID int64 Changefeed string From 69671c027ba2c2438dbee662c0246a4087e392b5 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 20 Nov 2020 13:14:59 +0800 Subject: [PATCH 13/27] delay SchemaStorage GC --- cdc/processor.go | 10 ++++++++-- errors.toml | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/cdc/processor.go b/cdc/processor.go index b565bf76197..41709b94801 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -61,6 +61,8 @@ const ( defaultMemBufferCapacity int64 = 
10 * 1024 * 1024 * 1024 // 10G defaultSyncResolvedBatch = 1024 + + schemaStorageGCLag = time.Minute * 20 ) var ( @@ -671,7 +673,11 @@ func (p *processor) globalStatusWorker(ctx context.Context) error { return } if lastCheckPointTs < changefeedStatus.CheckpointTs { - p.schemaStorage.DoGC(changefeedStatus.CheckpointTs) + // Delay GC to accommodate pullers starting from a startTs that's too small + // TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically + gcTime := oracle.GetTimeFromTS(changefeedStatus.CheckpointTs).Add(-schemaStorageGCLag) + gcTs := oracle.ComposeTS(gcTime.Unix(), 0) + p.schemaStorage.DoGC(gcTs) lastCheckPointTs = changefeedStatus.CheckpointTs } if lastResolvedTs < changefeedStatus.ResolvedTs { @@ -943,7 +949,7 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs) if replicaInfo.StartTs < globalcheckpointTs { - log.Fatal("addTable: startTs < checkpoint", + log.Warn("addTable: startTs < checkpoint", zap.Int64("tableID", tableID), zap.Uint64("checkpoint", globalcheckpointTs), zap.Uint64("startTs", replicaInfo.StartTs)) diff --git a/errors.toml b/errors.toml index 6019129b324..c37b394079d 100755 --- a/errors.toml +++ b/errors.toml @@ -1,5 +1,5 @@ # AUTOGENERATED BY github.com/pingcap/tiup/components/errdoc/errdoc-gen -# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. +# DO NOT EDIT THIS FILE, PLEASE CHANGE ERROR DEFINITION IF CONTENT IMPROPER. ["CDC:ErrAPIInvalidParam"] error = ''' From 2f7b9ed3005699f4697559342805db13b33a7cbc Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 20 Nov 2020 13:41:27 +0800 Subject: [PATCH 14/27] fix errordoc --- errors.toml | 683 ---------------------------------------------------- 1 file changed, 683 deletions(-) delete mode 100755 errors.toml diff --git a/errors.toml b/errors.toml deleted file mode 100755 index c37b394079d..00000000000 --- a/errors.toml +++ /dev/null @@ -1,683 +0,0 @@ -# AUTOGENERATED BY github.com/pingcap/tiup/components/errdoc/errdoc-gen -# DO NOT EDIT THIS FILE, PLEASE CHANGE ERROR DEFINITION IF CONTENT IMPROPER. 
- -["CDC:ErrAPIInvalidParam"] -error = ''' -invalid api parameter -''' - -["CDC:ErrAdminStopProcessor"] -error = ''' -stop processor by admin command -''' - -["CDC:ErrAsyncBroadcaseNotSupport"] -error = ''' -Async broadcasts not supported -''' - -["CDC:ErrAvroEncodeFailed"] -error = ''' -encode to avro native data -''' - -["CDC:ErrAvroEncodeToBinary"] -error = ''' -encode to binray from native -''' - -["CDC:ErrAvroMarshalFailed"] -error = ''' -json marshal failed -''' - -["CDC:ErrAvroSchemaAPIError"] -error = ''' -schema manager API error -''' - -["CDC:ErrAvroToEnvelopeError"] -error = ''' -to envelope failed -''' - -["CDC:ErrAvroUnknownType"] -error = ''' -unknown type for Avro: %v -''' - -["CDC:ErrBufferReachLimit"] -error = ''' -puller mem buffer reach size limit -''' - -["CDC:ErrCachedTSONotExists"] -error = ''' -GetCachedCurrentVersion: cache entry does not exist -''' - -["CDC:ErrCanalDecodeFailed"] -error = ''' -canal decode failed -''' - -["CDC:ErrCanalEncodeFailed"] -error = ''' -canal encode failed -''' - -["CDC:ErrCaptureCampaignOwner"] -error = ''' -campaign owner failed -''' - -["CDC:ErrCaptureNotExist"] -error = ''' -capture not exists, key: %s -''' - -["CDC:ErrCaptureRegister"] -error = ''' -capture register to etcd failed -''' - -["CDC:ErrCaptureResignOwner"] -error = ''' -resign owner failed -''' - -["CDC:ErrCaptureSuicide"] -error = ''' -capture suicide -''' - -["CDC:ErrChangeFeedAlreadyExists"] -error = ''' -changefeed already exists, key: %s -''' - -["CDC:ErrChangeFeedNotExists"] -error = ''' -changefeed not exists, key: %s -''' - -["CDC:ErrChangefeedAbnormalState"] -error = ''' -changefeed in abnormal state: %s, replication status: %+v -''' - -["CDC:ErrCheckClusterVersionFromPD"] -error = ''' -failed to request PD -''' - -["CDC:ErrCheckDirWritable"] -error = ''' -check dir writable failed -''' - -["CDC:ErrCodecDecode"] -error = ''' -codec decode error -''' - -["CDC:ErrCreateMarkTableFailed"] -error = ''' -create mark table failed -''' - -["CDC:ErrDDLEventIgnored"] -error = ''' -ddl event is ignored -''' - -["CDC:ErrDatumUnflatten"] -error = ''' -unflatten datume data -''' - -["CDC:ErrDecodeFailed"] -error = ''' -decode failed: %s -''' - -["CDC:ErrDecodeRowToDatum"] -error = ''' -decode row data to datum failed -''' - -["CDC:ErrEncodeFailed"] -error = ''' -encode failed: %s -''' - -["CDC:ErrEventFeedAborted"] -error = ''' -single event feed aborted -''' - -["CDC:ErrEventFeedEventError"] -error = ''' -eventfeed returns event error -''' - -["CDC:ErrExecDDLFailed"] -error = ''' -exec DDL failed -''' - -["CDC:ErrFetchHandleValue"] -error = ''' -can't find handle column, please check if the pk is handle -''' - -["CDC:ErrFileSinkCreateDir"] -error = ''' -file sink create dir -''' - -["CDC:ErrFileSinkFileOp"] -error = ''' -file sink file operation -''' - -["CDC:ErrFileSinkMetaAlreadyExists"] -error = ''' -file sink meta file already exists -''' - -["CDC:ErrFileSorterDecode"] -error = ''' -decode failed -''' - -["CDC:ErrFileSorterEncode"] -error = ''' -encode failed -''' - -["CDC:ErrFileSorterInvalidData"] -error = ''' -invalid data -''' - -["CDC:ErrFileSorterOpenFile"] -error = ''' -open file failed -''' - -["CDC:ErrFileSorterReadFile"] -error = ''' -read file failed -''' - -["CDC:ErrFileSorterWriteFile"] -error = ''' -write file failed -''' - -["CDC:ErrFilterRuleInvalid"] -error = ''' -filter rule is invalid -''' - -["CDC:ErrGRPCDialFailed"] -error = ''' -grpc dial failed -''' - -["CDC:ErrGetAllStoresFailed"] -error = ''' -get stores from pd failed -''' - 
-["CDC:ErrGetRegionFailed"] -error = ''' -get region failed -''' - -["CDC:ErrGetStoreSnapshot"] -error = ''' -get snapshot failed -''' - -["CDC:ErrGetTiKVRPCContext"] -error = ''' -get tikv grpc context failed -''' - -["CDC:ErrIndexKeyTableNotFound"] -error = ''' -table not found with index ID %d in index kv -''' - -["CDC:ErrInternalServerError"] -error = ''' -internal server error -''' - -["CDC:ErrIntersectNoOverlap"] -error = ''' -span doesn't overlap: %+v vs %+v -''' - -["CDC:ErrInvalidAdminJobType"] -error = ''' -invalid admin job type: %d -''' - -["CDC:ErrInvalidChangefeedID"] -error = ''' -bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", eg, "simple-changefeed-task" -''' - -["CDC:ErrInvalidEtcdKey"] -error = ''' -invalid key: %s -''' - -["CDC:ErrInvalidRecordKey"] -error = ''' -invalid record key - %q -''' - -["CDC:ErrInvalidServerOption"] -error = ''' -invalid server option -''' - -["CDC:ErrInvalidTaskKey"] -error = ''' -invalid task key: %s -''' - -["CDC:ErrJSONCodecInvalidData"] -error = ''' -json codec invalid data -''' - -["CDC:ErrKVStorageBackoffFailed"] -error = ''' -backoff failed -''' - -["CDC:ErrKVStorageRegionError"] -error = ''' -req with region error -''' - -["CDC:ErrKVStorageRespEmpty"] -error = ''' -tikv response body missing -''' - -["CDC:ErrKVStorageSendReq"] -error = ''' -send req to kv storage -''' - -["CDC:ErrKafkaAsyncSendMessage"] -error = ''' -kafka async send message failed -''' - -["CDC:ErrKafkaFlushUnfished"] -error = ''' -flush not finished before producer close -''' - -["CDC:ErrKafkaInvalidClientID"] -error = ''' -invalid kafka client ID '%s' -''' - -["CDC:ErrKafkaInvalidConfig"] -error = ''' -kafka config invalid -''' - -["CDC:ErrKafkaInvalidPartitionNum"] -error = ''' -invalid partition num %d -''' - -["CDC:ErrKafkaInvalidVersion"] -error = ''' -invalid kafka version -''' - -["CDC:ErrKafkaNewSaramaProducer"] -error = ''' -new sarama producer -''' - -["CDC:ErrKafkaSendMessage"] -error = ''' -kafka send message failed -''' - -["CDC:ErrLoadTimezone"] -error = ''' -load timezone -''' - -["CDC:ErrLocateRegion"] -error = ''' -locate region by id -''' - -["CDC:ErrMarshalFailed"] -error = ''' -marshal failed -''' - -["CDC:ErrMaxwellDecodeFailed"] -error = ''' -maxwell decode failed -''' - -["CDC:ErrMaxwellEncodeFailed"] -error = ''' -maxwell encode failed -''' - -["CDC:ErrMaxwellInvalidData"] -error = ''' -maxwell invalid data -''' - -["CDC:ErrMetaListDatabases"] -error = ''' -meta store list databases -''' - -["CDC:ErrMetaNotInRegion"] -error = ''' -meta not exists in region -''' - -["CDC:ErrMySQLConnectionError"] -error = ''' -MySQL connection error -''' - -["CDC:ErrMySQLInvalidConfig"] -error = ''' -MySQL config invaldi -''' - -["CDC:ErrMySQLQueryError"] -error = ''' -MySQL query error -''' - -["CDC:ErrMySQLTxnError"] -error = ''' -MySQL txn error -''' - -["CDC:ErrMySQLWorkerPanic"] -error = ''' -MySQL worker panic -''' - -["CDC:ErrNewCaptureFailed"] -error = ''' -new capture failed -''' - -["CDC:ErrNewProcessorFailed"] -error = ''' -new processor failed -''' - -["CDC:ErrNewSemVersion"] -error = ''' -create sem version -''' - -["CDC:ErrNewStore"] -error = ''' -new store failed -''' - -["CDC:ErrNoPendingRegion"] -error = ''' -received event regionID %v, requestID %v from %v, but neither pending region nor running region was found -''' - -["CDC:ErrOldValueNotEnabled"] -error = ''' -old value is not enabled -''' - -["CDC:ErrOwnerCampaignKeyDeleted"] -error = ''' -owner campaign key deleted -''' - -["CDC:ErrOwnerChangefeedNotFound"] 
-error = ''' -changefeed %s not found in owner cache -''' - -["CDC:ErrOwnerEtcdWatch"] -error = ''' -etcd watch returns error -''' - -["CDC:ErrOwnerSortDir"] -error = ''' -owner sort dir -''' - -["CDC:ErrPDBatchLoadRegions"] -error = ''' -pd batch load regions failed -''' - -["CDC:ErrPDEtcdAPIError"] -error = ''' -etcd api call error -''' - -["CDC:ErrPendingRegionCancel"] -error = ''' -pending region cancelled due to stream disconnecting -''' - -["CDC:ErrPrepareAvroFailed"] -error = ''' -prepare avro failed -''' - -["CDC:ErrPrewriteNotMatch"] -error = ''' -prewrite not match, key: %b, start-ts: %d -''' - -["CDC:ErrProcessorEtcdWatch"] -error = ''' -etcd watch returns error -''' - -["CDC:ErrProcessorSortDir"] -error = ''' -sort dir error -''' - -["CDC:ErrProcessorTableNotFound"] -error = ''' -table not found in processor cache -''' - -["CDC:ErrProcessorUnknown"] -error = ''' -processor running unknown error -''' - -["CDC:ErrPulsarNewProducer"] -error = ''' -new pulsar producer -''' - -["CDC:ErrPulsarSendMessage"] -error = ''' -pulsar send message failed -''' - -["CDC:ErrRegionsNotCoverSpan"] -error = ''' -regions not completely left cover span, span %v regions: %v -''' - -["CDC:ErrResolveLocks"] -error = ''' -resolve locks failed -''' - -["CDC:ErrS3SinkInitialzie"] -error = ''' -new s3 sink -''' - -["CDC:ErrS3SinkStorageAPI"] -error = ''' -s3 sink storage api -''' - -["CDC:ErrS3SinkWriteStorage"] -error = ''' -write to storage -''' - -["CDC:ErrScanLockFailed"] -error = ''' -scan lock failed -''' - -["CDC:ErrSchemaSnapshotNotFound"] -error = ''' -can not found schema snapshot, ts: %d -''' - -["CDC:ErrSchemaStorageGCed"] -error = ''' -can not found schema snapshot, the specified ts(%d) is less than gcTS(%d) -''' - -["CDC:ErrSchemaStorageTableMiss"] -error = ''' -table %d not found -''' - -["CDC:ErrSchemaStorageUnresolved"] -error = ''' -can not found schema snapshot, the specified ts(%d) is more than resolvedTs(%d) -''' - -["CDC:ErrServeHTTP"] -error = ''' -serve http error -''' - -["CDC:ErrServerNewPDClient"] -error = ''' -server creates pd client failed -''' - -["CDC:ErrSinkURIInvalid"] -error = ''' -sink uri invalid -''' - -["CDC:ErrSnapshotSchemaExists"] -error = ''' -schema %s(%d) already exists -''' - -["CDC:ErrSnapshotSchemaNotFound"] -error = ''' -schema %d not found in schema snapshot -''' - -["CDC:ErrSnapshotTableExists"] -error = ''' -table %s.%s already exists -''' - -["CDC:ErrSnapshotTableNotFound"] -error = ''' -table %d not found in schema snapshot -''' - -["CDC:ErrSupportPostOnly"] -error = ''' -this api supports POST method only -''' - -["CDC:ErrTaskPositionNotExists"] -error = ''' -task position not exists, key: %s -''' - -["CDC:ErrTaskStatusNotExists"] -error = ''' -task status not exists, key: %s -''' - -["CDC:ErrTiKVEventFeed"] -error = ''' -tikv event feed failed -''' - -["CDC:ErrToTLSConfigFailed"] -error = ''' -generate tls config failed -''' - -["CDC:ErrURLFormatInvalid"] -error = ''' -url format is invalid -''' - -["CDC:ErrUnknownKVEventType"] -error = ''' -unknown kv event type: %v, entry: %v -''' - -["CDC:ErrUnknownMetaType"] -error = ''' -unknown meta type %v -''' - -["CDC:ErrUnknownSortEngine"] -error = ''' -unknown sort engine %s -''' - -["CDC:ErrUnmarshalFailed"] -error = ''' -unmarshal failed -''' - -["CDC:ErrVersionIncompatible"] -error = ''' -version is incompatible: %s -''' - -["CDC:ErrWaitHandleOperationTimeout"] -error = ''' -waiting processor to handle the operation finished timeout -''' - -["CDC:ErrWriteTsConflict"] -error = ''' -write ts conflict -''' 
- -["CDC:ErrWrongTableInfo"] -error = ''' -wrong table info in unflatten, table id %d, index table id: %d -''' - From c81896f8221128767b7ba290cbaf9938ef9cb7aa Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 20 Nov 2020 13:51:39 +0800 Subject: [PATCH 15/27] fix errordoc --- errors.toml | 683 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 683 insertions(+) create mode 100755 errors.toml diff --git a/errors.toml b/errors.toml new file mode 100755 index 00000000000..c37b394079d --- /dev/null +++ b/errors.toml @@ -0,0 +1,683 @@ +# AUTOGENERATED BY github.com/pingcap/tiup/components/errdoc/errdoc-gen +# DO NOT EDIT THIS FILE, PLEASE CHANGE ERROR DEFINITION IF CONTENT IMPROPER. + +["CDC:ErrAPIInvalidParam"] +error = ''' +invalid api parameter +''' + +["CDC:ErrAdminStopProcessor"] +error = ''' +stop processor by admin command +''' + +["CDC:ErrAsyncBroadcaseNotSupport"] +error = ''' +Async broadcasts not supported +''' + +["CDC:ErrAvroEncodeFailed"] +error = ''' +encode to avro native data +''' + +["CDC:ErrAvroEncodeToBinary"] +error = ''' +encode to binray from native +''' + +["CDC:ErrAvroMarshalFailed"] +error = ''' +json marshal failed +''' + +["CDC:ErrAvroSchemaAPIError"] +error = ''' +schema manager API error +''' + +["CDC:ErrAvroToEnvelopeError"] +error = ''' +to envelope failed +''' + +["CDC:ErrAvroUnknownType"] +error = ''' +unknown type for Avro: %v +''' + +["CDC:ErrBufferReachLimit"] +error = ''' +puller mem buffer reach size limit +''' + +["CDC:ErrCachedTSONotExists"] +error = ''' +GetCachedCurrentVersion: cache entry does not exist +''' + +["CDC:ErrCanalDecodeFailed"] +error = ''' +canal decode failed +''' + +["CDC:ErrCanalEncodeFailed"] +error = ''' +canal encode failed +''' + +["CDC:ErrCaptureCampaignOwner"] +error = ''' +campaign owner failed +''' + +["CDC:ErrCaptureNotExist"] +error = ''' +capture not exists, key: %s +''' + +["CDC:ErrCaptureRegister"] +error = ''' +capture register to etcd failed +''' + +["CDC:ErrCaptureResignOwner"] +error = ''' +resign owner failed +''' + +["CDC:ErrCaptureSuicide"] +error = ''' +capture suicide +''' + +["CDC:ErrChangeFeedAlreadyExists"] +error = ''' +changefeed already exists, key: %s +''' + +["CDC:ErrChangeFeedNotExists"] +error = ''' +changefeed not exists, key: %s +''' + +["CDC:ErrChangefeedAbnormalState"] +error = ''' +changefeed in abnormal state: %s, replication status: %+v +''' + +["CDC:ErrCheckClusterVersionFromPD"] +error = ''' +failed to request PD +''' + +["CDC:ErrCheckDirWritable"] +error = ''' +check dir writable failed +''' + +["CDC:ErrCodecDecode"] +error = ''' +codec decode error +''' + +["CDC:ErrCreateMarkTableFailed"] +error = ''' +create mark table failed +''' + +["CDC:ErrDDLEventIgnored"] +error = ''' +ddl event is ignored +''' + +["CDC:ErrDatumUnflatten"] +error = ''' +unflatten datume data +''' + +["CDC:ErrDecodeFailed"] +error = ''' +decode failed: %s +''' + +["CDC:ErrDecodeRowToDatum"] +error = ''' +decode row data to datum failed +''' + +["CDC:ErrEncodeFailed"] +error = ''' +encode failed: %s +''' + +["CDC:ErrEventFeedAborted"] +error = ''' +single event feed aborted +''' + +["CDC:ErrEventFeedEventError"] +error = ''' +eventfeed returns event error +''' + +["CDC:ErrExecDDLFailed"] +error = ''' +exec DDL failed +''' + +["CDC:ErrFetchHandleValue"] +error = ''' +can't find handle column, please check if the pk is handle +''' + +["CDC:ErrFileSinkCreateDir"] +error = ''' +file sink create dir +''' + +["CDC:ErrFileSinkFileOp"] +error = ''' +file sink file operation +''' + 
+["CDC:ErrFileSinkMetaAlreadyExists"] +error = ''' +file sink meta file already exists +''' + +["CDC:ErrFileSorterDecode"] +error = ''' +decode failed +''' + +["CDC:ErrFileSorterEncode"] +error = ''' +encode failed +''' + +["CDC:ErrFileSorterInvalidData"] +error = ''' +invalid data +''' + +["CDC:ErrFileSorterOpenFile"] +error = ''' +open file failed +''' + +["CDC:ErrFileSorterReadFile"] +error = ''' +read file failed +''' + +["CDC:ErrFileSorterWriteFile"] +error = ''' +write file failed +''' + +["CDC:ErrFilterRuleInvalid"] +error = ''' +filter rule is invalid +''' + +["CDC:ErrGRPCDialFailed"] +error = ''' +grpc dial failed +''' + +["CDC:ErrGetAllStoresFailed"] +error = ''' +get stores from pd failed +''' + +["CDC:ErrGetRegionFailed"] +error = ''' +get region failed +''' + +["CDC:ErrGetStoreSnapshot"] +error = ''' +get snapshot failed +''' + +["CDC:ErrGetTiKVRPCContext"] +error = ''' +get tikv grpc context failed +''' + +["CDC:ErrIndexKeyTableNotFound"] +error = ''' +table not found with index ID %d in index kv +''' + +["CDC:ErrInternalServerError"] +error = ''' +internal server error +''' + +["CDC:ErrIntersectNoOverlap"] +error = ''' +span doesn't overlap: %+v vs %+v +''' + +["CDC:ErrInvalidAdminJobType"] +error = ''' +invalid admin job type: %d +''' + +["CDC:ErrInvalidChangefeedID"] +error = ''' +bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", eg, "simple-changefeed-task" +''' + +["CDC:ErrInvalidEtcdKey"] +error = ''' +invalid key: %s +''' + +["CDC:ErrInvalidRecordKey"] +error = ''' +invalid record key - %q +''' + +["CDC:ErrInvalidServerOption"] +error = ''' +invalid server option +''' + +["CDC:ErrInvalidTaskKey"] +error = ''' +invalid task key: %s +''' + +["CDC:ErrJSONCodecInvalidData"] +error = ''' +json codec invalid data +''' + +["CDC:ErrKVStorageBackoffFailed"] +error = ''' +backoff failed +''' + +["CDC:ErrKVStorageRegionError"] +error = ''' +req with region error +''' + +["CDC:ErrKVStorageRespEmpty"] +error = ''' +tikv response body missing +''' + +["CDC:ErrKVStorageSendReq"] +error = ''' +send req to kv storage +''' + +["CDC:ErrKafkaAsyncSendMessage"] +error = ''' +kafka async send message failed +''' + +["CDC:ErrKafkaFlushUnfished"] +error = ''' +flush not finished before producer close +''' + +["CDC:ErrKafkaInvalidClientID"] +error = ''' +invalid kafka client ID '%s' +''' + +["CDC:ErrKafkaInvalidConfig"] +error = ''' +kafka config invalid +''' + +["CDC:ErrKafkaInvalidPartitionNum"] +error = ''' +invalid partition num %d +''' + +["CDC:ErrKafkaInvalidVersion"] +error = ''' +invalid kafka version +''' + +["CDC:ErrKafkaNewSaramaProducer"] +error = ''' +new sarama producer +''' + +["CDC:ErrKafkaSendMessage"] +error = ''' +kafka send message failed +''' + +["CDC:ErrLoadTimezone"] +error = ''' +load timezone +''' + +["CDC:ErrLocateRegion"] +error = ''' +locate region by id +''' + +["CDC:ErrMarshalFailed"] +error = ''' +marshal failed +''' + +["CDC:ErrMaxwellDecodeFailed"] +error = ''' +maxwell decode failed +''' + +["CDC:ErrMaxwellEncodeFailed"] +error = ''' +maxwell encode failed +''' + +["CDC:ErrMaxwellInvalidData"] +error = ''' +maxwell invalid data +''' + +["CDC:ErrMetaListDatabases"] +error = ''' +meta store list databases +''' + +["CDC:ErrMetaNotInRegion"] +error = ''' +meta not exists in region +''' + +["CDC:ErrMySQLConnectionError"] +error = ''' +MySQL connection error +''' + +["CDC:ErrMySQLInvalidConfig"] +error = ''' +MySQL config invaldi +''' + +["CDC:ErrMySQLQueryError"] +error = ''' +MySQL query error +''' + +["CDC:ErrMySQLTxnError"] +error = 
''' +MySQL txn error +''' + +["CDC:ErrMySQLWorkerPanic"] +error = ''' +MySQL worker panic +''' + +["CDC:ErrNewCaptureFailed"] +error = ''' +new capture failed +''' + +["CDC:ErrNewProcessorFailed"] +error = ''' +new processor failed +''' + +["CDC:ErrNewSemVersion"] +error = ''' +create sem version +''' + +["CDC:ErrNewStore"] +error = ''' +new store failed +''' + +["CDC:ErrNoPendingRegion"] +error = ''' +received event regionID %v, requestID %v from %v, but neither pending region nor running region was found +''' + +["CDC:ErrOldValueNotEnabled"] +error = ''' +old value is not enabled +''' + +["CDC:ErrOwnerCampaignKeyDeleted"] +error = ''' +owner campaign key deleted +''' + +["CDC:ErrOwnerChangefeedNotFound"] +error = ''' +changefeed %s not found in owner cache +''' + +["CDC:ErrOwnerEtcdWatch"] +error = ''' +etcd watch returns error +''' + +["CDC:ErrOwnerSortDir"] +error = ''' +owner sort dir +''' + +["CDC:ErrPDBatchLoadRegions"] +error = ''' +pd batch load regions failed +''' + +["CDC:ErrPDEtcdAPIError"] +error = ''' +etcd api call error +''' + +["CDC:ErrPendingRegionCancel"] +error = ''' +pending region cancelled due to stream disconnecting +''' + +["CDC:ErrPrepareAvroFailed"] +error = ''' +prepare avro failed +''' + +["CDC:ErrPrewriteNotMatch"] +error = ''' +prewrite not match, key: %b, start-ts: %d +''' + +["CDC:ErrProcessorEtcdWatch"] +error = ''' +etcd watch returns error +''' + +["CDC:ErrProcessorSortDir"] +error = ''' +sort dir error +''' + +["CDC:ErrProcessorTableNotFound"] +error = ''' +table not found in processor cache +''' + +["CDC:ErrProcessorUnknown"] +error = ''' +processor running unknown error +''' + +["CDC:ErrPulsarNewProducer"] +error = ''' +new pulsar producer +''' + +["CDC:ErrPulsarSendMessage"] +error = ''' +pulsar send message failed +''' + +["CDC:ErrRegionsNotCoverSpan"] +error = ''' +regions not completely left cover span, span %v regions: %v +''' + +["CDC:ErrResolveLocks"] +error = ''' +resolve locks failed +''' + +["CDC:ErrS3SinkInitialzie"] +error = ''' +new s3 sink +''' + +["CDC:ErrS3SinkStorageAPI"] +error = ''' +s3 sink storage api +''' + +["CDC:ErrS3SinkWriteStorage"] +error = ''' +write to storage +''' + +["CDC:ErrScanLockFailed"] +error = ''' +scan lock failed +''' + +["CDC:ErrSchemaSnapshotNotFound"] +error = ''' +can not found schema snapshot, ts: %d +''' + +["CDC:ErrSchemaStorageGCed"] +error = ''' +can not found schema snapshot, the specified ts(%d) is less than gcTS(%d) +''' + +["CDC:ErrSchemaStorageTableMiss"] +error = ''' +table %d not found +''' + +["CDC:ErrSchemaStorageUnresolved"] +error = ''' +can not found schema snapshot, the specified ts(%d) is more than resolvedTs(%d) +''' + +["CDC:ErrServeHTTP"] +error = ''' +serve http error +''' + +["CDC:ErrServerNewPDClient"] +error = ''' +server creates pd client failed +''' + +["CDC:ErrSinkURIInvalid"] +error = ''' +sink uri invalid +''' + +["CDC:ErrSnapshotSchemaExists"] +error = ''' +schema %s(%d) already exists +''' + +["CDC:ErrSnapshotSchemaNotFound"] +error = ''' +schema %d not found in schema snapshot +''' + +["CDC:ErrSnapshotTableExists"] +error = ''' +table %s.%s already exists +''' + +["CDC:ErrSnapshotTableNotFound"] +error = ''' +table %d not found in schema snapshot +''' + +["CDC:ErrSupportPostOnly"] +error = ''' +this api supports POST method only +''' + +["CDC:ErrTaskPositionNotExists"] +error = ''' +task position not exists, key: %s +''' + +["CDC:ErrTaskStatusNotExists"] +error = ''' +task status not exists, key: %s +''' + +["CDC:ErrTiKVEventFeed"] +error = ''' +tikv event feed failed +''' 
+ +["CDC:ErrToTLSConfigFailed"] +error = ''' +generate tls config failed +''' + +["CDC:ErrURLFormatInvalid"] +error = ''' +url format is invalid +''' + +["CDC:ErrUnknownKVEventType"] +error = ''' +unknown kv event type: %v, entry: %v +''' + +["CDC:ErrUnknownMetaType"] +error = ''' +unknown meta type %v +''' + +["CDC:ErrUnknownSortEngine"] +error = ''' +unknown sort engine %s +''' + +["CDC:ErrUnmarshalFailed"] +error = ''' +unmarshal failed +''' + +["CDC:ErrVersionIncompatible"] +error = ''' +version is incompatible: %s +''' + +["CDC:ErrWaitHandleOperationTimeout"] +error = ''' +waiting processor to handle the operation finished timeout +''' + +["CDC:ErrWriteTsConflict"] +error = ''' +write ts conflict +''' + +["CDC:ErrWrongTableInfo"] +error = ''' +wrong table info in unflatten, table id %d, index table id: %d +''' + From 0ff9f62f31fa40ca91cce6e3633838ae1b91796a Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Fri, 20 Nov 2020 14:17:02 +0800 Subject: [PATCH 16/27] fix errordoc --- errors.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/errors.toml b/errors.toml index c37b394079d..6019129b324 100755 --- a/errors.toml +++ b/errors.toml @@ -1,5 +1,5 @@ # AUTOGENERATED BY github.com/pingcap/tiup/components/errdoc/errdoc-gen -# DO NOT EDIT THIS FILE, PLEASE CHANGE ERROR DEFINITION IF CONTENT IMPROPER. +# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER. ["CDC:ErrAPIInvalidParam"] error = ''' From ef4da3c7a76d923bf45fc4d6977b2caeb9895450 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 16:16:03 +0800 Subject: [PATCH 17/27] fix leak test --- pkg/notify/notify_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index abcc7a9978e..b0d5eee0e88 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -50,7 +50,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { r2.Stop() r3.Stop() c.Assert(len(notifier.receivers), check.Equals, 0) - time.Sleep(time.Second) + time.Sleep(time.Second * 8) r4 := notifier.NewReceiver(-1) <-r4.C r4.Stop() From e35c0daa6b95055415e11506ee2c60b86dc2f7fd Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 16:17:11 +0800 Subject: [PATCH 18/27] reduce move_table workload --- tests/move_table/conf/workload | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload index 632789648c4..c249663fe5b 100644 --- a/tests/move_table/conf/workload +++ b/tests/move_table/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=90000 +recordcount=50000 operationcount=0 workload=core From 672f5c80910ccdabf514033da7da0c8a3a468edc Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 16:40:50 +0800 Subject: [PATCH 19/27] fix leak test --- pkg/notify/notify_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/notify/notify_test.go b/pkg/notify/notify_test.go index b0d5eee0e88..cb737a4b9c4 100644 --- a/pkg/notify/notify_test.go +++ b/pkg/notify/notify_test.go @@ -36,11 +36,13 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { r1 := notifier.NewReceiver(-1) r2 := notifier.NewReceiver(-1) r3 := notifier.NewReceiver(-1) + finishedCh := make(chan struct{}) go func() { for i := 0; i < 5; i++ { time.Sleep(time.Second) notifier.Notify() } + close(finishedCh) }() <-r1.C r1.Stop() @@ -50,7 +52,6 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { r2.Stop() r3.Stop() c.Assert(len(notifier.receivers), check.Equals, 0) - 
time.Sleep(time.Second * 8) r4 := notifier.NewReceiver(-1) <-r4.C r4.Stop() @@ -59,6 +60,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) { r5 := notifier2.NewReceiver(10 * time.Millisecond) <-r5.C r5.Stop() + <-finishedCh // To make the leak checker happy } func (s *notifySuite) TestContinusStop(c *check.C) { From 52111a9ec2e4d82ede87bae4ae31f9d809ce7052 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:09:37 +0800 Subject: [PATCH 20/27] modify integration test --- tests/move_table/main.go | 2 +- tests/move_table/run.sh | 36 ++++++++---------------------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/tests/move_table/main.go b/tests/move_table/main.go index 83a556f509b..af594f1bc33 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -46,7 +46,7 @@ func main() { } log.Info("table mover started") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) defer cancel() cluster, err := newCluster(ctx, *pd) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index adf501d00e2..e7c2caf05c2 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -39,38 +39,18 @@ function run() { check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_sql "CREATE table move_table.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "move_table.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - - go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=move_table - run_sql "CREATE table move_table.check3(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - check_table_exists "move_table.check3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - - run_sql "create table move_table.USERTABLE2 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into move_table.USERTABLE2 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create table move_table.USERTABLE3 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into move_table.USERTABLE3 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create table move_table.USERTABLE4 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into move_table.USERTABLE4 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create table move_table.USERTABLE5 like move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "insert into move_table.USERTABLE5 select * from move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - run_sql "create table move_table.check4(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - - sleep 10 - check_table_exists "move_table.USERTABLE2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 - check_table_exists "move_table.USERTABLE3" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - check_table_exists 
"move_table.USERTABLE4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - check_table_exists "move_table.USERTABLE5" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 120 - check_table_exists "move_table.check4" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + sleep 20 + cd $CUR + GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log & + cd $WORK_DIR + run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml + run_sql "CREATE table move_table.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + check_table_exists "move_table.check2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml cleanup_process $CDC_BINARY From d8d4a9ca4cffd7ea70e08a3161407ff210ac07ef Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:14:44 +0800 Subject: [PATCH 21/27] modify integration test --- tests/move_table/run.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index e7c2caf05c2..72a0da006b4 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -32,6 +32,9 @@ function run() { run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4" fi + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 + # Add a check table to reduce check time, or if we check data with sync diff # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} @@ -39,10 +42,7 @@ function run() { check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "2" --addr 127.0.0.1:8301 - run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --loglevel "debug" --logsuffix "3" --addr 127.0.0.1:8302 - - sleep 20 + sleep 60 cd $CUR GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log & cd $WORK_DIR From d3dcfb2f292abbfa0035f16f2e2ade2cb0cf01fc Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:17:36 +0800 Subject: [PATCH 22/27] modify integration test --- tests/move_table/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 72a0da006b4..9d2c99e5a62 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -44,7 +44,7 @@ function run() { sleep 60 cd $CUR - GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log & + GO111MODULE=on go run main.go 2>&1 | tee $WORK_DIR/tester.log & cd $WORK_DIR run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} From faca2a6a89a63fa332b6c9d6b15f245484aad2f8 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:23:28 +0800 Subject: [PATCH 23/27] modify table mover --- tests/move_table/main.go | 152 ++++++++++++++++++--------------------- tests/move_table/run.sh | 2 +- 2 files changed, 72 insertions(+), 82 deletions(-) diff --git a/tests/move_table/main.go b/tests/move_table/main.go index af594f1bc33..8e3ddbfe898 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -54,88 +54,78 @@ func main() { 
log.Fatal("failed to create cluster info", zap.Error(err)) } - ticker := time.NewTicker(1 * time.Second) - for { - select { - case <-ctx.Done(): - log.Info("Exiting", zap.Error(ctx.Err())) - return - case <-ticker.C: - err := retry.Run(100*time.Millisecond, 20, func() error { - return cluster.refreshInfo(ctx) - }) + err = retry.Run(100*time.Millisecond, 20, func() error { + return cluster.refreshInfo(ctx) + }) + + if err != nil { + log.Warn("error refreshing cluster info", zap.Error(err)) + } + + log.Info("task status", zap.Reflect("status", cluster.captures)) + + if len(cluster.captures) <= 1 { + log.Fatal("no enough captures", zap.Reflect("captures", cluster.captures)) + } + + var sourceCapture string + + for capture, tables := range cluster.captures { + if len(tables) == 0 { + continue + } + sourceCapture = capture + break + } + + var targetCapture string + + for candidateCapture := range cluster.captures { + if candidateCapture != sourceCapture { + targetCapture = candidateCapture + } + } + + if targetCapture == "" { + log.Fatal("no target, unexpected") + } + + // move all tables to another capture + for _, table := range cluster.captures[sourceCapture] { + err = moveTable(ctx, cluster.ownerAddr, table.Changefeed, targetCapture, table.ID) + if err != nil { + log.Warn("failed to move table", zap.Error(err)) + continue + } + + log.Info("moved table successful", zap.Int64("tableID", table.ID)) + } + + log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture)) + + for counter := 0; counter < 30; counter++ { + err := retry.Run(100*time.Millisecond, 5, func() error { + return cluster.refreshInfo(ctx) + }) + + if err != nil { + log.Warn("error refreshing cluster info", zap.Error(err)) + } + + tables, ok := cluster.captures[sourceCapture] + if !ok { + log.Warn("source capture is gone", zap.String("sourceCapture", sourceCapture)) + break + } + + if len(tables) == 0 { + log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture)) + break + } - if err != nil { - log.Warn("error refreshing cluster info", zap.Error(err)) - } - - log.Info("task status", zap.Reflect("status", cluster.captures)) - - if len(cluster.captures) <= 1 { - log.Warn("no enough captures", zap.Reflect("captures", cluster.captures)) - continue - } - - var sourceCapture string - - for capture, tables := range cluster.captures { - if len(tables) == 0 { - continue - } - sourceCapture = capture - break - } - - var targetCapture string - - for candidateCapture := range cluster.captures { - if candidateCapture != sourceCapture { - targetCapture = candidateCapture - } - } - - if targetCapture == "" { - log.Fatal("no target, unexpected") - } - - // move all tables to another capture - for _, table := range cluster.captures[sourceCapture] { - err = moveTable(ctx, cluster.ownerAddr, table.Changefeed, targetCapture, table.ID) - if err != nil { - log.Warn("failed to move table", zap.Error(err)) - continue - } - - log.Info("moved table successful", zap.Int64("tableID", table.ID)) - } - - log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture)) - - for counter := 0; counter < 30; counter++ { - err := retry.Run(100*time.Millisecond, 5, func() error { - return cluster.refreshInfo(ctx) - }) - - if err != nil { - log.Warn("error refreshing cluster info", zap.Error(err)) - } - - tables, ok := cluster.captures[sourceCapture] - if !ok { - log.Warn("source capture is gone", zap.String("sourceCapture", 
sourceCapture)) - break - } - - if len(tables) == 0 { - log.Info("source capture is now empty", zap.String("sourceCapture", sourceCapture)) - break - } - - if counter != 30 { - log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture)) - time.Sleep(time.Second * 10) - } - } + if counter != 30 { + log.Debug("source capture is not empty, will try again", zap.String("sourceCapture", sourceCapture)) + time.Sleep(time.Second * 10) } } } diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index 9d2c99e5a62..f53cfccf7cd 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -42,7 +42,7 @@ function run() { check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - sleep 60 + sleep 10 cd $CUR GO111MODULE=on go run main.go 2>&1 | tee $WORK_DIR/tester.log & cd $WORK_DIR From 237f93cfa68e07a360c9efca9fedfdc4fe253781 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:27:02 +0800 Subject: [PATCH 24/27] modify integration test script --- tests/move_table/run.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index f53cfccf7cd..f1e2cb1cb6f 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -39,14 +39,14 @@ function run() { # directly, there maybe a lot of diff data at first because of the incremental scan run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_table_exists "move_table.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 - check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml - sleep 10 + sleep 5 cd $CUR GO111MODULE=on go run main.go 2>&1 | tee $WORK_DIR/tester.log & cd $WORK_DIR + check_table_exists "move_table.check1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 90 + check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml run_sql "truncate table move_table.USERTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml run_sql "CREATE table move_table.check2(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} From 017c3195b6eef39246a3b65c6f82478d4db52cc8 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:35:08 +0800 Subject: [PATCH 25/27] add retry to table mover --- tests/move_table/main.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/tests/move_table/main.go b/tests/move_table/main.go index 8e3ddbfe898..9e26591fc27 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -55,19 +55,19 @@ func main() { } err = retry.Run(100*time.Millisecond, 20, func() error { - return cluster.refreshInfo(ctx) - }) - - if err != nil { - log.Warn("error refreshing cluster info", zap.Error(err)) - } - - log.Info("task status", zap.Reflect("status", cluster.captures)) + err := cluster.refreshInfo(ctx) + if err != nil { + log.Warn("error refreshing cluster info", zap.Error(err)) + } - if len(cluster.captures) <= 1 { - log.Fatal("no enough captures", zap.Reflect("captures", cluster.captures)) - } + log.Info("task status", zap.Reflect("status", cluster.captures)) + if len(cluster.captures) <= 1 { + return errors.New("too few captures") + } + return nil + }) + var sourceCapture string for capture, tables := range cluster.captures { From ec02d7bd5b7acf848087f24cd345c269f541eaf4 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 
17:39:05 +0800 Subject: [PATCH 26/27] fix table mover --- tests/move_table/conf/workload | 2 +- tests/move_table/main.go | 6 +++++- tests/move_table/run.sh | 1 - 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/move_table/conf/workload b/tests/move_table/conf/workload index c249663fe5b..5b9ca3189fc 100644 --- a/tests/move_table/conf/workload +++ b/tests/move_table/conf/workload @@ -1,5 +1,5 @@ threadcount=10 -recordcount=50000 +recordcount=60000 operationcount=0 workload=core diff --git a/tests/move_table/main.go b/tests/move_table/main.go index 9e26591fc27..8139359e0f6 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -67,7 +67,11 @@ func main() { } return nil }) - + + if err != nil { + log.Fatal("Fail to get captures", zap.Error(err)) + } + var sourceCapture string for capture, tables := range cluster.captures { diff --git a/tests/move_table/run.sh b/tests/move_table/run.sh index f1e2cb1cb6f..c363916c9d0 100644 --- a/tests/move_table/run.sh +++ b/tests/move_table/run.sh @@ -40,7 +40,6 @@ function run() { run_sql "CREATE table move_table.check1(id int primary key);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} check_table_exists "move_table.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} - sleep 5 cd $CUR GO111MODULE=on go run main.go 2>&1 | tee $WORK_DIR/tester.log & cd $WORK_DIR From c378df558f47084d0e33d35b376dc14cb4640859 Mon Sep 17 00:00:00 2001 From: Zixiong Liu Date: Mon, 23 Nov 2020 17:53:32 +0800 Subject: [PATCH 27/27] fix table mover --- tests/move_table/main.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/move_table/main.go b/tests/move_table/main.go index 8139359e0f6..4f255c891cc 100644 --- a/tests/move_table/main.go +++ b/tests/move_table/main.go @@ -217,6 +217,15 @@ func (c *cluster) refreshInfo(ctx context.Context) error { break } + c.captures = make(map[string][]*tableInfo) + _, captures, err := c.cdcEtcdCli.GetCaptures(ctx) + if err != nil { + return errors.Trace(err) + } + for _, capture := range captures { + c.captures[capture.ID] = make([]*tableInfo, 0) + } + allTasks, err := c.cdcEtcdCli.GetAllTaskStatus(ctx, changefeed) if err != nil { return errors.Trace(err) @@ -224,9 +233,11 @@ func (c *cluster) refreshInfo(ctx context.Context) error { log.Debug("retrieved all tasks", zap.Reflect("tasks", allTasks)) - c.captures = make(map[string][]*tableInfo) for capture, taskInfo := range allTasks { - c.captures[capture] = make([]*tableInfo, 0, len(taskInfo.Tables)) + if _, ok := c.captures[capture]; !ok { + c.captures[capture] = make([]*tableInfo, 0, len(taskInfo.Tables)) + } + for tableID := range taskInfo.Tables { c.captures[capture] = append(c.captures[capture], &tableInfo{ ID: tableID,