Skip to content

Commit

Permalink
Support to check whether the table can be duplicated by CDC (pingcap#368
Browse files Browse the repository at this point in the history
)
  • Loading branch information
leoppro authored Mar 25, 2020
1 parent 7bf0075 commit 2ddbbab
Show file tree
Hide file tree
Showing 10 changed files with 367 additions and 250 deletions.
23 changes: 0 additions & 23 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package cdc

import (
"context"
"fmt"
"sync"
"time"

Expand All @@ -25,10 +24,6 @@ import (
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/roles"
"github.com/pingcap/ticdc/pkg/flags"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store"
"github.com/pingcap/tidb/store/tikv"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
Expand Down Expand Up @@ -195,21 +190,3 @@ func (c *Capture) Close(ctx context.Context) error {
func (c *Capture) register(ctx context.Context) error {
return errors.Trace(c.etcdClient.PutCaptureInfo(ctx, c.info, c.session.Lease()))
}

func createTiStore(urls string) (tidbkv.Storage, error) {
urlv, err := flags.NewURLsValue(urls)
if err != nil {
return nil, errors.Trace(err)
}

// Ignore error if it is already registered.
_ = store.Register("tikv", tikv.Driver{})

tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := store.New(tiPath)
if err != nil {
return nil, errors.Trace(err)
}

return tiStore, nil
}
36 changes: 31 additions & 5 deletions cdc/entry/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type Storage struct {
schemas map[int64]*timodel.DBInfo
tables map[int64]*TableInfo

truncateTableID map[int64]struct{}
truncateTableID map[int64]struct{}
ineligibleTableID map[int64]struct{}

schemaMetaVersion int64
lastHandledTs uint64
Expand Down Expand Up @@ -213,6 +214,11 @@ func (ti *TableInfo) IsColumnUnique(colID int64) bool {
return exist
}

// ExistTableUniqueColumn returns whether the table has the unique column
func (ti *TableInfo) ExistTableUniqueColumn() bool {
return len(ti.UniqueColumns) != 0
}

// IsIndexUnique returns whether the index is unique
func (ti *TableInfo) IsIndexUnique(indexInfo *timodel.IndexInfo) bool {
if indexInfo.Primary {
Expand Down Expand Up @@ -248,6 +254,7 @@ func NewSingleStorage() *Storage {
s := &Storage{
version2SchemaTable: make(map[int64]TableName),
truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),
}

s.tableIDToName = make(map[int64]TableName)
Expand Down Expand Up @@ -382,6 +389,7 @@ func (s *Storage) DropTable(id int64) (string, error) {
tableName := s.tableIDToName[id]
delete(s.tableIDToName, id)
delete(s.tableNameToID, tableName)
delete(s.ineligibleTableID, id)

log.Debug("drop table success", zap.String("name", table.Name.O), zap.Int64("id", id))
return table.Name.O, nil
Expand All @@ -395,7 +403,12 @@ func (s *Storage) CreateTable(schema *timodel.DBInfo, table *timodel.TableInfo)
}

schema.Tables = append(schema.Tables, table)
s.tables[table.ID] = WrapTableInfo(table)
tbl := WrapTableInfo(table)
s.tables[table.ID] = tbl
if !tbl.ExistTableUniqueColumn() {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
s.tableIDToName[table.ID] = TableName{Schema: schema.Name.O, Table: table.Name.O}
s.tableNameToID[s.tableIDToName[table.ID]] = table.ID

Expand All @@ -409,9 +422,12 @@ func (s *Storage) ReplaceTable(table *timodel.TableInfo) error {
if !ok {
return errors.NotFoundf("table %s(%d)", table.Name, table.ID)
}

s.tables[table.ID] = WrapTableInfo(table)

tbl := WrapTableInfo(table)
s.tables[table.ID] = tbl
if !tbl.ExistTableUniqueColumn() {
log.Warn("this table is not eligible to replicate", zap.String("tableName", table.Name.O), zap.Int64("tableID", table.ID))
s.ineligibleTableID[table.ID] = struct{}{}
}
return nil
}

Expand Down Expand Up @@ -650,6 +666,7 @@ func (s *Storage) Clone() *Storage {
tables: make(map[int64]*TableInfo),

truncateTableID: make(map[int64]struct{}),
ineligibleTableID: make(map[int64]struct{}),
version2SchemaTable: make(map[int64]TableName),
}
for k, v := range s.tableIDToName {
Expand All @@ -670,6 +687,9 @@ func (s *Storage) Clone() *Storage {
for k, v := range s.truncateTableID {
n.truncateTableID[k] = v
}
for k, v := range s.ineligibleTableID {
n.ineligibleTableID[k] = v
}
for k, v := range s.version2SchemaTable {
n.version2SchemaTable[k] = v
}
Expand All @@ -688,6 +708,12 @@ func (s *Storage) IsTruncateTableID(id int64) bool {
return ok
}

// IsIneligibleTableID returns true if the table is ineligible
func (s *Storage) IsIneligibleTableID(id int64) bool {
_, ok := s.ineligibleTableID[id]
return ok
}

// SkipJob skip the job should not be executed
// TiDB write DDL Binlog for every DDL Job, we must ignore jobs that are cancelled or rollback
// For older version TiDB, it write DDL Binlog in the txn that the state of job is changed to *synced*
Expand Down
Loading

0 comments on commit 2ddbbab

Please sign in to comment.