Skip to content

Commit

Permalink
owner(cdc): add a warning for owner DDL puller being stuck (#4039) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 15, 2022
1 parent 51289da commit 2971ac3
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
22 changes: 22 additions & 0 deletions cdc/owner/ddl_puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package owner
import (
"context"
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
Expand All @@ -31,6 +33,10 @@ import (
"golang.org/x/sync/errgroup"
)

// ownerDDLPullerStuckWarnTimeout is both the interval at which the owner DDL
// puller checks whether its resolved ts has advanced and the threshold after
// which a lack of progress is reported with a warning log.
const ownerDDLPullerStuckWarnTimeout = 30 * time.Second

// DDLPuller is a wrapper of the Puller interface for the owner.
// It starts a puller, listens to the DDL range, and adds the received DDLs into an internal queue.
type DDLPuller interface {
Expand All @@ -53,6 +59,8 @@ type ddlPullerImpl struct {
pendingDDLJobs []*timodel.Job
lastDDLJobID int64
cancel context.CancelFunc

clock clock.Clock
}

func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
Expand Down Expand Up @@ -80,6 +88,7 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
resolvedTS: startTs,
filter: f,
cancel: func() {},
clock: clock.New(),
}, nil
}

Expand All @@ -91,6 +100,7 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor)
errg, stdCtx := errgroup.WithContext(stdCtx)
lastResolvedTsAdanvcedTime := h.clock.Now()

errg.Go(func() error {
return h.puller.Run(stdCtx)
Expand All @@ -106,6 +116,7 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
h.mu.Lock()
defer h.mu.Unlock()
if rawDDL.CRTs > h.resolvedTS {
lastResolvedTsAdanvcedTime = h.clock.Now()
h.resolvedTS = rawDDL.CRTs
}
return nil
Expand All @@ -132,11 +143,22 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
return nil
}

ticker := h.clock.Ticker(ownerDDLPullerStuckWarnTimeout)
defer ticker.Stop()

errg.Go(func() error {
for {
select {
case <-stdCtx.Done():
return stdCtx.Err()
case <-ticker.C:
duration := h.clock.Since(lastResolvedTsAdanvcedTime)
if duration > ownerDDLPullerStuckWarnTimeout {
log.Warn("ddl puller resolved ts has not advanced",
zap.String("changefeed-id", ctx.ChangefeedVars().ID),
zap.Duration("duration", duration),
zap.Uint64("resolved-ts", h.resolvedTS))
}
case e := <-rawDDLCh:
if err := receiveDDL(e); err != nil {
return errors.Trace(err)
Expand Down
70 changes: 70 additions & 0 deletions cdc/owner/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ import (
"encoding/json"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tiflow/cdc/model"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/util/testleak"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

var _ = check.Suite(&ddlPullerSuite{})
Expand Down Expand Up @@ -220,6 +225,71 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) {
c.Assert(ddl, check.IsNil)
}

// TestResolvedTsStuck checks that the DDL puller emits a warning log when its
// resolved ts has not advanced for longer than ownerDDLPullerStuckWarnTimeout
// (measured on a mock clock), and that the resolved ts can still advance
// afterwards.
func (*ddlPullerSuite) TestResolvedTsStuck(c *check.C) {
	defer testleak.AfterTest(c)()
	// Route warn-level logs into an in-memory observer so they can be counted.
	zapcore, logs := observer.New(zap.WarnLevel)
	conf := &log.Config{Level: "warn", File: log.FileLogConfig{}}
	// Do not ignore the error here: the deferred restore below asserts on the
	// same call, and a failed init would leave r unusable.
	_, r, err := log.InitLogger(conf)
	c.Assert(err, check.IsNil)
	logger := zap.New(zapcore)
	log.ReplaceGlobals(logger, r)
	// Restore a regular global logger once the test is done.
	defer func() {
		logger, r, err := log.InitLogger(conf)
		c.Assert(err, check.IsNil)
		log.ReplaceGlobals(logger, r)
	}()

	startTs := uint64(10)
	mockPuller := newMockPuller(c, startTs)
	ctx := cdcContext.NewBackendContext4Test(true)
	p, err := newDDLPuller(ctx, startTs)
	c.Assert(err, check.IsNil)

	// Swap in a mock clock and a mock puller before Run starts, so the test
	// controls both time and the incoming resolved ts events.
	mockClock := clock.NewMock()
	p.(*ddlPullerImpl).clock = mockClock

	p.(*ddlPullerImpl).puller = mockPuller
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		err := p.Run(ctx)
		// Cancellation is the expected way for Run to stop in this test.
		if errors.Cause(err) == context.Canceled {
			err = nil
		}
		c.Assert(err, check.IsNil)
	}()
	// Deferred calls run in reverse order: Close first, then wait for Run.
	defer wg.Wait()
	defer p.Close()

	// test initialize state
	resolvedTs, ddl := p.FrontDDL()
	c.Assert(resolvedTs, check.Equals, startTs)
	c.Assert(ddl, check.IsNil)
	resolvedTs, ddl = p.PopFrontDDL()
	c.Assert(resolvedTs, check.Equals, startTs)
	c.Assert(ddl, check.IsNil)

	// The resolved ts advances normally: no warning is expected yet.
	mockPuller.appendResolvedTs(30)
	waitResolvedTsGrowing(c, p, 30)
	c.Assert(logs.Len(), check.Equals, 0)

	// Move the mock clock well past the warning threshold, then keep nudging
	// it while polling the observer, giving the Run goroutine real time to
	// react to the ticker.
	mockClock.Add(2 * ownerDDLPullerStuckWarnTimeout)
	const maxAttempts = 20
	warned := false
	for i := 0; i < maxAttempts; i++ {
		mockClock.Add(time.Second)
		if logs.Len() > 0 {
			warned = true
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	if !warned {
		c.Fatal("warning log not printed")
	}

	// The puller must still be able to make progress after the warning.
	mockPuller.appendResolvedTs(40)
	waitResolvedTsGrowing(c, p, 40)
}

// waitResolvedTsGrowing waits until the first pending DDL reaches targetTs or,
// if there is no pending DDL, until the DDL resolved ts reaches targetTs.
func waitResolvedTsGrowing(c *check.C, p DDLPuller, targetTs model.Ts) {
Expand Down

0 comments on commit 2971ac3

Please sign in to comment.