Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle create/drop sequence job type #950

Merged
merged 2 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"bytes"
"crypto/tls"
"fmt"
"net/http"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -260,9 +262,25 @@ func (c *Collector) reportErr(ctx context.Context, err error) {
}
}

// ref https://github.com/pingcap/tidb/pull/14954
// TiDB will write a fake ddl binlog like: select setval(`seq`.`sequence_name`, 1000)
// when the value of sequence is changed for replicate the value of the sequence.
// we CAN NOT query the job from the tikv according the job id.
// just skip this kind of binlog now.
func skipQueryJob(binlog *binlog.Binlog) bool {
q := binlog.GetDdlQuery()
return bytes.HasPrefix(q, []byte("select setval"))
}

func (c *Collector) syncBinlog(item *binlogItem) error {
binlog := item.binlog
// DO NOT replicate the value of sequence now.
if skipQueryJob(binlog) {
return nil
}

if binlog.DdlJobId > 0 {
log.Info("start query job", zap.Int64("id", binlog.DdlJobId), zap.Stringer("binlog", binlog))
msgPrefix := fmt.Sprintf("get ddl job by id %d error", binlog.DdlJobId)
var job *model.Job
for {
Expand Down
5 changes: 2 additions & 3 deletions drainer/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionCreateTable, model.ActionCreateView, model.ActionRecoverTable:
case model.ActionCreateTable, model.ActionCreateView, model.ActionCreateSequence, model.ActionRecoverTable:
table := job.BinlogInfo.TableInfo
if table == nil {
return "", "", "", errors.NotFoundf("table %d", job.TableID)
Expand All @@ -380,7 +380,7 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O

case model.ActionDropTable, model.ActionDropView:
case model.ActionDropTable, model.ActionDropView, model.ActionDropSequence:
schema, ok := s.SchemaByID(job.SchemaID)
if !ok {
return "", "", "", errors.NotFoundf("schema %d", job.SchemaID)
Expand Down Expand Up @@ -422,7 +422,6 @@ func (s *Schema) handleDDL(job *model.Job) (schemaName string, tableName string,
schemaName = schema.Name.O
tableName = table.Name.O
s.truncateTableID[job.TableID] = struct{}{}

default:
binlogInfo := job.BinlogInfo
if binlogInfo == nil {
Expand Down
17 changes: 17 additions & 0 deletions tests/sequence/drainer.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
data-dir = '/tmp/tidb_binlog_test/data.drainer'

[syncer]
txn-batch = 1
worker-count = 1
safe-mode = false
db-type = 'mysql'
replicate-do-db = ['seq']

[syncer.to]
host = '127.0.0.1'
user = 'root'
password = ''
port = 3306

[syncer.to.checkpoint]
schema = "seq_checkpoint"
29 changes: 29 additions & 0 deletions tests/sequence/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/sh

set -e

cd "$(dirname "$0")"

run_drainer &

sleep 3

run_sql 'CREATE DATABASE seq;'
run_sql 'CREATE SEQUENCE seq.sequence_name;'
run_sql 'CREATE TABLE seq.table_name (id INT DEFAULT NEXT VALUE FOR seq.sequence_name, val int);'

run_sql 'INSERT INTO seq.table_name(val) values(10);'

sleep 3

down_run_sql 'SELECT count(*), sum(id), sum(val) FROM seq.table_name;'
check_contains 'count(*): 1'
check_contains 'sum(id): 1'
check_contains 'sum(val): 10'


run_sql 'DROP TABLE seq.table_name;'
run_sql 'DROP SEQUENCE seq.sequence_name;'


killall drainer