From d5026d182b3f59836d4fbc772fb3e607b853ceb7 Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 21 May 2023 16:32:07 +0800 Subject: [PATCH 1/7] sink or redo errors should cancel SinkManager correctly Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 14 ++++----- cdc/processor/sinkmanager/manager_test.go | 30 +++++++++++++++++++ cdc/processor/sinkmanager/redo_log_worker.go | 9 ++---- .../sinkmanager/table_sink_worker.go | 7 +++++ 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index fa25fada07a..d5a7c0fe8c6 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -15,6 +15,7 @@ package sinkmanager import ( "context" + "math" "sync" "time" @@ -193,7 +194,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er if m.sinkEg == nil { var sinkCtx context.Context m.sinkEg, sinkCtx = errgroup.WithContext(m.managerCtx) - m.startSinkWorkers(sinkCtx, splitTxn, enableOldValue) + m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn, enableOldValue) m.sinkEg.Go(func() error { return m.generateSinkTasks(sinkCtx) }) m.wg.Add(1) go func() { @@ -213,7 +214,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er if m.redoDMLMgr != nil && m.redoEg == nil { var redoCtx context.Context m.redoEg, redoCtx = errgroup.WithContext(m.managerCtx) - m.startRedoWorkers(redoCtx, enableOldValue) + m.startRedoWorkers(redoCtx, m.redoEg, enableOldValue) m.redoEg.Go(func() error { return m.generateRedoTasks(redoCtx) }) m.wg.Add(1) go func() { @@ -315,8 +316,7 @@ func (m *SinkManager) clearSinkFactory() { } } -func (m *SinkManager) startSinkWorkers(ctx context.Context, splitTxn bool, enableOldValue bool) { - eg, ctx := errgroup.WithContext(ctx) +func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) { for i := 0; i < sinkWorkerNum; i++ { w := newSinkWorker(m.changefeedID, m.sourceManager, m.sinkMemQuota, m.redoMemQuota, @@ -326,8 +326,7 @@ func (m *SinkManager) startSinkWorkers(ctx context.Context, splitTxn bool, enabl } } -func (m *SinkManager) startRedoWorkers(ctx context.Context, enableOldValue bool) { - eg, ctx := errgroup.WithContext(ctx) +func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, enableOldValue bool) { for i := 0; i < redoWorkerNum; i++ { w := newRedoWorker(m.changefeedID, m.sourceManager, m.redoMemQuota, m.redoDMLMgr, m.eventCache, enableOldValue) @@ -407,7 +406,8 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error { tableSinkUpperBoundTs model.Ts, ) engine.Position { schemaTs := m.schemaStorage.ResolvedTs() - if tableSinkUpperBoundTs > schemaTs+1 { + if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 { + // schemaTs == math.MaxUint64 means it's in tests. 
tableSinkUpperBoundTs = schemaTs + 1 } return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs} diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 1ebf8eec3ad..fa8fbc47577 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -15,9 +15,11 @@ package sinkmanager import ( "context" + "math" "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -314,3 +316,31 @@ func TestUpdateReceivedSorterResolvedTsOfNonExistTable(t *testing.T) { manager.UpdateReceivedSorterResolvedTs(spanz.TableIDToComparableSpan(1), 1) } + +func TestSinkManagerRunWithErrors(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + errCh := make(chan error, 16) + changefeedInfo := getChangefeedInfo() + manager, source, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh) + defer func() { + cancel() + manager.Close() + }() + + _ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskError", "return") + defer func() { + _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskError") + }() + + span := spanz.TableIDToComparableSpan(1) + + source.AddTable(span, "test", 100) + manager.AddTable(span, 100, math.MaxUint64) + manager.StartTable(span, 100) + source.Add(span, model.NewResolvedPolymorphicEvent(0, 101)) + manager.UpdateReceivedSorterResolvedTs(span, 101) + manager.UpdateBarrierTs(101, nil) + err := <-errCh +} diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go index 540cc8b7613..2adb9dc02f4 100644 --- a/cdc/processor/sinkmanager/redo_log_worker.go +++ b/cdc/processor/sinkmanager/redo_log_worker.go @@ -107,8 +107,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e zap.Any("lowerBound", lowerBound), zap.Any("upperBound", upperBound), zap.Any("lastPos", advancer.lastPos), - zap.Float64("lag", time.Since( - oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds())) + zap.Float64("lag", time.Since(oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()), + zap.Error(finalErr)) if finalErr == nil { // Otherwise we can't ensure all events before `lastPos` are emitted. @@ -181,8 +181,5 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e // Even if task is canceled we still call this again, to avoid something // are left and leak forever. 
- return advancer.advance( - ctx, - cachedSize, - ) + return advancer.advance(ctx, cachedSize) } diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 0e4c5b4010f..2fae0a1ab85 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -16,8 +16,10 @@ package sinkmanager import ( "context" "sync/atomic" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/memquota" @@ -25,6 +27,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/sink/tablesink" "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" ) @@ -104,6 +107,9 @@ func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) return ctx.Err() case task := <-taskChan: err := w.handleTask(ctx, task) + failpoint.Inject("SinkWorkerTaskError", func() { + err = errors.New("SinkWorkerTaskError") + }) if err != nil { return err } @@ -169,6 +175,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.Any("upperBound", upperBound), zap.Bool("splitTxn", w.splitTxn), zap.Any("lastPos", advancer.lastPos), + zap.Float64("lag", time.Since(oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()), zap.Error(finalErr)) // Otherwise we can't ensure all events before `lastPos` are emitted. From 0398ed262625e8e26cbdda83379ac171ee47ca3e Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 21 May 2023 16:57:48 +0800 Subject: [PATCH 2/7] don't call EventSink.Close and EventSink.WriteEvents concurrently Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 21 ++++++++++++++++++--- cdc/processor/sinkmanager/manager_test.go | 3 ++- cdc/sink/dmlsink/event_sink.go | 2 +- cdc/sink/dmlsink/factory/factory.go | 18 ++++++++++++++++++ 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index d5a7c0fe8c6..65a2ac4761b 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -310,9 +310,24 @@ func (m *SinkManager) initSinkFactory(errCh chan error) error { func (m *SinkManager) clearSinkFactory() { m.sinkFactoryMu.Lock() defer m.sinkFactoryMu.Unlock() - if m.sinkFactory != nil { - m.sinkFactory.Close() - m.sinkFactory = nil + if !m.sinkFactory.IsDead() { + log.Panic("clearSinkFactory while it's alive", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID)) + } + + // Firstly replace m.sinkFactory to nil so new added tables won't use it. + // Then clear all table sinks so dmlsink.EventSink.WriteEvents won't be called any more. + // Finally close the sinkFactory. 
+ sinkFactory := m.sinkFactory + m.sinkFactory = nil + if sinkFactory != nil { + m.tableSinks.Range(func(_ tablepb.Span, value interface{}) bool { + wrapper := value.(*tableSinkWrapper) + wrapper.clearTableSink() + return true + }) + sinkFactory.Close() } } diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index fa8fbc47577..1c3fe0bf465 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -317,6 +317,7 @@ func TestUpdateReceivedSorterResolvedTsOfNonExistTable(t *testing.T) { manager.UpdateReceivedSorterResolvedTs(spanz.TableIDToComparableSpan(1), 1) } +// Sink worker errors should cancel the sink manager correctly. func TestSinkManagerRunWithErrors(t *testing.T) { t.Parallel() @@ -342,5 +343,5 @@ func TestSinkManagerRunWithErrors(t *testing.T) { source.Add(span, model.NewResolvedPolymorphicEvent(0, 101)) manager.UpdateReceivedSorterResolvedTs(span, 101) manager.UpdateBarrierTs(101, nil) - err := <-errCh + <-errCh } diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index a219ac62727..fa497e6176c 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,7 +18,7 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - // Close closes the sink. + // Close closes the sink. Should never be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. Dead() <-chan struct{} diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 3d3ea2c8a20..1c4e1e55b44 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -131,6 +131,24 @@ func (s *SinkFactory) CreateTableSinkForConsumer( &dmlsink.RowChangeEventAppender{}, totalRowsCounter) } +// IsDead checks whether the sink factory is dead or not. +func (s *SinkFactory) IsDead() bool { + var deadCheckCh <-chan struct{} + if s.rowSink != nil { + deadCheckCh = s.rowSink.Dead() + } else if s.txnSink != nil { + deadCheckCh = s.txnSink.Dead() + } else { + return true + } + select { + case <-deadCheckCh: + return true + default: + return false + } +} + // Close closes the sink. func (s *SinkFactory) Close() { if s.rowSink != nil && s.txnSink != nil { From d6cc45290ad4de081e496e981ea94720a01ec305 Mon Sep 17 00:00:00 2001 From: qupeng Date: Sun, 21 May 2023 17:46:01 +0800 Subject: [PATCH 3/7] a little fix Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 38 ++++++++++++----------- cdc/processor/sinkmanager/manager_test.go | 13 +++++++- cdc/sink/dmlsink/factory/factory.go | 18 ----------- 3 files changed, 32 insertions(+), 37 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 65a2ac4761b..813354fee24 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -109,8 +109,9 @@ type SinkManager struct { redoMemQuota *memquota.MemQuota // To control lifetime of all sub-goroutines. - managerCtx context.Context - ready chan struct{} + managerCtx context.Context + managerCancel context.CancelFunc + ready chan struct{} // To control lifetime of sink and redo tasks. sinkEg *errgroup.Group @@ -171,10 +172,11 @@ func New( // Run implements util.Runnable. 
func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) { - var managerCancel context.CancelFunc - m.managerCtx, managerCancel = context.WithCancel(ctx) + m.managerCtx, m.managerCancel = context.WithCancel(ctx) + m.wg.Add(1) // So `SinkManager.Close` will also wait the function. defer func() { - managerCancel() + m.managerCancel() + m.wg.Done() m.wg.Wait() log.Info("Sink manager exists", zap.String("namespace", m.changefeedID.Namespace), @@ -249,7 +251,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er select { case <-m.managerCtx.Done(): - return errors.Trace(ctx.Err()) + return errors.Trace(m.managerCtx.Err()) case err = <-gcErrors: return errors.Trace(err) case err = <-sinkErrors: @@ -310,24 +312,23 @@ func (m *SinkManager) initSinkFactory(errCh chan error) error { func (m *SinkManager) clearSinkFactory() { m.sinkFactoryMu.Lock() defer m.sinkFactoryMu.Unlock() - if !m.sinkFactory.IsDead() { - log.Panic("clearSinkFactory while it's alive", + if m.sinkFactory != nil { + log.Info("Sink manager closing sink factory", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID)) - } - // Firstly replace m.sinkFactory to nil so new added tables won't use it. - // Then clear all table sinks so dmlsink.EventSink.WriteEvents won't be called any more. - // Finally close the sinkFactory. - sinkFactory := m.sinkFactory - m.sinkFactory = nil - if sinkFactory != nil { + // Firstly clear all table sinks so dmlsink.EventSink.WriteEvents won't be called any more. + // Then dmlsink.EventSink.Close can be closed safety. m.tableSinks.Range(func(_ tablepb.Span, value interface{}) bool { - wrapper := value.(*tableSinkWrapper) - wrapper.clearTableSink() + value.(*tableSinkWrapper).clearTableSink() return true }) - sinkFactory.Close() + m.sinkFactory.Close() + m.sinkFactory = nil + + log.Info("Sink manager has closed sink factory", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID)) } } @@ -969,6 +970,7 @@ func (m *SinkManager) Close() { zap.String("changefeed", m.changefeedID.ID)) start := time.Now() + m.managerCancel() m.wg.Wait() m.sinkMemQuota.Close() m.redoMemQuota.Close() diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go index 1c3fe0bf465..077e54806b9 100644 --- a/cdc/processor/sinkmanager/manager_test.go +++ b/cdc/processor/sinkmanager/manager_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/pingcap/failpoint" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/tablepb" @@ -343,5 +344,15 @@ func TestSinkManagerRunWithErrors(t *testing.T) { source.Add(span, model.NewResolvedPolymorphicEvent(0, 101)) manager.UpdateReceivedSorterResolvedTs(span, 101) manager.UpdateBarrierTs(101, nil) - <-errCh + + timer := time.NewTimer(5 * time.Second) + select { + case <-errCh: + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + log.Panic("must get an error instead of a timeout") + } } diff --git a/cdc/sink/dmlsink/factory/factory.go b/cdc/sink/dmlsink/factory/factory.go index 1c4e1e55b44..3d3ea2c8a20 100644 --- a/cdc/sink/dmlsink/factory/factory.go +++ b/cdc/sink/dmlsink/factory/factory.go @@ -131,24 +131,6 @@ func (s *SinkFactory) CreateTableSinkForConsumer( &dmlsink.RowChangeEventAppender{}, totalRowsCounter) } -// IsDead checks whether the sink factory is dead or not. 
-func (s *SinkFactory) IsDead() bool { - var deadCheckCh <-chan struct{} - if s.rowSink != nil { - deadCheckCh = s.rowSink.Dead() - } else if s.txnSink != nil { - deadCheckCh = s.txnSink.Dead() - } else { - return true - } - select { - case <-deadCheckCh: - return true - default: - return false - } -} - // Close closes the sink. func (s *SinkFactory) Close() { if s.rowSink != nil && s.txnSink != nil { From 57dc99111f7f2a254bcc01eb13affb4b337f60fe Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 22 May 2023 12:20:11 +0800 Subject: [PATCH 4/7] set sorter.max-memory-percentage default 10 Signed-off-by: qupeng --- pkg/config/config_test_data.go | 2 +- pkg/config/server_config.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index eaebc731cb9..d12ad516186 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -92,7 +92,7 @@ const ( "owner-flush-interval": 50000000, "processor-flush-interval": 50000000, "sorter": { - "max-memory-percentage": 0, + "max-memory-percentage": 10, "sort-dir": "/tmp/sorter", "max-memory-consumption": 0, "num-workerpool-goroutine": 0, diff --git a/pkg/config/server_config.go b/pkg/config/server_config.go index f1e1819d8bc..6c08c0da1bd 100644 --- a/pkg/config/server_config.go +++ b/pkg/config/server_config.go @@ -108,7 +108,7 @@ var defaultServerConfig = &ServerConfig{ Sorter: &SorterConfig{ // Disable block-cache by default. TiCDC only scans events instead of // accessing them randomly, so block-cache is unnecessary. - MaxMemoryPercentage: 0, + MaxMemoryPercentage: 10, SortDir: DefaultSortDir, }, Security: &SecurityConfig{}, From bf8cede4d59bb77a4a97164bb95f122c5fea0d7a Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 22 May 2023 15:36:38 +0800 Subject: [PATCH 5/7] some fixes Signed-off-by: qupeng --- cdc/processor/sinkmanager/manager.go | 8 --- .../sinkmanager/table_sink_advancer.go | 4 +- .../sinkmanager/table_sink_worker.go | 5 +- .../sinkmanager/table_sink_wrapper.go | 10 ++- .../cloudstorage/cloud_storage_dml_sink.go | 28 +++++--- cdc/sink/dmlsink/event_sink.go | 2 +- cdc/sink/dmlsink/mq/mq_dml_sink.go | 69 +++++++++++-------- cdc/sink/dmlsink/mq/mq_dml_sink_test.go | 2 +- cdc/sink/dmlsink/txn/txn_dml_sink.go | 38 +++++----- cdc/sink/tablesink/progress_tracker.go | 40 +++++++---- cdc/sink/tablesink/table_sink.go | 5 ++ tests/integration_tests/sink_hang/run.sh | 3 +- 12 files changed, 135 insertions(+), 79 deletions(-) diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 813354fee24..a747e668994 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -316,16 +316,8 @@ func (m *SinkManager) clearSinkFactory() { log.Info("Sink manager closing sink factory", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID)) - - // Firstly clear all table sinks so dmlsink.EventSink.WriteEvents won't be called any more. - // Then dmlsink.EventSink.Close can be closed safety. 
- m.tableSinks.Range(func(_ tablepb.Span, value interface{}) bool { - value.(*tableSinkWrapper).clearTableSink() - return true - }) m.sinkFactory.Close() m.sinkFactory = nil - log.Info("Sink manager has closed sink factory", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID)) diff --git a/cdc/processor/sinkmanager/table_sink_advancer.go b/cdc/processor/sinkmanager/table_sink_advancer.go index 8ff08c37bb8..553a605d53b 100644 --- a/cdc/processor/sinkmanager/table_sink_advancer.go +++ b/cdc/processor/sinkmanager/table_sink_advancer.go @@ -76,7 +76,9 @@ func newTableSinkAdvancer( func (a *tableSinkAdvancer) advance(isLastTime bool) (err error) { // Append the events to the table sink first. if len(a.events) > 0 { - a.task.tableSink.appendRowChangedEvents(a.events...) + if err = a.task.tableSink.appendRowChangedEvents(a.events...); err != nil { + return + } a.events = a.events[:0] if cap(a.events) > bufferSize { a.events = make([]*model.RowChangedEvent, 0, bufferSize) diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go index 2fae0a1ab85..651f03ba3f8 100644 --- a/cdc/processor/sinkmanager/table_sink_worker.go +++ b/cdc/processor/sinkmanager/table_sink_worker.go @@ -174,6 +174,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e zap.Any("lowerBound", lowerBound), zap.Any("upperBound", upperBound), zap.Bool("splitTxn", w.splitTxn), + zap.Int("receivedEvents", allEventCount), zap.Any("lastPos", advancer.lastPos), zap.Float64("lag", time.Since(oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()), zap.Error(finalErr)) @@ -271,7 +272,9 @@ func (w *sinkWorker) fetchFromCache( task.tableSink.receivedEventCount.Add(int64(popRes.pushCount)) w.metricOutputEventCountKV.Add(float64(popRes.pushCount)) w.metricRedoEventCacheHit.Add(float64(popRes.size)) - task.tableSink.appendRowChangedEvents(popRes.events...) + if err = task.tableSink.appendRowChangedEvents(popRes.events...); err != nil { + return + } } // Get a resolvedTs so that we can record it into sink memory quota. diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index 6514dc17ebf..a097c7703b3 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -161,13 +161,17 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err err return nil } -func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) { +func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) error { t.tableSinkMu.Lock() defer t.tableSinkMu.Unlock() // If it's nil it means it's closed. if t.tableSink != nil { t.tableSink.AppendRowChangedEvents(events...) + } else { + // If it's nil it means it's closed. + return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } + return nil } func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) { @@ -194,11 +198,13 @@ func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) { func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error { t.tableSinkMu.Lock() defer t.tableSinkMu.Unlock() - // If it's nil it means it's closed. if t.tableSink != nil { if err := t.tableSink.UpdateResolvedTs(ts); err != nil { return errors.Trace(err) } + } else { + // If it's nil it means it's closed. 
+ return tablesink.NewSinkInternalError(errors.New("table sink cleared")) } return nil } diff --git a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go index 507efa879f1..58d7bd69b4f 100644 --- a/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go +++ b/cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go @@ -68,8 +68,6 @@ type DMLSink struct { changefeedID model.ChangeFeedID // last sequence number lastSeqNum uint64 - // msgCh is a channel to hold eventFragment. - msgCh chan eventFragment // encodingWorkers defines a group of workers for encoding events. encodingWorkers []*encodingWorker // defragmenter is used to defragment the out-of-order encoded messages and @@ -78,12 +76,18 @@ type DMLSink struct { // workers defines a group of workers for writing events to external storage. workers []*dmlWorker + alive struct { + sync.RWMutex + // msgCh is a channel to hold eventFragment. + msgCh chan eventFragment + isDead bool + } + statistics *metrics.Statistics cancel func() wg sync.WaitGroup dead chan struct{} - isDead atomic.Bool } // NewDMLSink creates a cloud storage sink. @@ -127,20 +131,21 @@ func NewDMLSink(ctx context.Context, wgCtx, wgCancel := context.WithCancel(ctx) s := &DMLSink{ changefeedID: contextutil.ChangefeedIDFromCtx(wgCtx), - msgCh: make(chan eventFragment, defaultChannelSize), encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency), workers: make([]*dmlWorker, cfg.WorkerCount), statistics: metrics.NewStatistics(wgCtx, sink.TxnSink), cancel: wgCancel, dead: make(chan struct{}), } + s.alive.msgCh = make(chan eventFragment, defaultChannelSize) + encodedCh := make(chan eventFragment, defaultChannelSize) workerChannels := make([]*chann.DrainableChann[eventFragment], cfg.WorkerCount) // create a group of encoding workers. for i := 0; i < defaultEncodingConcurrency; i++ { encoder := encoderBuilder.Build() - s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.msgCh, encodedCh) + s.encodingWorkers[i] = newEncodingWorker(i, s.changefeedID, encoder, s.alive.msgCh, encodedCh) } // create defragmenter. s.defragmenter = newDefragmenter(encodedCh, workerChannels) @@ -157,8 +162,13 @@ func NewDMLSink(ctx context.Context, go func() { defer s.wg.Done() err := s.run(wgCtx) - s.isDead.Store(true) + + s.alive.Lock() + s.alive.isDead = true + close(s.alive.msgCh) + s.alive.Unlock() close(s.dead) + if err != nil && errors.Cause(err) != context.Canceled { select { case <-wgCtx.Done(): @@ -199,7 +209,9 @@ func (s *DMLSink) run(ctx context.Context) error { // WriteEvents write events to cloud storage sink. func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTableTxn]) error { - if s.isDead.Load() { + s.alive.RLock() + defer s.alive.RUnlock() + if s.alive.isDead { return errors.Trace(errors.New("dead dmlSink")) } @@ -217,7 +229,7 @@ func (s *DMLSink) WriteEvents(txns ...*dmlsink.CallbackableEvent[*model.SingleTa } seq := atomic.AddUint64(&s.lastSeqNum, 1) // emit a TxnCallbackableEvent encoupled with a sequence number starting from one. - s.msgCh <- eventFragment{ + s.alive.msgCh <- eventFragment{ seqNumber: seq, versionedTable: tbl, event: txn, diff --git a/cdc/sink/dmlsink/event_sink.go b/cdc/sink/dmlsink/event_sink.go index fa497e6176c..0de7e79f060 100644 --- a/cdc/sink/dmlsink/event_sink.go +++ b/cdc/sink/dmlsink/event_sink.go @@ -18,7 +18,7 @@ type EventSink[E TableEvent] interface { // WriteEvents writes events to the sink. 
// This is an asynchronously and thread-safe method. WriteEvents(events ...*CallbackableEvent[E]) error - // Close closes the sink. Should never be called with `WriteEvents` concurrently. + // Close closes the sink. Can be called with `WriteEvents` concurrently. Close() // The EventSink meets internal errors and has been dead already. Dead() <-chan struct{} diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink.go b/cdc/sink/dmlsink/mq/mq_dml_sink.go index cc926b9e86a..e525e6201cf 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink.go @@ -15,7 +15,7 @@ package mq import ( "context" - "sync/atomic" + "sync" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/contextutil" @@ -45,12 +45,16 @@ type dmlSink struct { // protocol indicates the protocol used by this sink. protocol config.Protocol - worker *worker - // eventRouter used to route events to the right topic and partition. - eventRouter *dispatcher.EventRouter - // topicManager used to manage topics. - // It is also responsible for creating topics. - topicManager manager.TopicManager + alive struct { + sync.RWMutex + // eventRouter used to route events to the right topic and partition. + eventRouter *dispatcher.EventRouter + // topicManager used to manage topics. + // It is also responsible for creating topics. + topicManager manager.TopicManager + worker *worker + isDead bool + } // adminClient is used to query kafka cluster information, it's shared among // multiple place, it's sink's responsibility to close it. @@ -58,8 +62,9 @@ type dmlSink struct { ctx context.Context cancel context.CancelFunc - dead chan struct{} - isDead atomic.Bool + + wg sync.WaitGroup + dead chan struct{} } func newDMLSink( @@ -83,23 +88,31 @@ func newDMLSink( statistics := metrics.NewStatistics(ctx, sink.RowSink) worker := newWorker(changefeedID, encoderConfig.Protocol, encoderBuilder, encoderConcurrency, producer, statistics) + s := &dmlSink{ - id: changefeedID, - protocol: encoderConfig.Protocol, - worker: worker, - eventRouter: eventRouter, - topicManager: topicManager, - adminClient: adminClient, - ctx: ctx, - cancel: cancel, - dead: make(chan struct{}), + id: changefeedID, + protocol: encoderConfig.Protocol, + adminClient: adminClient, + ctx: ctx, + cancel: cancel, + dead: make(chan struct{}), } + s.alive.eventRouter = eventRouter + s.alive.topicManager = topicManager + s.alive.worker = worker // Spawn a goroutine to send messages by the worker. + s.wg.Add(1) go func() { - err := s.worker.run(ctx) - s.isDead.Store(true) + defer s.wg.Done() + err := s.alive.worker.run(ctx) + + s.alive.Lock() + s.alive.isDead = true + s.alive.worker.close() + s.alive.Unlock() close(s.dead) + if err != nil && errors.Cause(err) != context.Canceled { select { case <-ctx.Done(): @@ -114,7 +127,9 @@ func newDMLSink( // WriteEvents writes events to the sink. // This is an asynchronously and thread-safe method. 
func (s *dmlSink) WriteEvents(rows ...*dmlsink.RowChangeCallbackableEvent) error { - if s.isDead.Load() { + s.alive.RLock() + defer s.alive.RUnlock() + if s.alive.isDead { return errors.Trace(errors.New("dead dmlSink")) } @@ -125,14 +140,14 @@ func (s *dmlSink) WriteEvents(rows ...*dmlsink.RowChangeCallbackableEvent) error row.Callback() continue } - topic := s.eventRouter.GetTopicForRowChange(row.Event) - partitionNum, err := s.topicManager.GetPartitionNum(s.ctx, topic) + topic := s.alive.eventRouter.GetTopicForRowChange(row.Event) + partitionNum, err := s.alive.topicManager.GetPartitionNum(s.ctx, topic) if err != nil { return errors.Trace(err) } - partition := s.eventRouter.GetPartitionForRowChange(row.Event, partitionNum) + partition := s.alive.eventRouter.GetPartitionForRowChange(row.Event, partitionNum) // This never be blocked because this is an unbounded channel. - s.worker.msgChan.In() <- mqEvent{ + s.alive.worker.msgChan.In() <- mqEvent{ key: TopicPartitionKey{ Topic: topic, Partition: partition, }, @@ -148,10 +163,8 @@ func (s *dmlSink) Close() { if s.cancel != nil { s.cancel() } + s.wg.Wait() - if s.worker != nil { - s.worker.close() - } if s.adminClient != nil { s.adminClient.Close() } diff --git a/cdc/sink/dmlsink/mq/mq_dml_sink_test.go b/cdc/sink/dmlsink/mq/mq_dml_sink_test.go index 70a3449a8af..720178c36d6 100644 --- a/cdc/sink/dmlsink/mq/mq_dml_sink_test.go +++ b/cdc/sink/dmlsink/mq/mq_dml_sink_test.go @@ -120,6 +120,6 @@ func TestWriteEvents(t *testing.T) { time.Sleep(time.Second) require.Nil(t, err) require.Len(t, errCh, 0) - require.Len(t, s.worker.producer.(*dmlproducer.MockDMLProducer).GetAllEvents(), 3000) + require.Len(t, s.alive.worker.producer.(*dmlproducer.MockDMLProducer).GetAllEvents(), 3000) s.Close() } diff --git a/cdc/sink/dmlsink/txn/txn_dml_sink.go b/cdc/sink/dmlsink/txn/txn_dml_sink.go index 1e64160d998..759074e9eb1 100644 --- a/cdc/sink/dmlsink/txn/txn_dml_sink.go +++ b/cdc/sink/dmlsink/txn/txn_dml_sink.go @@ -17,7 +17,6 @@ import ( "context" "net/url" "sync" - "sync/atomic" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" @@ -42,13 +41,17 @@ var _ dmlsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil) // dmlSink is the dmlSink for SingleTableTxn. type dmlSink struct { - conflictDetector *causality.ConflictDetector[*worker, *txnEvent] - workers []*worker - cancel func() + alive struct { + sync.RWMutex + conflictDetector *causality.ConflictDetector[*worker, *txnEvent] + isDead bool + } + + workers []*worker + cancel func() - wg sync.WaitGroup - dead chan struct{} - isDead atomic.Bool + wg sync.WaitGroup + dead chan struct{} statistics *metrics.Statistics } @@ -104,12 +107,19 @@ func newSink(ctx context.Context, backends []backend, sink.workers = append(sink.workers, w) } + sink.alive.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots) + sink.wg.Add(1) go func() { defer sink.wg.Done() err := g.Wait() - sink.isDead.Store(true) + + sink.alive.Lock() + sink.alive.isDead = true + sink.alive.conflictDetector.Close() + sink.alive.Unlock() close(sink.dead) + if err != nil && errors.Cause(err) != context.Canceled { select { case <-ctx.Done(): @@ -118,13 +128,14 @@ func newSink(ctx context.Context, backends []backend, } }() - sink.conflictDetector = causality.NewConflictDetector[*worker, *txnEvent](sink.workers, conflictDetectorSlots) return sink } // WriteEvents writes events to the dmlSink. 
func (s *dmlSink) WriteEvents(txnEvents ...*dmlsink.TxnCallbackableEvent) error { - if s.isDead.Load() { + s.alive.RLock() + defer s.alive.RUnlock() + if s.alive.isDead { return errors.Trace(errors.New("dead dmlSink")) } @@ -135,8 +146,7 @@ func (s *dmlSink) WriteEvents(txnEvents ...*dmlsink.TxnCallbackableEvent) error txn.Callback() continue } - - s.conflictDetector.Add(newTxnEvent(txn)) + s.alive.conflictDetector.Add(newTxnEvent(txn)) } return nil } @@ -151,10 +161,6 @@ func (s *dmlSink) Close() { for _, w := range s.workers { w.close() } - // workers could call callback, which will send data to channel in conflict - // detector, so we can't close conflict detector until all workers are closed. - s.conflictDetector.Close() - if s.statistics != nil { s.statistics.Close() } diff --git a/cdc/sink/tablesink/progress_tracker.go b/cdc/sink/tablesink/progress_tracker.go index e6c35d3086f..34b7631b772 100644 --- a/cdc/sink/tablesink/progress_tracker.go +++ b/cdc/sink/tablesink/progress_tracker.go @@ -30,7 +30,7 @@ const ( // It used for closing the table sink. waitingInterval = 100 * time.Millisecond // warnDuration is the duration to warn the progress tracker is not closed. - warnDuration = 1 * time.Minute + warnDuration = 30 * time.Second // A progressTracker contains several internal fixed-length buffers. // NOTICE: the buffer size must be aligned to 8 bytes. // It shouldn't be too large, otherwise it will consume too much memory. @@ -84,6 +84,8 @@ type progressTracker struct { resolvedTsCache []pendingResolvedTs lastMinResolvedTs model.ResolvedTs + + lastCheckClosed atomic.Int64 } // newProgressTracker is used to create a new progress tracker. @@ -262,12 +264,11 @@ func (r *progressTracker) freezeProcess() { r.mu.Lock() defer r.mu.Unlock() r.frozen = true + r.lastCheckClosed.Store(time.Now().Unix()) } // close is used to close the progress tracker. 
func (r *progressTracker) waitClosed(backendDead <-chan struct{}) { - blockTicker := time.NewTicker(warnDuration) - defer blockTicker.Stop() waitingTicker := time.NewTicker(waitingInterval) defer waitingTicker.Stop() for { @@ -276,15 +277,9 @@ func (r *progressTracker) waitClosed(backendDead <-chan struct{}) { r.advance() return case <-waitingTicker.C: - r.advance() - if r.trackingCount() == 0 { + if r.doCheckClosed() { return } - case <-blockTicker.C: - log.Warn("Close process doesn't return in time, may be stuck", - zap.Stringer("span", &r.span), - zap.Int("trackingCount", r.trackingCount()), - zap.Any("lastMinResolvedTs", r.advance())) } } } @@ -295,7 +290,28 @@ func (r *progressTracker) checkClosed(backendDead <-chan struct{}) bool { r.advance() return true default: - r.advance() - return r.trackingCount() == 0 + return r.doCheckClosed() + } +} + +func (r *progressTracker) doCheckClosed() bool { + resolvedTs := r.advance() + trackingCount := r.trackingCount() + if trackingCount == 0 { + return true + } + + now := time.Now().Unix() + lastCheck := r.lastCheckClosed.Load() + for now > lastCheck+int64(warnDuration.Seconds()) { + if r.lastCheckClosed.CompareAndSwap(lastCheck, now) { + log.Warn("Close table doesn't return in time, may be stuck", + zap.Stringer("span", &r.span), + zap.Int("trackingCount", trackingCount), + zap.Any("lastMinResolvedTs", resolvedTs)) + break + } + lastCheck = r.lastCheckClosed.Load() } + return false } diff --git a/cdc/sink/tablesink/table_sink.go b/cdc/sink/tablesink/table_sink.go index a1c6459796d..588b30aabf9 100644 --- a/cdc/sink/tablesink/table_sink.go +++ b/cdc/sink/tablesink/table_sink.go @@ -48,3 +48,8 @@ type SinkInternalError struct { func (e SinkInternalError) Error() string { return e.err.Error() } + +// NewSinkInternalError creates a SinkInternalError. 
+func NewSinkInternalError(err error) SinkInternalError { + return SinkInternalError{err} +} diff --git a/tests/integration_tests/sink_hang/run.sh b/tests/integration_tests/sink_hang/run.sh index 3c33ad17f08..0de76e0a3aa 100644 --- a/tests/integration_tests/sink_hang/run.sh +++ b/tests/integration_tests/sink_hang/run.sh @@ -41,7 +41,8 @@ function run() { run_sql "CREATE table sink_hang.t2(id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT} run_sql "BEGIN; INSERT INTO sink_hang.t1 VALUES (),(),(); INSERT INTO sink_hang.t2 VALUES (),(),(); COMMIT" ${UP_TIDB_HOST} ${UP_TIDB_PORT} - ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "normal" "null" "" + ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeed_id "normal" "last_error" "null" + ensure $MAX_RETRIES check_changefeed_status 127.0.0.1:8300 $changefeed_id "normal" "last_warning" "null" check_table_exists "sink_hang.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} check_table_exists "sink_hang.t2" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} From 0fba80567fabe49cf737b32a17661b9a78e51f26 Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 22 May 2023 15:54:50 +0800 Subject: [PATCH 6/7] print more logs Signed-off-by: qupeng --- cdc/sink/tablesink/progress_tracker.go | 6 ++++-- cdc/sink/tablesink/table_sink_impl.go | 3 +++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cdc/sink/tablesink/progress_tracker.go b/cdc/sink/tablesink/progress_tracker.go index 34b7631b772..b9ee4a73f7a 100644 --- a/cdc/sink/tablesink/progress_tracker.go +++ b/cdc/sink/tablesink/progress_tracker.go @@ -263,8 +263,10 @@ func (r *progressTracker) trackingCount() int { func (r *progressTracker) freezeProcess() { r.mu.Lock() defer r.mu.Unlock() - r.frozen = true - r.lastCheckClosed.Store(time.Now().Unix()) + if !r.frozen { + r.frozen = true + r.lastCheckClosed.Store(time.Now().Unix()) + } } // close is used to close the progress tracker. diff --git a/cdc/sink/tablesink/table_sink_impl.go b/cdc/sink/tablesink/table_sink_impl.go index 3cda7bc291d..3ad2454ab4f 100644 --- a/cdc/sink/tablesink/table_sink_impl.go +++ b/cdc/sink/tablesink/table_sink_impl.go @@ -129,6 +129,9 @@ func (e *EventTableSink[E, P]) UpdateResolvedTs(resolvedTs model.ResolvedTs) err // GetCheckpointTs returns the checkpoint ts of the table sink. func (e *EventTableSink[E, P]) GetCheckpointTs() model.ResolvedTs { + if e.state.Load() == state.TableSinkStopping { + e.progressTracker.checkClosed(e.backendSink.Dead()) + } return e.progressTracker.advance() } From 9b1497c7fba1d290443011f1d7d1e4305a125e1d Mon Sep 17 00:00:00 2001 From: qupeng Date: Mon, 22 May 2023 18:14:18 +0800 Subject: [PATCH 7/7] fix Signed-off-by: qupeng --- cdc/processor/processor.go | 3 +-- cdc/processor/sinkmanager/manager.go | 23 +++++++++++------ .../sinkmanager/table_sink_wrapper.go | 25 ++++++++----------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index d6bcc0244db..ac83c5e9ed5 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -210,8 +210,7 @@ func (p *processor) RemoveTableSpan(span tablepb.Span) bool { zap.Stringer("span", &span)) return true } - p.sinkManager.r.AsyncStopTable(span) - return true + return p.sinkManager.r.AsyncStopTable(span) } // IsAddTableSpanFinished implements TableExecutor interface. 
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index a747e668994..9c5a9bc08cf 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -823,11 +823,6 @@ func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error { // AsyncStopTable sets the table(TableSink) state to stopped. func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool { - log.Info("Async stop table sink", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Stringer("span", &span)) - tableSink, ok := m.tableSinks.Load(span) if !ok { // Just warn, because the table sink may be removed by another goroutine. @@ -895,7 +890,7 @@ func (m *SinkManager) GetAllCurrentTableSpansCount() int { // GetTableState returns the table(TableSink) state. func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool) { - tableSink, ok := m.tableSinks.Load(span) + wrapper, ok := m.tableSinks.Load(span) if !ok { log.Debug("Table sink not found when getting table state", zap.String("namespace", m.changefeedID.Namespace), @@ -903,7 +898,21 @@ func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool zap.Stringer("span", &span)) return tablepb.TableStateAbsent, false } - return tableSink.(*tableSinkWrapper).getState(), true + + // NOTE(qupeng): I'm not sure whether `SinkManager.AsyncStopTable` will be called + // again or not if it returns false. So we must retry `tableSink.asyncClose` here + // if necessary. It's better to remove the dirty logic in the future. + tableSink := wrapper.(*tableSinkWrapper) + if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() { + cleanedBytes := m.sinkMemQuota.Clean(span) + cleanedBytes += m.redoMemQuota.Clean(span) + log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table", + zap.String("namespace", m.changefeedID.Namespace), + zap.String("changefeed", m.changefeedID.ID), + zap.Stringer("span", &span), + zap.Uint64("memory", cleanedBytes)) + } + return tableSink.getState(), true } // GetTableStats returns the state of the table. 
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go index a097c7703b3..c6ddfde3c2d 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper.go @@ -258,19 +258,26 @@ func (t *tableSinkWrapper) markAsClosing() { break } if t.state.CompareAndSwap(curr, tablepb.TableStateStopping) { + log.Info("Sink is closing", + zap.String("namespace", t.changefeed.Namespace), + zap.String("changefeed", t.changefeed.ID), + zap.Stringer("span", &t.span)) break } } } -func (t *tableSinkWrapper) markAsClosed() (modified bool) { +func (t *tableSinkWrapper) markAsClosed() { for { curr := t.state.Load() if curr == tablepb.TableStateStopped { return } if t.state.CompareAndSwap(curr, tablepb.TableStateStopped) { - modified = true + log.Info("Sink is closed", + zap.String("namespace", t.changefeed.Namespace), + zap.String("changefeed", t.changefeed.ID), + zap.Stringer("span", &t.span)) return } } @@ -279,12 +286,7 @@ func (t *tableSinkWrapper) markAsClosed() (modified bool) { func (t *tableSinkWrapper) asyncClose() bool { t.markAsClosing() if t.asyncClearTableSink() { - if t.markAsClosed() { - log.Info("Sink is closed", - zap.String("namespace", t.changefeed.Namespace), - zap.String("changefeed", t.changefeed.ID), - zap.Stringer("span", &t.span)) - } + t.markAsClosed() return true } return false @@ -293,12 +295,7 @@ func (t *tableSinkWrapper) asyncClose() bool { func (t *tableSinkWrapper) close() { t.markAsClosing() t.clearTableSink() - if t.markAsClosed() { - log.Info("Sink is closed", - zap.String("namespace", t.changefeed.Namespace), - zap.String("changefeed", t.changefeed.ID), - zap.Stringer("span", &t.span)) - } + t.markAsClosed() } // Return true means the internal table sink has been initialized.
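
Note on the recurring pattern above: patch 5 applies the same change to the cloudstorage, mq and txn dmlSink implementations, replacing the plain `isDead atomic.Bool` with an `alive` struct guarded by a `sync.RWMutex`, so that flipping the dead flag and releasing the resources `WriteEvents` depends on happen atomically with respect to in-flight writers. Below is a minimal, self-contained Go sketch of that guard pattern only; the type and function names (`guardedSink`, `newGuardedSink`) are illustrative and are not tiflow APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// guardedSink sketches the "alive" guard: WriteEvents holds the read lock while it
// checks isDead and touches shared resources; the background goroutine takes the
// write lock to mark the sink dead and tear those resources down, so the two can
// never overlap.
type guardedSink struct {
	alive struct {
		sync.RWMutex
		isDead bool
	}
	cancel context.CancelFunc
	wg     sync.WaitGroup
	dead   chan struct{}
}

func newGuardedSink(run func(ctx context.Context) error) *guardedSink {
	ctx, cancel := context.WithCancel(context.Background())
	s := &guardedSink{cancel: cancel, dead: make(chan struct{})}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		err := run(ctx) // background loop; returns on a fatal error or cancellation

		s.alive.Lock()
		s.alive.isDead = true // later WriteEvents calls are rejected
		// resources shared with WriteEvents (workers, channels, ...) are released
		// here, still under the write lock
		s.alive.Unlock()
		close(s.dead)

		if err != nil && !errors.Is(err, context.Canceled) {
			fmt.Println("sink is dead:", err)
		}
	}()
	return s
}

// WriteEvents is safe to call from any goroutine; once the sink is dead it returns
// an error instead of racing with the teardown.
func (s *guardedSink) WriteEvents(events ...string) error {
	s.alive.RLock()
	defer s.alive.RUnlock()
	if s.alive.isDead {
		return errors.New("dead dmlSink")
	}
	// hand the events to the background worker here
	return nil
}

// Dead lets callers notice the failure asynchronously, as the table sinks do.
func (s *guardedSink) Dead() <-chan struct{} { return s.dead }

// Close can be called while WriteEvents is in flight: it only cancels the
// background goroutine and waits; the write-locked teardown inside that goroutine
// is what serializes against writers.
func (s *guardedSink) Close() {
	s.cancel()
	s.wg.Wait()
}

func main() {
	s := newGuardedSink(func(ctx context.Context) error {
		return errors.New("injected sink error") // e.g. a failpoint-triggered failure
	})
	<-s.Dead()
	fmt.Println(s.WriteEvents("row-1")) // prints: dead dmlSink
	s.Close()
}

The reason for an RWMutex rather than keeping the atomic bool is that the dead flag and the resource teardown must be observed together: a writer that sees isDead == false under the read lock is guaranteed that the worker channels it is about to use have not been closed yet, which a lone atomic flag cannot promise.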