From 3c514ac712791ae9d27d03bb71dff62975c6c645 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 23 Nov 2021 14:09:50 +0800 Subject: [PATCH] processor,sink(cdc): let sink report resolved ts and do not skip buffer sink flush (#3540) (#3561) --- cdc/sink/manager.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cdc/sink/manager.go b/cdc/sink/manager.go index 0e8d5df9617..6ef33f3a7de 100644 --- a/cdc/sink/manager.go +++ b/cdc/sink/manager.go @@ -110,9 +110,12 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) { // NOTICE: Because all table sinks will try to flush backend sink, // which will cause a lot of lock contention and blocking in high concurrency cases. // So here we use flushing as a lightweight lock to improve the lock competition problem. - if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { - return m.getCheckpointTs(), nil - } + // + // Do not skip flushing for resolving #3503. + // TODO uncomment the following return. + // if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) { + // return m.getCheckpointTs(), nil + // } m.flushMu.Lock() defer func() { m.flushMu.Unlock()