diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go
index fd8b6d48e93..9a27b4e790b 100755
--- a/cdc/processor/pipeline/sink.go
+++ b/cdc/processor/pipeline/sink.go
@@ -22,6 +22,7 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/log"
     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/cdc/redo"
     "github.com/pingcap/tiflow/cdc/sink"
     "github.com/pingcap/tiflow/pkg/config"
     cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -73,19 +74,25 @@ type sinkNode struct {
     barrierTs model.Ts

     flowController tableFlowController
+    redoManager    redo.LogManager

     replicaConfig *config.ReplicaConfig
 }

-func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
+func newSinkNode(
+    tableID model.TableID, sink sink.Sink,
+    startTs model.Ts, targetTs model.Ts,
+    flowController tableFlowController,
+    redoManager redo.LogManager,
+) *sinkNode {
     sn := &sinkNode{
-        tableID:   tableID,
-        sink:      sink,
-        status:    TableStatusInitializing,
-        targetTs:  targetTs,
-        barrierTs: startTs,
-
+        tableID:        tableID,
+        sink:           sink,
+        status:         TableStatusInitializing,
+        targetTs:       targetTs,
+        barrierTs:      startTs,
         flowController: flowController,
+        redoManager:    redoManager,
     }
     sn.resolvedTs.Store(model.NewResolvedTs(startTs))
     sn.checkpointTs.Store(model.NewResolvedTs(startTs))
@@ -136,17 +143,41 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
             err = n.stop(ctx)
         }
     }()
+
     currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
-    currentCheckpointTs := n.getCheckpointTs()
-    if resolved.Ts > currentBarrierTs {
-        resolved = model.NewResolvedTs(currentBarrierTs)
-    }
     if resolved.Ts > n.targetTs {
         resolved = model.NewResolvedTs(n.targetTs)
     }
+
+    if n.redoManager != nil && n.redoManager.Enabled() {
+        // the redo log does not support batch resolve mode, hence we
+        // use `ResolvedMark` to restore a normal resolved ts
+        resolved = model.NewResolvedTs(resolved.ResolvedMark())
+        err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts)
+
+        redoTs := n.redoManager.GetMinResolvedTs()
+        if redoTs < currentBarrierTs {
+            log.Debug("redoTs should not be less than current barrierTs",
+                zap.Int64("tableID", n.tableID),
+                zap.Uint64("redoTs", redoTs),
+                zap.Uint64("barrierTs", currentBarrierTs))
+        }
+
+        // Fixme(CharlesCheung): remove this check after refactoring redoManager
+        if resolved.Ts > redoTs {
+            resolved = model.NewResolvedTs(redoTs)
+        }
+    }
+
+    if resolved.Ts > currentBarrierTs {
+        resolved = model.NewResolvedTs(currentBarrierTs)
+    }
+
+    currentCheckpointTs := n.getCheckpointTs()
     if currentCheckpointTs.EqualOrGreater(resolved) {
         return nil
     }
+
     checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
     if err != nil {
         return errors.Trace(err)
@@ -177,6 +208,16 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
         panic("ProcessorSyncResolvedPreEmit")
     })

+    emitRows := func(rows ...*model.RowChangedEvent) error {
+        if n.redoManager != nil && n.redoManager.Enabled() {
+            err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, rows...)
+            if err != nil {
+                return err
+            }
+        }
+        return n.sink.EmitRowChangedEvents(ctx, rows...)
+    }
+
     if event == nil || event.Row == nil {
         log.Warn("skip emit nil event", zap.Any("event", event))
         return nil
@@ -202,13 +243,13 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
                 return errors.Trace(err)
             }
             // NOTICE: Please do not change the order, the delete event always comes before the insert event.
-            return n.sink.EmitRowChangedEvents(ctx, deleteEvent.Row, insertEvent.Row)
+            return emitRows(deleteEvent.Row, insertEvent.Row)
         }
         // If the handle key columns are not updated, PreColumns is directly ignored.
         event.Row.PreColumns = nil
     }

-    return n.sink.EmitRowChangedEvents(ctx, event.Row)
+    return emitRows(event.Row)
 }

 // shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
@@ -290,11 +331,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
             failpoint.Return(false, errors.New("processor sync resolved injected error"))
         })

-        var resolved model.ResolvedTs
+        resolved := model.NewResolvedTs(event.CRTs)
         if event.Resolved != nil {
             resolved = *(event.Resolved)
-        } else {
-            resolved = model.NewResolvedTs(event.CRTs)
         }

         if err := n.flushSink(ctx, resolved); err != nil {
diff --git a/cdc/processor/pipeline/sink_test.go b/cdc/processor/pipeline/sink_test.go
index d8e7d057af3..40ab45e7456 100644
--- a/cdc/processor/pipeline/sink_test.go
+++ b/cdc/processor/pipeline/sink_test.go
@@ -20,6 +20,7 @@ import (
     "time"

     "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/cdc/redo"
     "github.com/pingcap/tiflow/pkg/config"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -135,7 +136,7 @@ func TestStatus(t *testing.T) {
     })

     // test stop at targetTs
-    node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
+    node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
         ChangefeedVars().Info.Config)
     require.Equal(t, TableStatusInitializing, node.Status())
@@ -184,7 +185,7 @@ func TestStatus(t *testing.T) {
     require.Equal(t, uint64(10), node.CheckpointTs())

     // test the stop at ts command
-    node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
+    node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)
     require.Equal(t, TableStatusInitializing, node.Status())
@@ -222,7 +223,7 @@ func TestStatus(t *testing.T) {
     require.Equal(t, uint64(2), node.CheckpointTs())

     // test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
-    node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
+    node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)
     require.Equal(t, TableStatusInitializing, node.Status())
@@ -272,7 +273,9 @@ func TestStopStatus(t *testing.T) {
     })

     closeCh := make(chan interface{}, 1)
-    node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
+    node := newSinkNode(1,
+        &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100,
+        &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)
     require.Equal(t, TableStatusInitializing, node.Status())
@@ -314,7 +317,7 @@ func TestManyTs(t *testing.T) {
         },
     })
     sink := &mockSink{}
-    node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
+    node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)
     require.Equal(t, TableStatusInitializing, node.Status())
@@ -487,7 +490,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
         },
     })
     sink := &mockSink{}
-    node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
+    node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -512,7 +515,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
         },
     })
     sink := &mockSink{}
-    node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
+    node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -574,7 +577,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
         },
     })
     sink := &mockSink{}
-    node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
+    node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
     node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -724,7 +727,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
     flowController := &flushFlowController{}
     sink := &flushSink{}
     // sNode is a sinkNode
-    sNode := newSinkNode(1, sink, 0, 10, flowController)
+    sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager())
     sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
         pmessage.Message{}, nil).ChangefeedVars().Info.Config)
     sNode.barrierTs = 10
diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go
index 1f638c3f9ca..251fab6ea03 100644
--- a/cdc/processor/pipeline/table_actor.go
+++ b/cdc/processor/pipeline/table_actor.go
@@ -57,8 +57,8 @@ type tableActor struct {
     // backend mounter
     mounter entry.Mounter
     // backend tableSink
-    tableSink      sink.Sink
-    redoLogEnabled bool
+    tableSink   sink.Sink
+    redoManager redo.LogManager

     pullerNode *pullerNode
     sortNode   *sorterNode
@@ -103,7 +103,7 @@ func NewTableActor(cdcCtx cdcContext.Context,
     tableName string,
     replicaInfo *model.TableReplicaInfo,
     sink sink.Sink,
-    redoLogEnabled bool,
+    redoManager redo.LogManager,
     targetTs model.Ts,
 ) (TablePipeline, error) {
     config := cdcCtx.ChangefeedVars().Info.Config
@@ -124,18 +124,18 @@ func NewTableActor(cdcCtx cdcContext.Context,
         wg:     wg,
         cancel: cancel,

-        tableID:        tableID,
-        markTableID:    replicaInfo.MarkTableID,
-        tableName:      tableName,
-        memoryQuota:    serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
-        upstream:       up,
-        mounter:        mounter,
-        replicaInfo:    replicaInfo,
-        replicaConfig:  config,
-        tableSink:      sink,
-        redoLogEnabled: redoLogEnabled,
-        targetTs:       targetTs,
-        started:        false,
+        tableID:       tableID,
+        markTableID:   replicaInfo.MarkTableID,
+        tableName:     tableName,
+        memoryQuota:   serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
+        upstream:      up,
+        mounter:       mounter,
+        replicaInfo:   replicaInfo,
+        replicaConfig: config,
+        tableSink:     sink,
+        redoManager:   redoManager,
+        targetTs:      targetTs,
+        started:       false,

         changefeedID:   changefeedVars.ID,
         changefeedVars: changefeedVars,
@@ -279,7 +279,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
         zap.String("tableName", t.tableName),
         zap.Uint64("quota", t.memoryQuota))

-    flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled)
+    flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled())
     sorterNode := newSorterNode(t.tableName, t.tableID,
         t.replicaInfo.StartTs, flowController,
         t.mounter, t.replicaConfig,
@@ -315,7 +315,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
     actorSinkNode := newSinkNode(t.tableID, t.tableSink, t.replicaInfo.StartTs,
-        t.targetTs, flowController)
+        t.targetTs, flowController, t.redoManager)
     actorSinkNode.initWithReplicaConfig(t.replicaConfig)
     t.sinkNode = actorSinkNode
@@ -389,7 +389,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
     // will be able to cooperate replication status directly. Then we will add
     // another replication barrier for consistent replication instead of reusing
     // the global resolved-ts.
-    if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
+    if t.redoManager.Enabled() {
         return t.sinkNode.ResolvedTs()
     }
     return t.sortNode.ResolvedTs()
diff --git a/cdc/processor/pipeline/table_actor_test.go b/cdc/processor/pipeline/table_actor_test.go
index 1af058d4bf7..48888ec2dae 100644
--- a/cdc/processor/pipeline/table_actor_test.go
+++ b/cdc/processor/pipeline/table_actor_test.go
@@ -44,13 +44,14 @@ func TestAsyncStopFailed(t *testing.T) {
     }()

     tbl := &tableActor{
-        stopped:   0,
-        tableID:   1,
-        router:    tableActorRouter,
-        cancel:    func() {},
-        reportErr: func(err error) {},
-        sinkNode:  newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}),
+        stopped:     0,
+        tableID:     1,
+        router:      tableActorRouter,
+        redoManager: redo.NewDisabledManager(),
+        cancel:      func() {},
+        reportErr:   func(err error) {},
     }
+    tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager)
     require.True(t, tbl.AsyncStop(1))

     mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
@@ -67,6 +68,7 @@ func TestTableActorInterface(t *testing.T) {
     tbl := &tableActor{
         markTableID: 2,
         tableID:     1,
+        redoManager: redo.NewDisabledManager(),
         sinkNode:    sink,
         sortNode:    sorter,
         tableName:   "t1",
@@ -90,6 +92,9 @@ func TestTableActorInterface(t *testing.T) {
     require.Equal(t, model.Ts(5), tbl.ResolvedTs())

     tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    tbl.redoManager, _ = redo.NewMockManager(ctx)
     sink.resolvedTs.Store(model.NewResolvedTs(6))
     require.Equal(t, model.Ts(6), tbl.ResolvedTs())
 }
@@ -105,11 +110,12 @@ func TestTableActorCancel(t *testing.T) {
     }()

     tbl := &tableActor{
-        stopped:   0,
-        tableID:   1,
-        router:    tableActorRouter,
-        cancel:    func() {},
-        reportErr: func(err error) {},
+        stopped:     0,
+        tableID:     1,
+        redoManager: redo.NewDisabledManager(),
+        router:      tableActorRouter,
+        cancel:      func() {},
+        reportErr:   func(err error) {},
     }
     mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
     tbl.actorID = actor.ID(1)
@@ -122,7 +128,7 @@ func TestTableActorWait(t *testing.T) {
     ctx, cancel := context.WithCancel(context.TODO())
     eg, _ := errgroup.WithContext(ctx)
-    tbl := &tableActor{wg: eg}
+    tbl := &tableActor{wg: eg, redoManager: redo.NewDisabledManager()}
     wg := sync.WaitGroup{}
     wg.Add(1)
     stopped := false
@@ -140,6 +146,7 @@ func TestHandleError(t *testing.T) {
     canceled := false
     reporterErr := false
     tbl := &tableActor{
+        redoManager: redo.NewDisabledManager(),
         cancel: func() {
             canceled = true
         },
@@ -358,7 +365,7 @@ func TestNewTableActor(t *testing.T) {
         &model.TableReplicaInfo{
             StartTs:     0,
             MarkTableID: 1,
-        }, &mockSink{}, false, 10)
+        }, &mockSink{}, redo.NewDisabledManager(), 10)
     require.NotNil(t, tbl)
     require.Nil(t, err)
     require.NotPanics(t, func() {
@@ -374,7 +381,7 @@ func TestNewTableActor(t *testing.T) {
         &model.TableReplicaInfo{
             StartTs:     0,
             MarkTableID: 1,
-        }, &mockSink{}, false, 10)
+        }, &mockSink{}, redo.NewDisabledManager(), 10)
     require.Nil(t, tbl)
     require.NotNil(t, err)

@@ -403,7 +410,8 @@ func TestTableActorStart(t *testing.T) {
         return nil
     }
     tbl := &tableActor{
-        globalVars: globalVars,
+        redoManager: redo.NewDisabledManager(),
+        globalVars:  globalVars,
         changefeedVars: &cdcContext.ChangefeedVars{
             ID: model.DefaultChangeFeedID("changefeed-id-test"),
             Info: &model.ChangeFeedInfo{
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 614342a1494..c722cb1ed06 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -762,7 +762,7 @@ func (p *processor) createTablePipelineImpl(
     tableName := p.getTableName(ctx, tableID)

-    s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows, p.redoManager)
+    s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows)
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -774,7 +774,7 @@ func (p *processor) createTablePipelineImpl(
         tableName,
         replicaInfo,
         s,
-        p.redoManager.Enabled(),
+        p.redoManager,
         p.changefeed.Info.GetTargetTs())
     if err != nil {
         return nil, errors.Trace(err)
     }
diff --git a/cdc/redo/manager.go b/cdc/redo/manager.go
index 08e02f57279..38229444609 100644
--- a/cdc/redo/manager.go
+++ b/cdc/redo/manager.go
@@ -219,6 +219,34 @@ func NewDisabledManager() *ManagerImpl {
     return &ManagerImpl{enabled: false}
 }

+// NewMockManager returns a mock redo manager instance, used in tests only
+func NewMockManager(ctx context.Context) (*ManagerImpl, error) {
+    cfg := &config.ConsistentConfig{
+        Level:   string(ConsistentLevelEventual),
+        Storage: "blackhole://",
+    }
+    errCh := make(chan error, 1)
+    opts := &ManagerOptions{
+        EnableBgRunner: true,
+        ErrCh:          errCh,
+    }
+    logMgr, err := NewManager(ctx, cfg, opts)
+    if err != nil {
+        return nil, err
+    }
+
+    go func() {
+        select {
+        case <-ctx.Done():
+            return
+        case err := <-errCh:
+            log.Panic("log manager error: ", zap.Error(err))
+        }
+    }()
+
+    return logMgr, err
+}
+
 // Enabled returns whether this log manager is enabled
 func (m *ManagerImpl) Enabled() bool {
     return m.enabled
diff --git a/cdc/redo/manager_test.go b/cdc/redo/manager_test.go
index 03c473db962..4f82a1e862e 100644
--- a/cdc/redo/manager_test.go
+++ b/cdc/redo/manager_test.go
@@ -88,24 +88,15 @@ func TestLogManagerInProcessor(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

+    logMgr, err := NewMockManager(ctx)
+    require.Nil(t, err)
+
     checkResovledTs := func(mgr LogManager, expectedRts uint64) {
         time.Sleep(time.Millisecond*200 + updateRtsInterval)
         resolvedTs := mgr.GetMinResolvedTs()
         require.Equal(t, expectedRts, resolvedTs)
     }

-    cfg := &config.ConsistentConfig{
-        Level:   string(ConsistentLevelEventual),
-        Storage: "blackhole://",
-    }
-    errCh := make(chan error, 1)
-    opts := &ManagerOptions{
-        EnableBgRunner: true,
-        ErrCh:          errCh,
-    }
-    logMgr, err := NewManager(ctx, cfg, opts)
-    require.Nil(t, err)
-
     // check emit row changed events can move forward resolved ts
     tables := []model.TableID{53, 55, 57, 59}
     startTs := uint64(100)
@@ -183,16 +174,8 @@ func TestUpdateResolvedTsWithDelayedTable(t *testing.T) {
     t.Parallel()

     ctx, cancel := context.WithCancel(context.Background())
-    cfg := &config.ConsistentConfig{
-        Level:   string(ConsistentLevelEventual),
-        Storage: "blackhole://",
-    }
-    errCh := make(chan error, 1)
-    opts := &ManagerOptions{
-        EnableBgRunner: true,
-        ErrCh:          errCh,
-    }
-    logMgr, err := NewManager(ctx, cfg, opts)
+    defer cancel()
+    logMgr, err := NewMockManager(ctx)
     require.Nil(t, err)

     var (
@@ -209,12 +192,6 @@ func TestUpdateResolvedTsWithDelayedTable(t *testing.T) {
     for _, tableID := range tables {
         logMgr.AddTable(tableID, startTs)
     }
-    var wg sync.WaitGroup
-    wg.Add(1)
-    go func() {
-        defer wg.Done()
-        logMgr.bgWriteLog(ctx, errCh)
-    }()

     // table 53 has new data, resolved-ts moves forward to 125
     rows := []*model.RowChangedEvent{
@@ -244,9 +221,6 @@ func TestUpdateResolvedTsWithDelayedTable(t *testing.T) {
     err = logMgr.updateTableResolvedTs(ctx)
     require.Nil(t, err)
     require.Equal(t, table57Ts, logMgr.GetMinResolvedTs())
-
-    cancel()
-    wg.Wait()
 }

 // TestLogManagerInOwner tests how redo log manager is used in owner,
@@ -256,14 +230,7 @@ func TestLogManagerInOwner(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()

-    cfg := &config.ConsistentConfig{
-        Level:   string(ConsistentLevelEventual),
-        Storage: "blackhole://",
-    }
-    opts := &ManagerOptions{
-        EnableBgRunner: false,
-    }
-    logMgr, err := NewManager(ctx, cfg, opts)
+    logMgr, err := NewMockManager(ctx)
     require.Nil(t, err)

     ddl := &model.DDLEvent{StartTs: 100, CommitTs: 120, Query: "CREATE TABLE `TEST.T1`"}
diff --git a/cdc/sink/table_sink.go b/cdc/sink/table_sink.go
index a3307fc6abf..fc6e618e7b0 100644
--- a/cdc/sink/table_sink.go
+++ b/cdc/sink/table_sink.go
@@ -20,7 +20,6 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/log"
     "github.com/pingcap/tiflow/cdc/model"
-    "github.com/pingcap/tiflow/cdc/redo"
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
 )
@@ -29,7 +28,6 @@ type tableSink struct {
     tableID     model.TableID
     backendSink Sink
     buffer      []*model.RowChangedEvent
-    redoManager redo.LogManager

     metricsTableSinkTotalRows prometheus.Counter
 }
@@ -39,12 +37,11 @@ var _ Sink = (*tableSink)(nil)
 // NewTableSink creates a new table sink
 func NewTableSink(
     s Sink, tableID model.TableID,
-    totalRowsCounter prometheus.Counter, redoManager redo.LogManager,
+    totalRowsCounter prometheus.Counter,
 ) (*tableSink, error) {
     sink := &tableSink{
         tableID:                   tableID,
         backendSink:               s,
-        redoManager:               redoManager,
         metricsTableSinkTotalRows: totalRowsCounter,
     }

@@ -57,9 +54,6 @@ func NewTableSink(
 func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
     t.buffer = append(t.buffer, rows...)
     t.metricsTableSinkTotalRows.Add(float64(len(rows)))
-    if t.redoManager.Enabled() {
-        return t.redoManager.EmitRowChangedEvents(ctx, t.tableID, rows...)
-    }
     return nil
 }

@@ -83,7 +77,7 @@ func (t *tableSink) FlushRowChangedEvents(
         return t.buffer[i].CommitTs > resolvedTs
     })
     if i == 0 {
-        return t.flushResolvedTs(ctx, resolved)
+        return t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved)
     }
     resolvedRows := t.buffer[:i]
     t.buffer = append(make([]*model.RowChangedEvent, 0, len(t.buffer[i:])), t.buffer[i:]...)
@@ -92,31 +86,7 @@ func (t *tableSink) FlushRowChangedEvents(
     if err != nil {
         return model.NewResolvedTs(0), errors.Trace(err)
     }
-    return t.flushResolvedTs(ctx, resolved)
-}
-
-func (t *tableSink) flushResolvedTs(
-    ctx context.Context, resolved model.ResolvedTs,
-) (model.ResolvedTs, error) {
-    if t.redoManager.Enabled() {
-        if resolved.IsBatchMode() {
-            return model.NewResolvedTs(0), nil
-        }
-        err := t.redoManager.FlushLog(ctx, t.tableID, resolved.Ts)
-        if err != nil {
-            return model.NewResolvedTs(0), errors.Trace(err)
-        }
-        redoTs := t.redoManager.GetMinResolvedTs()
-        if redoTs < resolved.Ts {
-            resolved.Ts = redoTs
-        }
-    }
-
-    checkpointTs, err := t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved)
-    if err != nil {
-        return model.NewResolvedTs(0), errors.Trace(err)
-    }
-    return checkpointTs, nil
+    return t.backendSink.FlushRowChangedEvents(ctx, t.tableID, resolved)
 }

 func (t *tableSink) EmitCheckpointTs(_ context.Context, _ uint64, _ []model.TableName) error {
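Note on the new flushSink ordering: the resolved ts is first capped by targetTs, then by the redo manager's minimum resolved ts (after the redo log has been flushed), and only then by the barrier ts, before being compared against the current checkpoint. A minimal standalone sketch of that clamping order follows; the helper name and plain uint64 signature are illustrative only and are not part of this patch.

// clampResolvedTs mirrors the order of checks in sinkNode.flushSink after this
// change. It is a simplified illustration: the real code works on
// model.ResolvedTs values and flushes the redo log before reading redoTs.
func clampResolvedTs(resolved, targetTs, redoTs, barrierTs uint64, redoEnabled bool) uint64 {
    if resolved > targetTs {
        resolved = targetTs
    }
    if redoEnabled && resolved > redoTs {
        // the backend sink must not advance past what the redo log has persisted
        resolved = redoTs
    }
    if resolved > barrierTs {
        // the barrier is applied last, so the redo log itself may be flushed
        // beyond the barrier while the backend sink is not
        resolved = barrierTs
    }
    return resolved
}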