sink(cdc): fix some bugs introduced by #8949 (#9010)
ref #8657
hicqu authored May 22, 2023
1 parent 30ab8ae commit fbb363a
Showing 18 changed files with 245 additions and 116 deletions.
cdc/processor/processor.go (1 addition, 2 deletions)
@@ -210,8 +210,7 @@ func (p *processor) RemoveTableSpan(span tablepb.Span) bool {
 			zap.Stringer("span", &span))
 		return true
 	}
-	p.sinkManager.r.AsyncStopTable(span)
-	return true
+	return p.sinkManager.r.AsyncStopTable(span)
 }

 // IsAddTableSpanFinished implements TableExecutor interface.
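The point of this change: `AsyncStopTable` can return false when the sink is not ready to be stopped yet, and the old code discarded that result, so `RemoveTableSpan` reported success even when nothing had been stopped. Propagating the bool lets the caller retry. A toy sketch of the retry loop this enables (illustrative only, not tiflow code):

package main

import (
	"fmt"
	"time"
)

// removeTable stands in for TableExecutor.RemoveTableSpan; false means the
// stop was not accepted yet and the scheduler must try again later.
func removeTable(attempt int) bool { return attempt >= 2 }

func main() {
	for attempt := 0; ; attempt++ {
		if removeTable(attempt) {
			fmt.Println("table stop accepted on attempt", attempt)
			return
		}
		time.Sleep(10 * time.Millisecond) // wait for the next scheduling tick
	}
}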
cdc/processor/sinkmanager/manager.go (38 additions, 20 deletions)
@@ -15,6 +15,7 @@ package sinkmanager

 import (
 	"context"
+	"math"
 	"sync"
 	"time"

@@ -108,8 +109,9 @@ type SinkManager struct {
 	redoMemQuota *memquota.MemQuota

 	// To control lifetime of all sub-goroutines.
-	managerCtx context.Context
-	ready      chan struct{}
+	managerCtx    context.Context
+	managerCancel context.CancelFunc
+	ready         chan struct{}

 	// To control lifetime of sink and redo tasks.
 	sinkEg *errgroup.Group
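Promoting `managerCancel` from a local variable in `Run` to a struct field is what lets `Close` (which runs on another goroutine; see the change near the end of this file) cancel all sub-goroutines and then wait for them. A minimal sketch of the pattern, with illustrative names:

package main

import (
	"context"
	"sync"
)

type manager struct {
	ctx    context.Context
	cancel context.CancelFunc
	ready  chan struct{}
	wg     sync.WaitGroup
}

func (m *manager) run(parent context.Context) {
	m.ctx, m.cancel = context.WithCancel(parent)
	m.wg.Add(1)
	go func() { // a sub-goroutine tied to the manager's lifetime
		defer m.wg.Done()
		<-m.ctx.Done()
	}()
	close(m.ready) // cancel func and WaitGroup are now set up
	<-m.ctx.Done()
}

func (m *manager) close() {
	<-m.ready   // wait until run has installed the cancel func
	m.cancel()  // unblock run and every sub-goroutine...
	m.wg.Wait() // ...then wait for them all to exit
}

func main() {
	m := &manager{ready: make(chan struct{})}
	done := make(chan struct{})
	go func() { m.run(context.Background()); close(done) }()
	m.close()
	<-done
}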
@@ -170,10 +172,11 @@ func New(

 // Run implements util.Runnable.
 func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) {
-	var managerCancel context.CancelFunc
-	m.managerCtx, managerCancel = context.WithCancel(ctx)
+	m.managerCtx, m.managerCancel = context.WithCancel(ctx)
 	m.wg.Add(1) // So `SinkManager.Close` will also wait the function.
 	defer func() {
-		managerCancel()
+		m.managerCancel()
 		m.wg.Done()
+		m.wg.Wait()
 		log.Info("Sink manager exists",
 			zap.String("namespace", m.changefeedID.Namespace),
@@ -193,7 +196,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) {
 	if m.sinkEg == nil {
 		var sinkCtx context.Context
 		m.sinkEg, sinkCtx = errgroup.WithContext(m.managerCtx)
-		m.startSinkWorkers(sinkCtx, splitTxn, enableOldValue)
+		m.startSinkWorkers(sinkCtx, m.sinkEg, splitTxn, enableOldValue)
 		m.sinkEg.Go(func() error { return m.generateSinkTasks(sinkCtx) })
 		m.wg.Add(1)
 		go func() {
@@ -213,7 +216,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) {
 	if m.redoDMLMgr != nil && m.redoEg == nil {
 		var redoCtx context.Context
 		m.redoEg, redoCtx = errgroup.WithContext(m.managerCtx)
-		m.startRedoWorkers(redoCtx, enableOldValue)
+		m.startRedoWorkers(redoCtx, m.redoEg, enableOldValue)
 		m.redoEg.Go(func() error { return m.generateRedoTasks(redoCtx) })
 		m.wg.Add(1)
 		go func() {
@@ -248,7 +251,7 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err error) {

 	select {
 	case <-m.managerCtx.Done():
-		return errors.Trace(ctx.Err())
+		return errors.Trace(m.managerCtx.Err())
 	case err = <-gcErrors:
 		return errors.Trace(err)
 	case err = <-sinkErrors:
@@ -310,13 +313,18 @@ func (m *SinkManager) clearSinkFactory() {
 	m.sinkFactoryMu.Lock()
 	defer m.sinkFactoryMu.Unlock()
 	if m.sinkFactory != nil {
+		log.Info("Sink manager closing sink factory",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID))
 		m.sinkFactory.Close()
 		m.sinkFactory = nil
+		log.Info("Sink manager has closed sink factory",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID))
 	}
 }

-func (m *SinkManager) startSinkWorkers(ctx context.Context, splitTxn bool, enableOldValue bool) {
-	eg, ctx := errgroup.WithContext(ctx)
+func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool, enableOldValue bool) {
 	for i := 0; i < sinkWorkerNum; i++ {
 		w := newSinkWorker(m.changefeedID, m.sourceManager,
 			m.sinkMemQuota, m.redoMemQuota,
@@ -326,8 +334,7 @@ func (m *SinkManager) startSinkWorkers(ctx context.Context, splitTxn bool, enableOldValue bool) {
 	}
 }

-func (m *SinkManager) startRedoWorkers(ctx context.Context, enableOldValue bool) {
-	eg, ctx := errgroup.WithContext(ctx)
+func (m *SinkManager) startRedoWorkers(ctx context.Context, eg *errgroup.Group, enableOldValue bool) {
 	for i := 0; i < redoWorkerNum; i++ {
 		w := newRedoWorker(m.changefeedID, m.sourceManager, m.redoMemQuota,
 			m.redoDMLMgr, m.eventCache, enableOldValue)
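The signature change here fixes a real leak: the old `startSinkWorkers`/`startRedoWorkers` derived a fresh `errgroup.WithContext` inside the function and never called `Wait` on it, so a failing worker could neither cancel its siblings reliably nor surface its error to `Run`. Passing the caller's group keeps workers and task generators in one group. A sketch of the corrected shape (illustrative names):

package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func startWorkers(ctx context.Context, eg *errgroup.Group) {
	for i := 0; i < 2; i++ {
		i := i
		eg.Go(func() error {
			if i == 1 {
				return errors.New("worker failed") // reaches eg.Wait below
			}
			<-ctx.Done() // sibling is cancelled by the failure
			return ctx.Err()
		})
	}
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	startWorkers(ctx, eg)  // share the group instead of deriving a second one
	fmt.Println(eg.Wait()) // prints: worker failed
}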
@@ -407,7 +414,8 @@ func (m *SinkManager) generateSinkTasks(ctx context.Context) error {
 	tableSinkUpperBoundTs model.Ts,
 ) engine.Position {
 	schemaTs := m.schemaStorage.ResolvedTs()
-	if tableSinkUpperBoundTs > schemaTs+1 {
+	if schemaTs != math.MaxUint64 && tableSinkUpperBoundTs > schemaTs+1 {
+		// schemaTs == math.MaxUint64 means it's in tests.
 		tableSinkUpperBoundTs = schemaTs + 1
 	}
 	return engine.Position{StartTs: tableSinkUpperBoundTs - 1, CommitTs: tableSinkUpperBoundTs}
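The `math.MaxUint64` guard matters because the bound arithmetic is on unsigned integers: test fixtures report a schema resolved-ts of `MaxUint64` (per the new comment), and `schemaTs+1` then wraps to 0, which would clamp every table sink upper bound down to ts 0. A two-line demonstration of the wraparound:

package main

import (
	"fmt"
	"math"
)

func main() {
	schemaTs := uint64(math.MaxUint64)
	// Unsigned overflow: MaxUint64 + 1 wraps to 0, so without the guard the
	// old clamp `tableSinkUpperBoundTs = schemaTs + 1` would set the bound to 0.
	fmt.Println(schemaTs + 1) // prints: 0
}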
@@ -815,11 +823,6 @@ func (m *SinkManager) StartTable(span tablepb.Span, startTs model.Ts) error {

 // AsyncStopTable sets the table(TableSink) state to stopped.
 func (m *SinkManager) AsyncStopTable(span tablepb.Span) bool {
-	log.Info("Async stop table sink",
-		zap.String("namespace", m.changefeedID.Namespace),
-		zap.String("changefeed", m.changefeedID.ID),
-		zap.Stringer("span", &span))
-
 	tableSink, ok := m.tableSinks.Load(span)
 	if !ok {
 		// Just warn, because the table sink may be removed by another goroutine.
@@ -887,15 +890,29 @@ func (m *SinkManager) GetAllCurrentTableSpansCount() int {

 // GetTableState returns the table(TableSink) state.
 func (m *SinkManager) GetTableState(span tablepb.Span) (tablepb.TableState, bool) {
-	tableSink, ok := m.tableSinks.Load(span)
+	wrapper, ok := m.tableSinks.Load(span)
 	if !ok {
 		log.Debug("Table sink not found when getting table state",
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Stringer("span", &span))
 		return tablepb.TableStateAbsent, false
 	}
-	return tableSink.(*tableSinkWrapper).getState(), true
+
+	// NOTE(qupeng): I'm not sure whether `SinkManager.AsyncStopTable` will be called
+	// again or not if it returns false. So we must retry `tableSink.asyncClose` here
+	// if necessary. It's better to remove the dirty logic in the future.
+	tableSink := wrapper.(*tableSinkWrapper)
+	if tableSink.getState() == tablepb.TableStateStopping && tableSink.asyncClose() {
+		cleanedBytes := m.sinkMemQuota.Clean(span)
+		cleanedBytes += m.redoMemQuota.Clean(span)
+		log.Debug("MemoryQuotaTracing: Clean up memory quota for table sink task when removing table",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Stringer("span", &span),
+			zap.Uint64("memory", cleanedBytes))
+	}
+	return tableSink.getState(), true
 }
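This block is the heart of the fix for stuck table removal: if an earlier `AsyncStopTable` call hit a sink that could not be cleared yet, nothing was guaranteed to call it again, so a sink could sit in `Stopping` forever. Retrying `asyncClose` inside `GetTableState` works, presumably, because table states are polled periodically; the NOTE in the diff itself flags the placement as a stopgap. A toy model of that convergence-by-polling (illustrative only):

package main

import "fmt"

// tryClose models tableSinkWrapper.asyncClose: it can fail while resources
// are still in use, and nothing re-invokes it except the next state poll.
func tryClose(attempt int) bool { return attempt >= 3 }

func main() {
	stopped, attempt := false, 0
	for !stopped { // models a caller polling the table state repeatedly
		attempt++
		if tryClose(attempt) {
			stopped = true // the getter-side retry lets the state converge
		}
	}
	fmt.Println("converged after", attempt, "polls")
}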

 // GetTableStats returns the state of the table.
@@ -954,6 +971,7 @@ func (m *SinkManager) Close() {
 		zap.String("changefeed", m.changefeedID.ID))

 	start := time.Now()
+	m.managerCancel()
 	m.wg.Wait()
 	m.sinkMemQuota.Close()
 	m.redoMemQuota.Close()
cdc/processor/sinkmanager/manager_test.go (42 additions, 0 deletions)
@@ -15,9 +15,12 @@ package sinkmanager

 import (
 	"context"
+	"math"
 	"testing"
 	"time"

+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -314,3 +317,42 @@ func TestUpdateReceivedSorterResolvedTsOfNonExistTable(t *testing.T) {

 	manager.UpdateReceivedSorterResolvedTs(spanz.TableIDToComparableSpan(1), 1)
 }
+
+// Sink worker errors should cancel the sink manager correctly.
+func TestSinkManagerRunWithErrors(t *testing.T) {
+	t.Parallel()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	changefeedInfo := getChangefeedInfo()
+	manager, source, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
+	defer func() {
+		cancel()
+		manager.Close()
+	}()
+
+	_ = failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskError", "return")
+	defer func() {
+		_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskError")
+	}()
+
+	span := spanz.TableIDToComparableSpan(1)
+
+	source.AddTable(span, "test", 100)
+	manager.AddTable(span, 100, math.MaxUint64)
+	manager.StartTable(span, 100)
+	source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
+	manager.UpdateReceivedSorterResolvedTs(span, 101)
+	manager.UpdateBarrierTs(101, nil)
+
+	timer := time.NewTimer(5 * time.Second)
+	select {
+	case <-errCh:
+		if !timer.Stop() {
+			<-timer.C
+		}
+		return
+	case <-timer.C:
+		log.Panic("must get an error instead of a timeout")
+	}
+}
cdc/processor/sinkmanager/redo_log_worker.go (3 additions, 6 deletions)
@@ -107,8 +107,8 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {
 		zap.Any("lowerBound", lowerBound),
 		zap.Any("upperBound", upperBound),
 		zap.Any("lastPos", advancer.lastPos),
-		zap.Float64("lag", time.Since(
-			oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()))
+		zap.Float64("lag", time.Since(oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()),
+		zap.Error(finalErr))

 	if finalErr == nil {
 		// Otherwise we can't ensure all events before `lastPos` are emitted.
@@ -181,8 +181,5 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr error) {

 	// Even if task is canceled we still call this again, to avoid something
 	// are left and leak forever.
-	return advancer.advance(
-		ctx,
-		cachedSize,
-	)
+	return advancer.advance(ctx, cachedSize)
 }
cdc/processor/sinkmanager/table_sink_advancer.go (3 additions, 1 deletion)
@@ -76,7 +76,9 @@ func newTableSinkAdvancer(
 func (a *tableSinkAdvancer) advance(isLastTime bool) (err error) {
 	// Append the events to the table sink first.
 	if len(a.events) > 0 {
-		a.task.tableSink.appendRowChangedEvents(a.events...)
+		if err = a.task.tableSink.appendRowChangedEvents(a.events...); err != nil {
+			return
+		}
 		a.events = a.events[:0]
 		if cap(a.events) > bufferSize {
 			a.events = make([]*model.RowChangedEvent, 0, bufferSize)
cdc/processor/sinkmanager/table_sink_worker.go (11 additions, 1 deletion)
@@ -16,15 +16,18 @@ package sinkmanager
 import (
 	"context"
 	"sync/atomic"
+	"time"

 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/memquota"
 	"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
 	"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
 	"github.com/pingcap/tiflow/cdc/sink/tablesink"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/tikv/client-go/v2/oracle"
 	"go.uber.org/zap"
 )
@@ -104,6 +107,9 @@ func (w *sinkWorker) handleTasks(ctx context.Context, taskChan <-chan *sinkTask) error {
 		return ctx.Err()
 	case task := <-taskChan:
 		err := w.handleTask(ctx, task)
+		failpoint.Inject("SinkWorkerTaskError", func() {
+			err = errors.New("SinkWorkerTaskError")
+		})
 		if err != nil {
 			return err
 		}
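`failpoint.Inject` is pingcap's fault-injection hook: the callback body runs only while the named failpoint is activated (the new test in manager_test.go activates this one with `failpoint.Enable(..., "return")`), and in normal builds the marker is inert. A rough standalone sketch; the activation key (package path plus failpoint name) and the `main/` prefix below are assumptions for illustration:

package main

import (
	"errors"
	"fmt"

	"github.com/pingcap/failpoint"
)

func doTask() error {
	var err error
	// Runs only while the failpoint is active (and only in builds where
	// failpoint markers have been rewritten in, e.g. via failpoint-ctl).
	failpoint.Inject("DemoTaskError", func() {
		err = errors.New("injected error")
	})
	return err
}

func main() {
	// The key is the importing package's path plus "/<name>"; "main" here
	// is an assumption for this standalone sketch.
	_ = failpoint.Enable("main/DemoTaskError", "return")
	defer func() { _ = failpoint.Disable("main/DemoTaskError") }()
	fmt.Println(doTask())
}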
@@ -168,7 +174,9 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr error) {
 		zap.Any("lowerBound", lowerBound),
 		zap.Any("upperBound", upperBound),
 		zap.Bool("splitTxn", w.splitTxn),
+		zap.Int("receivedEvents", allEventCount),
 		zap.Any("lastPos", advancer.lastPos),
+		zap.Float64("lag", time.Since(oracle.GetTimeFromTS(advancer.lastPos.CommitTs)).Seconds()),
 		zap.Error(finalErr))

 	// Otherwise we can't ensure all events before `lastPos` are emitted.
@@ -264,7 +272,9 @@ func (w *sinkWorker) fetchFromCache(
 	task.tableSink.receivedEventCount.Add(int64(popRes.pushCount))
 	w.metricOutputEventCountKV.Add(float64(popRes.pushCount))
 	w.metricRedoEventCacheHit.Add(float64(popRes.size))
-	task.tableSink.appendRowChangedEvents(popRes.events...)
+	if err = task.tableSink.appendRowChangedEvents(popRes.events...); err != nil {
+		return
+	}
 }

 // Get a resolvedTs so that we can record it into sink memory quota.
cdc/processor/sinkmanager/table_sink_wrapper.go (19 additions, 16 deletions)
@@ -161,13 +161,17 @@ func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
 	return nil
 }

-func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) {
+func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) error {
 	t.tableSinkMu.Lock()
 	defer t.tableSinkMu.Unlock()
-	// If it's nil it means it's closed.
 	if t.tableSink != nil {
 		t.tableSink.AppendRowChangedEvents(events...)
+	} else {
+		// If it's nil it means it's closed.
+		return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
 	}
+	return nil
 }

 func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
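Changing `appendRowChangedEvents` to return an error converts "append after the sink was cleared" from a silent event drop into a failure that the advancer and sink worker now propagate (see table_sink_advancer.go and table_sink_worker.go above). The shape of the pattern, reduced to a standalone sketch with simplified types:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSinkCleared = errors.New("table sink cleared")

type wrapper struct {
	mu   sync.Mutex
	sink []int // stands in for the inner table sink; nil means cleared
}

func (w *wrapper) append(events ...int) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.sink == nil {
		// Returning an error lets the caller abort its task instead of
		// silently losing events written after close.
		return errSinkCleared
	}
	w.sink = append(w.sink, events...)
	return nil
}

func main() {
	w := &wrapper{sink: []int{}}
	fmt.Println(w.append(1, 2)) // <nil>
	w.sink = nil                // simulate clearTableSink
	fmt.Println(w.append(3))    // table sink cleared
}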
@@ -194,11 +198,13 @@ func (t *tableSinkWrapper) updateReceivedSorterCommitTs(ts model.Ts) {
 func (t *tableSinkWrapper) updateResolvedTs(ts model.ResolvedTs) error {
 	t.tableSinkMu.Lock()
 	defer t.tableSinkMu.Unlock()
-	// If it's nil it means it's closed.
 	if t.tableSink != nil {
 		if err := t.tableSink.UpdateResolvedTs(ts); err != nil {
 			return errors.Trace(err)
 		}
+	} else {
+		// If it's nil it means it's closed.
+		return tablesink.NewSinkInternalError(errors.New("table sink cleared"))
 	}
 	return nil
 }
@@ -252,19 +258,26 @@ func (t *tableSinkWrapper) markAsClosing() {
 			break
 		}
 		if t.state.CompareAndSwap(curr, tablepb.TableStateStopping) {
+			log.Info("Sink is closing",
+				zap.String("namespace", t.changefeed.Namespace),
+				zap.String("changefeed", t.changefeed.ID),
+				zap.Stringer("span", &t.span))
 			break
 		}
 	}
 }

-func (t *tableSinkWrapper) markAsClosed() (modified bool) {
+func (t *tableSinkWrapper) markAsClosed() {
 	for {
 		curr := t.state.Load()
 		if curr == tablepb.TableStateStopped {
 			return
 		}
 		if t.state.CompareAndSwap(curr, tablepb.TableStateStopped) {
-			modified = true
+			log.Info("Sink is closed",
+				zap.String("namespace", t.changefeed.Namespace),
+				zap.String("changefeed", t.changefeed.ID),
+				zap.Stringer("span", &t.span))
 			return
 		}
 	}
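Folding the "Sink is closed" log into `markAsClosed` (and dropping the `modified` return value) relies on the compare-and-swap loop having exactly one winner per transition, so the message is logged once even when `close` and `asyncClose` race. A compact demonstration of that single-winner property:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var state atomic.Int32 // 0 = stopping, 1 = stopped
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // several goroutines race to close
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				curr := state.Load()
				if curr == 1 {
					return // already stopped; nothing to log
				}
				if state.CompareAndSwap(curr, 1) {
					fmt.Println("sink is closed") // printed exactly once
					return
				}
			}
		}()
	}
	wg.Wait()
}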
@@ -273,12 +286,7 @@ func (t *tableSinkWrapper) asyncClose() bool {
 func (t *tableSinkWrapper) asyncClose() bool {
 	t.markAsClosing()
 	if t.asyncClearTableSink() {
-		if t.markAsClosed() {
-			log.Info("Sink is closed",
-				zap.String("namespace", t.changefeed.Namespace),
-				zap.String("changefeed", t.changefeed.ID),
-				zap.Stringer("span", &t.span))
-		}
+		t.markAsClosed()
 		return true
 	}
 	return false
@@ -287,12 +295,7 @@ func (t *tableSinkWrapper) close() {
 func (t *tableSinkWrapper) close() {
 	t.markAsClosing()
 	t.clearTableSink()
-	if t.markAsClosed() {
-		log.Info("Sink is closed",
-			zap.String("namespace", t.changefeed.Namespace),
-			zap.String("changefeed", t.changefeed.ID),
-			zap.Stringer("span", &t.span))
-	}
+	t.markAsClosed()
 }

 // Return true means the internal table sink has been initialized.
(The remaining 11 changed files are not shown.)
