diff --git a/drainer/collector.go b/drainer/collector.go index 786efcd4a..b84ccd13c 100644 --- a/drainer/collector.go +++ b/drainer/collector.go @@ -14,6 +14,7 @@ package drainer import ( + "bytes" "crypto/tls" "fmt" "net/http" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/tidb/store" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" "golang.org/x/net/context" ) @@ -260,9 +262,25 @@ func (c *Collector) reportErr(ctx context.Context, err error) { } } +// ref https://github.com/pingcap/tidb/pull/14954 +// TiDB will write a fake ddl binlog like: select setval(`seq`.`sequence_name`, 1000) +// when the value of sequence is changed for replicate the value of the sequence. +// we CAN NOT query the job from the tikv according the job id. +// just skip this kind of binlog now. +func skipQueryJob(binlog *binlog.Binlog) bool { + q := binlog.GetDdlQuery() + return bytes.HasPrefix(q, []byte("select setval")) +} + func (c *Collector) syncBinlog(item *binlogItem) error { binlog := item.binlog + // DO NOT replicate the value of sequence now. + if skipQueryJob(binlog) { + return nil + } + if binlog.DdlJobId > 0 { + log.Info("start query job", zap.Int64("id", binlog.DdlJobId), zap.Stringer("binlog", binlog)) msgPrefix := fmt.Sprintf("get ddl job by id %d error", binlog.DdlJobId) var job *model.Job for { diff --git a/drainer/schema.go b/drainer/schema.go index eacb31f13..9f1be25ac 100644 --- a/drainer/schema.go +++ b/drainer/schema.go @@ -359,7 +359,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, schemaName = schema.Name.O tableName = table.Name.O - case model.ActionCreateTable, model.ActionCreateView, model.ActionRecoverTable: + case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence, model.ActionRecoverTable: table := job.BinlogInfo.TableInfo if table == nil { return "", "", "", errors.NotFoundf("table %d", job.TableID) @@ -380,7 +380,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, schemaName = schema.Name.O tableName = table.Name.O - case model.ActionDropTable, model.ActionDropView: + case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence: schema, ok := s.SchemaByID(job.SchemaID) if !ok { return "", "", "", errors.NotFoundf("schema %d", job.SchemaID) @@ -422,7 +422,6 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string, schemaName = schema.Name.O tableName = table.Name.O s.truncateTableID[job.TableID] = struct{}{} - default: binlogInfo := job.BinlogInfo if binlogInfo == nil { diff --git a/tests/sequence/drainer.toml b/tests/sequence/drainer.toml new file mode 100644 index 000000000..0f13e2748 --- /dev/null +++ b/tests/sequence/drainer.toml @@ -0,0 +1,17 @@ +data-dir = '/tmp/tidb_binlog_test/data.drainer' + +[syncer] +txn-batch = 1 +worker-count = 1 +safe-mode = false +db-type = 'mysql' +replicate-do-db = ['seq'] + +[syncer.to] +host = '127.0.0.1' +user = 'root' +password = '' +port = 3306 + +[syncer.to.checkpoint] +schema = "seq_checkpoint" diff --git a/tests/sequence/run.sh b/tests/sequence/run.sh new file mode 100755 index 000000000..41985fbf2 --- /dev/null +++ b/tests/sequence/run.sh @@ -0,0 +1,29 @@ +#!/bin/sh + +set -e + +cd "$(dirname "$0")" + +run_drainer & + +sleep 3 + +run_sql 'CREATE DATABASE seq;' +run_sql 'CREATE SEQUENCE seq.sequence_name;' +run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence_name, val int);' + +run_sql 'INSERT INTO seq.table_name(val) values(10);' + +sleep 3 + +down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;' +check_contains 'count(*): 1' +check_contains 'sum(id): 1' +check_contains 'sum(val): 10' + + +run_sql 'DROP TABLE seq.table_name;' +run_sql 'DROP SEQUENCE seq.sequence_name;' + + +killall drainer