Skip to content

Commit

Permalink
cdc/sink: mysql sink manage checkpoint per table (pingcap#3645)
Browse files Browse the repository at this point in the history
# Conflicts:
#	cdc/sink/common/common_test.go
#	cdc/sink/mysql_params_test.go
#	cdc/sink/mysql_test.go
#	cdc/sink/mysql_worker.go
#	cdc/sink/mysql_worker_test.go
#	tests/integration_tests/sink_hang/run.sh
  • Loading branch information
sdojjy committed Dec 27, 2021
1 parent 4cfeb72 commit dc74082
Show file tree
Hide file tree
Showing 8 changed files with 726 additions and 753 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"testing"
"time"

"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/stretchr/testify/require"
)

Expand Down
33 changes: 12 additions & 21 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package common
import (
"sort"
"sync"
"sync/atomic"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -55,7 +54,6 @@ func (t *txnsWithTheSameCommitTs) Append(row *model.RowChangedEvent) {
type UnresolvedTxnCache struct {
unresolvedTxnsMu sync.Mutex
unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs
checkpointTs uint64
}

// NewUnresolvedTxnCache returns a new UnresolvedTxnCache
Expand Down Expand Up @@ -103,32 +101,27 @@ func (c *UnresolvedTxnCache) Append(filter *filter.Filter, rows ...*model.RowCha

// Resolved returns resolved txns according to resolvedTs
// The returned map contains many txns grouped by tableID. for each table, the each commitTs of txn in txns slice is strictly increasing
func (c *UnresolvedTxnCache) Resolved(resolvedTs uint64) map[model.TableID][]*model.SingleTableTxn {
if resolvedTs <= atomic.LoadUint64(&c.checkpointTs) {
return nil
}

func (c *UnresolvedTxnCache) Resolved(resolvedTsMap *sync.Map) (map[model.TableID]uint64, map[model.TableID][]*model.SingleTableTxn) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
if len(c.unresolvedTxns) == 0 {
return nil
return nil, nil
}

_, resolvedTxnsMap := splitResolvedTxn(resolvedTs, c.unresolvedTxns)
return resolvedTxnsMap
}

// UpdateCheckpoint updates the checkpoint ts
func (c *UnresolvedTxnCache) UpdateCheckpoint(checkpointTs uint64) {
atomic.StoreUint64(&c.checkpointTs, checkpointTs)
return splitResolvedTxn(resolvedTsMap, c.unresolvedTxns)
}

func splitResolvedTxn(
resolvedTs uint64, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (minTs uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs,
) (flushedResolvedTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) {
resolvedRowsMap = make(map[model.TableID][]*model.SingleTableTxn, len(unresolvedTxns))
minTs = resolvedTs
flushedResolvedTsMap = make(map[model.TableID]uint64, len(unresolvedTxns))
for tableID, txns := range unresolvedTxns {
v, ok := resolvedTsMap.Load(tableID)
if !ok {
continue
}
resolvedTs := v.(uint64)
i := sort.Search(len(txns), func(i int) bool {
return txns[i].commitTs > resolvedTs
})
Expand All @@ -154,9 +147,7 @@ func splitResolvedTxn(
}
}
resolvedRowsMap[tableID] = resolvedTxns
if len(resolvedTxnsWithTheSameCommitTs) > 0 && resolvedTxnsWithTheSameCommitTs[0].commitTs < minTs {
minTs = resolvedTxnsWithTheSameCommitTs[0].commitTs
}
flushedResolvedTsMap[tableID] = resolvedTs
}
return
}
65 changes: 42 additions & 23 deletions cdc/sink/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,22 @@ package common

import (
"sort"
"sync"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pingcap/check"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
)

type SinkCommonSuite struct{}
func TestSplitResolvedTxn(test *testing.T) {
defer testleak.AfterTestT(test)()

func Test(t *testing.T) { check.TestingT(t) }

var _ = check.Suite(&SinkCommonSuite{})

func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
defer testleak.AfterTest(c)()
testCases := [][]struct {
input []*model.RowChangedEvent
resolvedTs model.Ts
expected map[model.TableID][]*model.SingleTableTxn
input []*model.RowChangedEvent
resolvedTsMap map[model.TableID]uint64
expected map[model.TableID][]*model.SingleTableTxn
}{{{ // Testing basic transaction collocation, no txns with the same committs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -45,7 +41,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -59,7 +58,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 3}},
},
resolvedTs: 13,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
3: uint64(13),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
Expand All @@ -76,17 +79,24 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
}}},
},
}}, {{ // Testing the short circuit path
input: []*model.RowChangedEvent{},
resolvedTs: 6,
expected: nil,
input: []*model.RowChangedEvent{},
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
3: uint64(13),
},
expected: nil,
}, {
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 11, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 12, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 13, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
expected: map[model.TableID][]*model.SingleTableTxn{},
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{},
}}, {{ // Testing the txns with the same commitTs
input: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -99,7 +109,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 1, CommitTs: 6, Table: &model.TableName{TableID: 2}},
{StartTs: 1, CommitTs: 7, Table: &model.TableName{TableID: 2}},
},
resolvedTs: 6,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(6),
2: uint64(6),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 5, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 5, Table: &model.TableName{TableID: 1}},
Expand All @@ -119,7 +132,10 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
{StartTs: 2, CommitTs: 8, Table: &model.TableName{TableID: 1}},
{StartTs: 1, CommitTs: 9, Table: &model.TableName{TableID: 1}},
},
resolvedTs: 13,
resolvedTsMap: map[model.TableID]uint64{
1: uint64(13),
2: uint64(13),
},
expected: map[model.TableID][]*model.SingleTableTxn{
1: {{Table: &model.TableName{TableID: 1}, StartTs: 1, CommitTs: 8, Rows: []*model.RowChangedEvent{
{StartTs: 1, CommitTs: 8, Table: &model.TableName{TableID: 1}},
Expand All @@ -144,7 +160,11 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
cache := NewUnresolvedTxnCache()
for _, t := range tc {
cache.Append(nil, t.input...)
resolved := cache.Resolved(t.resolvedTs)
resolvedTsMap := sync.Map{}
for tableID, ts := range t.resolvedTsMap {
resolvedTsMap.Store(tableID, ts)
}
_, resolved := cache.Resolved(&resolvedTsMap)
for tableID, txns := range resolved {
sort.Slice(txns, func(i, j int) bool {
if txns[i].CommitTs != txns[j].CommitTs {
Expand All @@ -154,8 +174,7 @@ func (s SinkCommonSuite) TestSplitResolvedTxn(c *check.C) {
})
resolved[tableID] = txns
}
c.Assert(resolved, check.DeepEquals, t.expected,
check.Commentf("%s", cmp.Diff(resolved, t.expected)))
require.Equal(test, t.expected, resolved, cmp.Diff(resolved, t.expected))
}
}
}
80 changes: 35 additions & 45 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

dmysql "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -92,10 +91,10 @@ type mysqlSink struct {
filter *tifilter.Filter
cyclic *cyclic.Cyclic

txnCache *common.UnresolvedTxnCache
workers []*mysqlSinkWorker
resolvedTs uint64
maxResolvedTs uint64
txnCache *common.UnresolvedTxnCache
workers []*mysqlSinkWorker
tableCheckpointTs sync.Map
tableMaxResolvedTs sync.Map

execWaitNotifier *notify.Notifier
resolvedNotifier *notify.Notifier
Expand Down Expand Up @@ -131,12 +130,10 @@ func (s *mysqlSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Row
// FlushRowChangedEvents will flushes all received events, we don't allow mysql
// sink to receive events before resolving
func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
if atomic.LoadUint64(&s.maxResolvedTs) < resolvedTs {
atomic.StoreUint64(&s.maxResolvedTs, resolvedTs)
v, ok := s.tableMaxResolvedTs.Load(tableID)
if !ok || v.(uint64) < resolvedTs {
s.tableMaxResolvedTs.Store(tableID, resolvedTs)
}
// resolvedTs can be fallen back, such as a new table is added into this sink
// with a smaller start-ts
atomic.StoreUint64(&s.resolvedTs, resolvedTs)
s.resolvedNotifier.Notify()

// check and throw error
Expand All @@ -146,13 +143,7 @@ func (s *mysqlSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
default:
}

checkpointTs := resolvedTs
for _, worker := range s.workers {
workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs)
if workerCheckpointTs < checkpointTs {
checkpointTs = workerCheckpointTs
}
}
checkpointTs := s.getTableCheckpointTs(tableID)
s.statistics.PrintStatus(ctx)
return checkpointTs, nil
}
Expand All @@ -169,13 +160,12 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
return
case <-receiver.C:
}
resolvedTs := atomic.LoadUint64(&s.resolvedTs)
resolvedTxnsMap := s.txnCache.Resolved(resolvedTs)
flushedResolvedTsMap, resolvedTxnsMap := s.txnCache.Resolved(&s.tableMaxResolvedTs)
if len(resolvedTxnsMap) == 0 {
for _, worker := range s.workers {
atomic.StoreUint64(&worker.checkpointTs, resolvedTs)
}
s.txnCache.UpdateCheckpoint(resolvedTs)
s.tableMaxResolvedTs.Range(func(key, value interface{}) bool {
s.tableCheckpointTs.Store(key, value)
return true
})
continue
}

Expand All @@ -187,10 +177,9 @@ func (s *mysqlSink) flushRowChangedEvents(ctx context.Context, receiver *notify.
}

s.dispatchAndExecTxns(ctx, resolvedTxnsMap)
for _, worker := range s.workers {
atomic.StoreUint64(&worker.checkpointTs, resolvedTs)
for tableID, resolvedTs := range flushedResolvedTsMap {
s.tableCheckpointTs.Store(tableID, resolvedTs)
}
s.txnCache.UpdateCheckpoint(resolvedTs)
}
}

Expand Down Expand Up @@ -743,7 +732,6 @@ type mysqlSinkWorker struct {
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
metricBucketSize prometheus.Counter
receiver *notify.Receiver
checkpointTs uint64
closedCh chan struct{}
}

Expand Down Expand Up @@ -786,10 +774,9 @@ func (w *mysqlSinkWorker) appendFinishTxn(wg *sync.WaitGroup) {

func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
var (
toExecRows []*model.RowChangedEvent
replicaID uint64
txnNum int
lastCommitTs uint64
toExecRows []*model.RowChangedEvent
replicaID uint64
txnNum int
)

// mark FinishWg before worker exits, all data txns can be omitted.
Expand Down Expand Up @@ -827,7 +814,6 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
txnNum = 0
return err
}
atomic.StoreUint64(&w.checkpointTs, lastCommitTs)
toExecRows = toExecRows[:0]
w.metricBucketSize.Add(float64(txnNum))
txnNum = 0
Expand Down Expand Up @@ -857,7 +843,6 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
}
replicaID = txn.ReplicaID
toExecRows = append(toExecRows, txn.Rows...)
lastCommitTs = txn.CommitTs
txnNum++
case <-w.receiver.C:
if err := flushRows(); err != nil {
Expand Down Expand Up @@ -903,12 +888,20 @@ func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
maxResolvedTs, ok := s.tableMaxResolvedTs.Load(tableID)
log.Warn("Barrier doesn't return in time, may be stuck",
zap.Uint64("resolved-ts", atomic.LoadUint64(&s.maxResolvedTs)),
zap.Uint64("checkpoint-ts", s.checkpointTs()))
zap.Int64("tableID", tableID),
zap.Bool("has resolvedTs", ok),
zap.Any("resolvedTs", maxResolvedTs),
zap.Uint64("checkpointTs", s.getTableCheckpointTs(tableID)))
default:
maxResolvedTs := atomic.LoadUint64(&s.maxResolvedTs)
if s.checkpointTs() >= maxResolvedTs {
v, ok := s.tableMaxResolvedTs.Load(tableID)
if !ok {
log.Info("No table resolvedTs is found", zap.Int64("table-id", tableID))
return nil
}
maxResolvedTs := v.(uint64)
if s.getTableCheckpointTs(tableID) >= maxResolvedTs {
return nil
}
checkpointTs, err := s.FlushRowChangedEvents(ctx, tableID, maxResolvedTs)
Expand All @@ -924,15 +917,12 @@ func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
}
}

func (s *mysqlSink) checkpointTs() uint64 {
checkpointTs := atomic.LoadUint64(&s.resolvedTs)
for _, worker := range s.workers {
workerCheckpointTs := atomic.LoadUint64(&worker.checkpointTs)
if workerCheckpointTs < checkpointTs {
checkpointTs = workerCheckpointTs
}
func (s *mysqlSink) getTableCheckpointTs(tableID model.TableID) uint64 {
v, ok := s.tableCheckpointTs.Load(tableID)
if ok {
return v.(uint64)
}
return checkpointTs
return uint64(0)
}

func logDMLTxnErr(err error) error {
Expand Down
Loading

0 comments on commit dc74082

Please sign in to comment.