diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go index a7a38821ce0..44fed098be3 100644 --- a/cdc/owner/ddl_puller.go +++ b/cdc/owner/ddl_puller.go @@ -16,7 +16,9 @@ package owner import ( "context" "sync" + "time" + "github.com/benbjohnson/clock" "github.com/pingcap/errors" "github.com/pingcap/log" timodel "github.com/pingcap/parser/model" @@ -31,6 +33,10 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + ownerDDLPullerStuckWarnTimeout = 30 * time.Second +) + // DDLPuller is a wrapper of the Puller interface for the owner // DDLPuller starts a puller, listens to the DDL range, adds the received DDLs into an internal queue type DDLPuller interface { @@ -53,6 +59,8 @@ type ddlPullerImpl struct { pendingDDLJobs []*timodel.Job lastDDLJobID int64 cancel context.CancelFunc + + clock clock.Clock } func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { @@ -80,6 +88,7 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { resolvedTS: startTs, filter: f, cancel: func() {}, + clock: clock.New(), }, nil } @@ -91,6 +100,7 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error { stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID) stdCtx = util.PutRoleInCtx(stdCtx, util.RoleProcessor) errg, stdCtx := errgroup.WithContext(stdCtx) + lastResolvedTsAdanvcedTime := h.clock.Now() errg.Go(func() error { return h.puller.Run(stdCtx) @@ -106,6 +116,7 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error { h.mu.Lock() defer h.mu.Unlock() if rawDDL.CRTs > h.resolvedTS { + lastResolvedTsAdanvcedTime = h.clock.Now() h.resolvedTS = rawDDL.CRTs } return nil @@ -132,11 +143,22 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error { return nil } + ticker := h.clock.Ticker(ownerDDLPullerStuckWarnTimeout) + defer ticker.Stop() + errg.Go(func() error { for { select { case <-stdCtx.Done(): return stdCtx.Err() + case <-ticker.C: + duration := h.clock.Since(lastResolvedTsAdanvcedTime) + if duration > ownerDDLPullerStuckWarnTimeout { + log.Warn("ddl puller resolved ts has not advanced", + zap.String("changefeed-id", ctx.ChangefeedVars().ID), + zap.Duration("duration", duration), + zap.Uint64("resolved-ts", h.resolvedTS)) + } case e := <-rawDDLCh: if err := receiveDDL(e); err != nil { return errors.Trace(err) diff --git a/cdc/owner/ddl_puller_test.go b/cdc/owner/ddl_puller_test.go index 4be256a4c79..4c91457b287 100644 --- a/cdc/owner/ddl_puller_test.go +++ b/cdc/owner/ddl_puller_test.go @@ -18,15 +18,20 @@ import ( "encoding/json" "sync" "sync/atomic" + "time" + "github.com/benbjohnson/clock" "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/log" timodel "github.com/pingcap/parser/model" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tiflow/cdc/model" cdcContext "github.com/pingcap/tiflow/pkg/context" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/util/testleak" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) var _ = check.Suite(&ddlPullerSuite{}) @@ -220,6 +225,71 @@ func (s *ddlPullerSuite) TestPuller(c *check.C) { c.Assert(ddl, check.IsNil) } +func (*ddlPullerSuite) TestResolvedTsStuck(c *check.C) { + defer testleak.AfterTest(c)() + // For observing the logs + zapcore, logs := observer.New(zap.WarnLevel) + conf := &log.Config{Level: "warn", File: log.FileLogConfig{}} + _, r, _ := log.InitLogger(conf) + logger := zap.New(zapcore) + log.ReplaceGlobals(logger, r) + defer func() { + logger, r, err := log.InitLogger(conf) + c.Assert(err, check.IsNil) + log.ReplaceGlobals(logger, r) + }() + + startTs := uint64(10) + mockPuller := newMockPuller(c, startTs) + ctx := cdcContext.NewBackendContext4Test(true) + p, err := newDDLPuller(ctx, startTs) + c.Assert(err, check.IsNil) + + mockClock := clock.NewMock() + p.(*ddlPullerImpl).clock = mockClock + + p.(*ddlPullerImpl).puller = mockPuller + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + err := p.Run(ctx) + if errors.Cause(err) == context.Canceled { + err = nil + } + c.Assert(err, check.IsNil) + }() + defer wg.Wait() + defer p.Close() + + // test initialize state + resolvedTs, ddl := p.FrontDDL() + c.Assert(resolvedTs, check.Equals, startTs) + c.Assert(ddl, check.IsNil) + resolvedTs, ddl = p.PopFrontDDL() + c.Assert(resolvedTs, check.Equals, startTs) + c.Assert(ddl, check.IsNil) + + mockPuller.appendResolvedTs(30) + waitResolvedTsGrowing(c, p, 30) + c.Assert(logs.Len(), check.Equals, 0) + + mockClock.Add(2 * ownerDDLPullerStuckWarnTimeout) + for i := 0; i < 20; i++ { + mockClock.Add(time.Second) + if logs.Len() > 0 { + break + } + time.Sleep(10 * time.Millisecond) + if i == 19 { + c.Fatal("warning log not printed") + } + } + + mockPuller.appendResolvedTs(40) + waitResolvedTsGrowing(c, p, 40) +} + // waitResolvedTsGrowing can wait the first DDL reaches targetTs or if no pending // DDL, DDL resolved ts reaches targetTs. func waitResolvedTsGrowing(c *check.C, p DDLPuller, targetTs model.Ts) {