Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(ticdc): refine sink interface and add init method (#5196) #5225

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Init(tableID model.TableID) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 4 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,10 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
tableNameStr = tableName.QuoteString()
}

sink := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs)
sink, err := p.sinkManager.CreateTableSink(tableID, replicaInfo.StartTs)
if err != nil {
return nil, errors.Trace(err)
}
table := tablepipeline.NewTablePipeline(
ctx,
p.mounter,
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type blackHoleSink struct {
lastAccumulated uint64
}

// Init table sink resources
func (b *blackHoleSink) Init(tableID model.TableID) error {
return nil
}

func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
log.Debug("BlockHoleSink: EmitRowChangedEvents", zap.Any("row", row))
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestFlushTable(t *testing.T) {
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5)
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
Expand Down Expand Up @@ -74,7 +74,7 @@ func TestFlushTable(t *testing.T) {

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5)
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ type fileSink struct {
ddlEncoder codec.EventBatchEncoder
}

func (f *fileSink) Init(_ model.TableID) error {
return nil
}

func (f *fileSink) flushLogMeta() error {
data, err := f.logMeta.Marshal()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ type s3Sink struct {
ddlEncoder codec.EventBatchEncoder
}

func (s *s3Sink) Init(_ model.TableID) error {
return nil
}

func (s *s3Sink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return s.emitRowChangedEvents(ctx, newTableBuffer, rows...)
}
Expand Down
7 changes: 7 additions & 0 deletions cdc/sink/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func NewUnresolvedTxnCache() *UnresolvedTxnCache {
}
}

// RemoveTableTxn removes unresolved rows from cache
func (c *UnresolvedTxnCache) RemoveTableTxn(tableID model.TableID) {
c.unresolvedTxnsMu.Lock()
defer c.unresolvedTxnsMu.Unlock()
delete(c.unresolvedTxns, tableID)
}

// Append adds unresolved rows to cache
// the rows inputed into this function will go through the following handling logic
// 1. group by tableID from one input stream
Expand Down
39 changes: 29 additions & 10 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewManager(
) *Manager {
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs),
changeFeedCheckpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
Expand All @@ -69,7 +69,7 @@ func NewManager(
}

// CreateTableSink creates a table sink
func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) Sink {
func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts) (Sink, error) {
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if _, exist := m.tableSinks[tableID]; exist {
Expand All @@ -81,8 +81,11 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)
buffer: make([]*model.RowChangedEvent, 0, 128),
emittedTs: checkpointTs,
}
if err := sink.Init(tableID); err != nil {
return nil, errors.Trace(err)
}
m.tableSinks[tableID] = sink
return sink
return sink, nil
}

// Close closes the Sink manager and backend Sink, this method can be reentrantly called
Expand Down Expand Up @@ -257,6 +260,11 @@ func (t *tableSink) Close(ctx context.Context) error {
return t.manager.destroyTableSink(ctx, t.tableID)
}

// Init table sink resources
func (t *tableSink) Init(tableID model.TableID) error {
return t.manager.backendSink.Init(tableID)
}

// getResolvedTs returns resolved ts, which means all events before resolved ts
// have been sent to sink manager
func (t *tableSink) getResolvedTs() uint64 {
Expand Down Expand Up @@ -289,15 +297,13 @@ func newBufferSink(
backendSink Sink,
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand All @@ -324,11 +330,6 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
errCh <- err
}
return
case drawback := <-b.drawbackChan:
b.bufferMu.Lock()
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
Expand Down Expand Up @@ -408,6 +409,24 @@ func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.Ta
return b.getTableCheckpointTs(tableID), nil
}

// Init table sink resources
func (b *bufferSink) Init(tableID model.TableID) error {
b.clearBufferedTableData(tableID)
return b.Sink.Init(tableID)
}

// Barrier delete buffer
func (b *bufferSink) Barrier(ctx context.Context, tableID model.TableID) error {
b.clearBufferedTableData(tableID)
return b.Sink.Barrier(ctx, tableID)
}

func (b *bufferSink) clearBufferedTableData(tableID model.TableID) {
b.bufferMu.Lock()
defer b.bufferMu.Unlock()
delete(b.buffer, tableID)
}

type flushMsg struct {
tableID model.TableID
resolvedTs uint64
Expand Down
32 changes: 25 additions & 7 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func newCheckSink(c *check.C) *checkSink {
}
}

// Init table sink resources
func (c *checkSink) Init(tableID model.TableID) error {
return nil
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -110,7 +115,9 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
var err error
tableSinks[i], err = manager.CreateTableSink(model.TableID(i), 0)
c.Assert(err, check.IsNil)
}()
}
wg.Wait()
Expand Down Expand Up @@ -202,7 +209,8 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
for i := 0; i < goroutineNum; i++ {
if i%4 != 3 {
// add table
table := manager.CreateTableSink(model.TableID(i), maxResolvedTs)
table, err := manager.CreateTableSink(model.TableID(i), maxResolvedTs)
c.Assert(err, check.IsNil)
ctx, cancel := context.WithCancel(ctx)
tableCancels = append(tableCancels, cancel)
tableSinks = append(tableSinks, table)
Expand Down Expand Up @@ -243,8 +251,9 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer manager.Close(ctx)

tableID := int64(49)
tableSink := manager.CreateTableSink(tableID, 100)
err := tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
tableSink, err := manager.CreateTableSink(tableID, 100)
c.Assert(err, check.IsNil)
err = tableSink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Table: &model.TableName{TableID: tableID},
CommitTs: uint64(110),
})
Expand Down Expand Up @@ -272,7 +281,11 @@ func BenchmarkManagerFlushing(b *testing.B) {
wg.Add(1)
go func() {
defer wg.Done()
tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
var err error
tableSinks[i], err = manager.CreateTableSink(model.TableID(i), 0)
if err != nil {
b.Error(err)
}
}()
}
wg.Wait()
Expand Down Expand Up @@ -338,6 +351,10 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Init(_ model.TableID) error {
return nil
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down Expand Up @@ -369,8 +386,9 @@ func (s *managerSuite) TestManagerError(c *check.C) {
errCh := make(chan error, 16)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
sink := manager.CreateTableSink(1, 0)
err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
sink, err := manager.CreateTableSink(1, 0)
c.Assert(err, check.IsNil)
err = sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
CommitTs: 1,
Table: &model.TableName{TableID: 1},
})
Expand Down
4 changes: 4 additions & 0 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

func (k *mqSink) Init(_ model.TableID) error {
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down
42 changes: 26 additions & 16 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,30 @@ func (w *mysqlSinkWorker) cleanup() {
}
}

func (s *mysqlSink) Init(tableID model.TableID) error {
s.cleanTableResource(tableID)
return nil
}

func (s *mysqlSink) cleanTableResource(tableID model.TableID) {
// We need to clean up the old values of the table,
// otherwise when the table is dispatched back again,
// it may read the old values.
// See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382.
if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded {
log.Info("clean up table max resolved ts",
zap.Int64("tableID", tableID),
zap.Uint64("resolvedTs", resolvedTs.(uint64)))
}
if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded {
log.Info("clean up table checkpoint ts",
zap.Int64("tableID", tableID),
zap.Uint64("checkpointTs", checkpointTs.(uint64)))
}
// try to remove table txn cache
s.txnCache.RemoveTableTxn(tableID)
}

func (s *mysqlSink) Close(ctx context.Context) error {
s.execWaitNotifier.Close()
s.resolvedNotifier.Close()
Expand All @@ -860,22 +884,8 @@ func (s *mysqlSink) Close(ctx context.Context) error {
}

func (s *mysqlSink) Barrier(ctx context.Context, tableID model.TableID) error {
// We need to clean up the old values of the table here,
// otherwise when the table is dispatched back again,
// it may read the old values.
// See: https://github.com/pingcap/tiflow/issues/4464#issuecomment-1085385382.
defer func() {
if resolvedTs, loaded := s.tableMaxResolvedTs.LoadAndDelete(tableID); loaded {
log.Info("clean up table max resolved ts",
zap.Int64("tableID", tableID),
zap.Uint64("resolvedTs", resolvedTs.(uint64)))
}
if checkpointTs, loaded := s.tableCheckpointTs.LoadAndDelete(tableID); loaded {
log.Info("clean up table checkpoint ts",
zap.Int64("tableID", tableID),
zap.Uint64("checkpointTs", checkpointTs.(uint64)))
}
}()
defer s.cleanTableResource(tableID)

warnDuration := 3 * time.Minute
ticker := time.NewTicker(warnDuration)
defer ticker.Stop()
Expand Down
29 changes: 29 additions & 0 deletions cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,3 +1339,32 @@ func TestMySQLSinkFlushResovledTs(t *testing.T) {
err = sink.Close(ctx)
require.Nil(t, err)
}

func TestCleanTableResource(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
tblID := model.TableID(1)
f, err := filter.NewFilter(config.GetDefaultReplicaConfig())
require.Nil(t, err)
s := &mysqlSink{
txnCache: common.NewUnresolvedTxnCache(),
filter: f,
statistics: NewStatistics(ctx, "db"),
}
require.Nil(t, s.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Table: &model.TableName{TableID: tblID, Schema: "test", Table: "t1"},
}))
s.tableCheckpointTs.Store(tblID, uint64(1))
s.tableMaxResolvedTs.Store(tblID, uint64(2))
require.Nil(t, s.Init(tblID))
m := &sync.Map{}
m.Store(tblID, uint64(10))
ret, _ := s.txnCache.Resolved(m)
require.True(t, len(ret) == 0)
_, ok := s.tableCheckpointTs.Load(tblID)
require.False(t, ok)
_, ok = s.tableMaxResolvedTs.Load(tblID)
require.False(t, ok)
}
5 changes: 5 additions & 0 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
return sink, nil
}

// Init table sink resources
func (s *simpleMySQLSink) Init(tableID model.TableID) error {
return nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ const (

// Sink is an abstraction for anything that a changefeed may emit into.
type Sink interface {
// Init initializes the sink resource
// when the sink is added, this function will be called
// init resource or clean up the old values in this function
Init(tableID model.TableID) error

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
Expand Down