diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go
index e57f6cba5939..28658b9c7edb 100644
--- a/pkg/ccl/changefeedccl/changefeed_processors.go
+++ b/pkg/ccl/changefeedccl/changefeed_processors.go
@@ -80,9 +80,9 @@ type changeAggregator struct {
 	// eventConsumer consumes the event.
 	eventConsumer eventConsumer

-	// lastFlush and flushFrequency keep track of the flush frequency.
-	lastFlush      time.Time
-	flushFrequency time.Duration
+	lastHighWaterFlush time.Time     // last time the high watermark was checkpointed.
+	flushFrequency     time.Duration // how often the high watermark can be checkpointed.
+	lastSpanFlush      time.Time     // last time an expensive, span-based checkpoint was written.

 	// frontier keeps track of resolved timestamps for spans along with schema change
 	// boundary information.
@@ -291,6 +291,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.cancel()
 		return
 	}
+
+	// Generate the expensive checkpoint only after we have been running for a while.
+	ca.lastSpanFlush = timeutil.Now()
 }

 func (ca *changeAggregator) startKVFeed(
@@ -321,7 +324,6 @@ func (ca *changeAggregator) startKVFeed(
 		// Trying to call MoveToDraining here is racy (`MoveToDraining called in
 		// state stateTrailingMeta`), so return the error via a channel.
 		ca.errCh <- kvfeed.Run(ctx, kvfeedCfg)
-		ca.cancel()
 	}); err != nil {
 		// If err != nil then the RunAsyncTask closure never ran, which means we
 		// need to manually close ca.kvFeedDoneCh so `(*changeAggregator).close`
@@ -407,6 +409,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
 	if err != nil {
 		return nil, err
 	}
+	if initialHighWater.IsEmpty() {
+		// If we are performing an initial scan, set the frontier initialHighWater
+		// to the StatementTime -- this is the time at which we will be scanning spans.
+		// Spans that reach this time are eligible for checkpointing.
+		ca.frontier.initialHighWater = ca.spec.Feed.StatementTime
+	}

 	checkpointedSpanTs := ca.spec.Checkpoint.Timestamp
@@ -536,6 +544,13 @@ func (ca *changeAggregator) tick() error {
 // changeAggregator node to the changeFrontier node to allow the changeFrontier
 // to persist the overall changefeed's progress
 func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
+	if resolved.Timestamp.IsEmpty() {
+		// @0.0 resolved timestamps could come in from a rangefeed checkpoint.
+		// When a rangefeed starts running, it emits a @0.0 resolved timestamp.
+		// We don't care about those as far as checkpointing is concerned.
+		return nil
+	}
+
 	advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
 	if err != nil {
 		return err
@@ -544,18 +559,25 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
 	forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE
 	checkpointFrontier := advanced &&
-		(forceFlush || timeutil.Since(ca.lastFlush) > ca.flushFrequency)
+		(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)
+
+	if checkpointFrontier {
+		defer func() {
+			ca.lastHighWaterFlush = timeutil.Now()
+		}()
+		return ca.flushFrontier()
+	}

 	// At a lower frequency we checkpoint specific spans in the job progress
 	// either in backfills or if the highwater mark is excessively lagging behind
 	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
 		(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
 			ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
-		canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)
+		canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)

-	if checkpointFrontier || checkpointSpans {
+	if checkpointSpans {
 		defer func() {
-			ca.lastFlush = timeutil.Now()
+			ca.lastSpanFlush = timeutil.Now()
 		}()
 		return ca.flushFrontier()
 	}
@@ -1032,7 +1054,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
 			cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART) {
 			var err error
 			endTime := cf.spec.Feed.EndTime
-			if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime.Next()) {
+			if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime) {
 				err = pgerror.Newf(pgcode.SchemaChangeOccurred,
 					"schema change occurred at %v",
 					cf.frontier.boundaryTime.Next().AsOfSystemTime())
diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
index 412c150231fb..6aeb9efbdd31 100644
--- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
+++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -253,7 +253,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 	for i := 0; ; i++ {
 		initialScan := i == 0
-		scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, rangeFeedResumeFrontier.Frontier())
+		initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
+		scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
 		if err != nil {
 			return err
 		}
 	}
@@ -266,7 +267,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
 		}
 	}

-	initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
 	if initialScanOnly {
 		if err := emitResolved(f.initialHighWater, jobspb.ResolvedSpan_EXIT); err != nil {
 			return err
@@ -354,7 +354,7 @@ func filterCheckpointSpans(spans []roachpb.Span, completed []roachpb.Span) []roa
 }

 func (f *kvFeed) scanIfShould(
-	ctx context.Context, initialScan bool, highWater hlc.Timestamp,
+	ctx context.Context, initialScan bool, initialScanOnly bool, highWater hlc.Timestamp,
 ) ([]roachpb.Span, hlc.Timestamp, error) {
 	scanTime := highWater.Next()
@@ -421,11 +421,16 @@ func (f *kvFeed) scanIfShould(
 		defer f.onBackfillCallback()()
 	}

+	boundaryType := jobspb.ResolvedSpan_NONE
+	if initialScanOnly {
+		boundaryType = jobspb.ResolvedSpan_EXIT
+	}
 	if err := f.scanner.Scan(ctx, f.writer, scanConfig{
 		Spans:     spansToBackfill,
 		Timestamp: scanTime,
 		WithDiff:  !isInitialScan && f.withDiff,
 		Knobs:     f.knobs,
+		Boundary:  boundaryType,
 	}); err != nil {
 		return nil, hlc.Timestamp{}, err
 	}
diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go
index bc8cc6939d98..be8d7e44f1c2 100644
--- a/pkg/ccl/changefeedccl/kvfeed/scanner.go
+++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -39,6 +39,7 @@ type scanConfig struct {
 	Timestamp hlc.Timestamp
 	WithDiff  bool
 	Knobs     TestingKnobs
+	Boundary  jobspb.ResolvedSpan_BoundaryType
 }

 type kvScanner interface {
@@ -116,7 +117,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
 			defer limAlloc.Release()
 			defer spanAlloc.Release(ctx)

-			err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.WithDiff, sink, cfg.Knobs)
+			err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.Boundary, cfg.WithDiff, sink, cfg.Knobs)
 			finished := atomic.AddInt64(&atomicFinished, 1)
 			if backfillDec != nil {
 				backfillDec()
@@ -134,6 +135,7 @@ func (p *scanRequestScanner) exportSpan(
 	ctx context.Context,
 	span roachpb.Span,
 	ts hlc.Timestamp,
+	boundaryType jobspb.ResolvedSpan_BoundaryType,
 	withDiff bool,
 	sink kvevent.Writer,
 	knobs TestingKnobs,
@@ -191,7 +193,7 @@ func (p *scanRequestScanner) exportSpan(
 		if res.ResumeSpan != nil {
 			consumed := roachpb.Span{Key: remaining.Key, EndKey: res.ResumeSpan.Key}
 			if err := sink.Add(
-				ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, jobspb.ResolvedSpan_NONE),
+				ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, boundaryType),
 			); err != nil {
 				return err
 			}
@@ -200,7 +202,7 @@ func (p *scanRequestScanner) exportSpan(
 	}
 	// p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())
 	if err := sink.Add(
-		ctx, kvevent.NewBackfillResolvedEvent(span, ts, jobspb.ResolvedSpan_NONE),
+		ctx, kvevent.NewBackfillResolvedEvent(span, ts, boundaryType),
 	); err != nil {
 		return err
 	}
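
Review note: below is a minimal, self-contained Go sketch (not the real changefeedccl code) of the two-timer checkpointing scheme this patch introduces in noteResolvedSpan. The aggregator type, the spanFlushFrequency field, and the noteResolved signature are hypothetical stand-ins; only lastHighWaterFlush, flushFrequency, and lastSpanFlush correspond to names in the patch.

package main

import (
	"fmt"
	"time"
)

// aggregator models just the checkpoint bookkeeping from changeAggregator.
type aggregator struct {
	lastHighWaterFlush time.Time     // last cheap high-water checkpoint
	flushFrequency     time.Duration // cadence for high-water checkpoints
	lastSpanFlush      time.Time     // last expensive, span-based checkpoint
	spanFlushFrequency time.Duration // stand-in for canCheckpointSpans' rate limit
}

// noteResolved mirrors the post-patch control flow: the high-water path
// returns early, so flushing the frontier no longer resets the span
// checkpoint timer, and vice versa.
func (a *aggregator) noteResolved(advanced, forceFlush, spansLagging bool) string {
	if advanced && (forceFlush || time.Since(a.lastHighWaterFlush) > a.flushFrequency) {
		a.lastHighWaterFlush = time.Now()
		return "checkpointed high watermark"
	}
	if spansLagging && time.Since(a.lastSpanFlush) > a.spanFlushFrequency {
		a.lastSpanFlush = time.Now()
		return "checkpointed spans"
	}
	return "no checkpoint"
}

func main() {
	a := &aggregator{
		flushFrequency:     time.Millisecond,
		spanFlushFrequency: 50 * time.Millisecond,
		lastSpanFlush:      time.Now(), // mirrors Start(): delay the first expensive checkpoint
	}
	time.Sleep(5 * time.Millisecond)
	fmt.Println(a.noteResolved(true, false, true)) // "checkpointed high watermark"
	fmt.Println(a.noteResolved(true, false, true)) // "no checkpoint": both timers are cooling down
}

The early return is the substantive fix: under the old combined condition, frontier and span checkpoints shared ca.lastFlush, so a frequent high-water flush kept resetting the timer that canCheckpointSpans consulted and could starve the expensive span-based checkpoint.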
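
A second sketch, for the kv_feed.go/scanner.go half of the patch: when a feed is initial-scan-only (its end time equals the initial high-water), the scanner's own resolved events now carry an EXIT boundary instead of NONE. The BoundaryType enum, resolvedSpan struct, and scan function here are hypothetical simplifications of jobspb.ResolvedSpan_BoundaryType and the kvevent plumbing.

package main

import "fmt"

// BoundaryType is a stand-in for jobspb.ResolvedSpan_BoundaryType.
type BoundaryType int

const (
	BoundaryNone BoundaryType = iota
	BoundaryExit
)

func (b BoundaryType) String() string {
	if b == BoundaryExit {
		return "EXIT"
	}
	return "NONE"
}

// scan stands in for scanRequestScanner.Scan: every resolved event it emits
// carries the boundary type chosen by the caller, which is what the new
// scanConfig.Boundary field threads through to exportSpan.
func scan(spans []string, boundary BoundaryType, emit func(span string, b BoundaryType)) {
	for _, sp := range spans {
		emit(sp, boundary)
	}
}

func main() {
	// In kvFeed.run this is f.endTime.EqOrdering(f.initialHighWater).
	initialScanOnly := true

	// Mirrors scanIfShould: pick the boundary before handing off to the scanner.
	boundary := BoundaryNone
	if initialScanOnly {
		boundary = BoundaryExit
	}

	scan([]string{"/Table/101/{1-2}", "/Table/101/{2-3}"}, boundary, func(span string, b BoundaryType) {
		fmt.Printf("resolved %s boundary=%s\n", span, b)
	})
}

Note that kvFeed.run still emits a final EXIT resolved event at the initial high-water after the scan; the change is that the scan's per-span resolved events can now carry the boundary directly rather than always being ResolvedSpan_NONE.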