Skip to content

Commit

Permalink
processor,sink(cdc): let sink report resolved ts and do not skip buff…
Browse files Browse the repository at this point in the history
…er sink flush (#3540) (#3561)
  • Loading branch information
ti-chi-bot authored Nov 23, 2021
1 parent 248a2c0 commit 3c514ac
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,12 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
// NOTICE: Because all table sinks will try to flush backend sink,
// which will cause a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to improve the lock competition problem.
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
return m.getCheckpointTs(), nil
}
//
// Do not skip flushing for resolving #3503.
// TODO uncomment the following return.
// if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
// return m.getCheckpointTs(), nil
// }
m.flushMu.Lock()
defer func() {
m.flushMu.Unlock()
Expand Down

0 comments on commit 3c514ac

Please sign in to comment.