changefeedccl: Fix initial scan checkpointing
A more than two year old change
(#71848)
that added support for checkpointing during backfills after schema changes
inadvertently broke initial scan checkpointing functionality.

Exacerbating the problem, the existing test
`TestChangefeedBackfillCheckpoint` continued to pass.
The reason it kept passing is that the test was looking
for a checkpoint whose timestamp matched the backfill timestamp.
The bug involved incorrect initialization/use of a 0 timestamp.
It just so happens that after the initial scan completes, the
rangefeed starts, and the very first thing it does is
generate a 0 timestamp checkpoint.  The test was observing
this event, and so continued to pass.
This PR does not add a dedicated test because the existing
tests work fine -- provided we ignore the 0 timestamp checkpoint,
which is what this PR does in addition to addressing
the root cause of the bug.
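
To illustrate the failure mode described above, here is a minimal, self-contained sketch (illustrative types and names only, not the actual CockroachDB code): a zero backfill timestamp never matches spans resolved at statement time, while the rangefeed's initial zero-timestamp checkpoint does match it, which is why the existing test kept passing.

package main

import "fmt"

// timestamp stands in for hlc.Timestamp; only what the sketch needs.
type timestamp struct{ wallTime int64 }

func (t timestamp) IsEmpty() bool          { return t.wallTime == 0 }
func (t timestamp) Equal(o timestamp) bool { return t.wallTime == o.wallTime }

func main() {
	statementTime := timestamp{wallTime: 100} // the time the initial scan reads at

	// Before the fix: during an initial scan the frontier's backfill timestamp
	// was left at zero, so spans resolved at statementTime never matched it
	// and no span-level checkpoint was written.
	backfillTS := timestamp{}
	resolvedAtScan := timestamp{wallTime: 100}
	fmt.Println("span checkpoint eligible (buggy):", resolvedAtScan.Equal(backfillTS)) // false

	// Meanwhile the rangefeed's very first event is a zero-timestamp checkpoint,
	// which did match the zero backfill timestamp -- so the existing test passed.
	zeroResolved := timestamp{}
	fmt.Println("what the test observed (buggy):", zeroResolved.Equal(backfillTS)) // true

	// After the fix: the backfill timestamp is initialized to statementTime
	// and zero-timestamp resolved events are ignored outright.
	backfillTS = statementTime
	fmt.Println("span checkpoint eligible (fixed):", resolvedAtScan.Equal(backfillTS)) // true
	fmt.Println("zero resolved event ignored:", zeroResolved.IsEmpty())                // true
}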

Informs #96959

Release note (enterprise change): Fix a bug in changefeeds where
long-running initial scans would fail to generate checkpoints.
Failing to generate checkpoints is particularly bad if the
changefeed restarts for any reason.  Without checkpoints,
the changefeed restarts from the beginning, and in the worst
case, when exporting substantially sized tables, the changefeed
initial scan may have a hard time completing.
Yevgeniy Miretskiy committed Feb 12, 2023
1 parent 5f310bc commit 495dc98
Showing 3 changed files with 44 additions and 15 deletions.
40 changes: 31 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -79,9 +79,9 @@ type changeAggregator struct {
// eventConsumer consumes the event.
eventConsumer eventConsumer

// lastFlush and flushFrequency keep track of the flush frequency.
lastFlush time.Time
flushFrequency time.Duration
lastHighWaterFlush time.Time // last time high watermark was checkpointed.
flushFrequency time.Duration // how often high watermark can be checkpointed.
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.

// frontier keeps track of resolved timestamps for spans along with schema change
// boundary information.
@@ -291,6 +291,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.cancel()
return
}

// Generate expensive checkpoint only after we ran for a while.
ca.lastSpanFlush = timeutil.Now()
}

func (ca *changeAggregator) startKVFeed(
@@ -321,7 +324,6 @@ func (ca *changeAggregator) startKVFeed(
// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
ca.errCh <- kvfeed.Run(ctx, kvfeedCfg)
ca.cancel()
}); err != nil {
// If err != nil then the RunAsyncTask closure never ran, which means we
// need to manually close ca.kvFeedDoneCh so `(*changeAggregator).close`
@@ -407,6 +409,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
if err != nil {
return nil, err
}
if initialHighWater.IsEmpty() {
// If we are performing initial scan, set frontier initialHighWater
// to the StatementTime -- this is the time we will be scanning spans.
// Spans that reach this time are eligible for checkpointing.
ca.frontier.initialHighWater = ca.spec.Feed.StatementTime
}

checkpointedSpanTs := ca.spec.Checkpoint.Timestamp

@@ -535,6 +543,13 @@ func (ca *changeAggregator) tick() error {
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
if resolved.Timestamp.IsEmpty() {
// @0.0 resolved timestamps could come in from rangefeed checkpoint.
// When rangefeed starts running, it emits @0.0 resolved timestamp.
// We don't care about those as far as checkpointing concerned.
return nil
}

advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
@@ -543,18 +558,25 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE

checkpointFrontier := advanced &&
(forceFlush || timeutil.Since(ca.lastFlush) > ca.flushFrequency)
(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)

if checkpointFrontier {
defer func() {
ca.lastHighWaterFlush = timeutil.Now()
}()
return ca.flushFrontier()
}

// At a lower frequency we checkpoint specific spans in the job progress
// either in backfills or if the highwater mark is excessively lagging behind
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)

if checkpointFrontier || checkpointSpans {
if checkpointSpans {
defer func() {
ca.lastFlush = timeutil.Now()
ca.lastSpanFlush = timeutil.Now()
}()
return ca.flushFrontier()
}
@@ -1029,7 +1051,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART) {
var err error
endTime := cf.spec.Feed.EndTime
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime.Next()) {
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime) {
err = pgerror.Newf(pgcode.SchemaChangeOccurred,
"schema change occurred at %v", cf.frontier.boundaryTime.Next().AsOfSystemTime())

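
As a rough illustration of why the single lastFlush timer above was split in two, here is a minimal sketch (illustrative names only, not the CockroachDB API): the cheap high-water checkpoint and the expensive span-based checkpoint are now rate-limited independently, so frequent high-water flushes no longer reset the clock that gates span checkpoints during a long initial scan.

package main

import (
	"fmt"
	"time"
)

type checkpointer struct {
	lastHighWaterFlush time.Time     // last time the cheap high-water checkpoint was written
	lastSpanFlush      time.Time     // last time the expensive span-based checkpoint was written
	flushFrequency     time.Duration // cadence for high-water checkpoints
	spanFlushFrequency time.Duration // slower cadence for span checkpoints
}

// maybeCheckpoint decides which checkpoints to write, using separate timers
// so one kind of flush never resets the clock for the other.
func (c *checkpointer) maybeCheckpoint(now time.Time, advanced, scanning bool) (highWater, spans bool) {
	if advanced && now.Sub(c.lastHighWaterFlush) > c.flushFrequency {
		c.lastHighWaterFlush = now
		highWater = true
	}
	if scanning && now.Sub(c.lastSpanFlush) > c.spanFlushFrequency {
		c.lastSpanFlush = now
		spans = true
	}
	return highWater, spans
}

func main() {
	start := time.Now()
	c := &checkpointer{
		lastHighWaterFlush: start,
		lastSpanFlush:      start,
		flushFrequency:     200 * time.Millisecond,
		spanFlushFrequency: 10 * time.Second,
	}
	hw, sp := c.maybeCheckpoint(start.Add(time.Second), true, true)
	fmt.Println("high-water checkpoint:", hw, "span checkpoint:", sp) // true false
	hw, sp = c.maybeCheckpoint(start.Add(30*time.Second), true, true)
	fmt.Println("high-water checkpoint:", hw, "span checkpoint:", sp) // true true
}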
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
@@ -257,7 +257,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {

for i := 0; ; i++ {
initialScan := i == 0
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, rangeFeedResumeFrontier.Frontier())
initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
if err != nil {
return err
}
@@ -270,7 +271,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
}
}

initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
if initialScanOnly {
if err := emitResolved(f.initialHighWater, jobspb.ResolvedSpan_EXIT); err != nil {
return err
@@ -358,7 +358,7 @@ func filterCheckpointSpans(spans []roachpb.Span, completed []roachpb.Span) []roa
}

func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
ctx context.Context, initialScan bool, initialScanOnly bool, highWater hlc.Timestamp,
) ([]roachpb.Span, hlc.Timestamp, error) {
scanTime := highWater.Next()

@@ -425,11 +425,16 @@
defer f.onBackfillCallback()()
}

boundaryType := jobspb.ResolvedSpan_NONE
if initialScanOnly {
boundaryType = jobspb.ResolvedSpan_EXIT
}
if err := f.scanner.Scan(ctx, f.writer, scanConfig{
Spans: spansToBackfill,
Timestamp: scanTime,
WithDiff: !isInitialScan && f.withDiff,
Knobs: f.knobs,
Boundary: boundaryType,
}); err != nil {
return nil, hlc.Timestamp{}, err
}
8 changes: 5 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -39,6 +39,7 @@ type scanConfig struct {
Timestamp hlc.Timestamp
WithDiff bool
Knobs TestingKnobs
Boundary jobspb.ResolvedSpan_BoundaryType
}

type kvScanner interface {
@@ -116,7 +117,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer limAlloc.Release()
defer spanAlloc.Release(ctx)

err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.WithDiff, sink, cfg.Knobs)
err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.Boundary, cfg.WithDiff, sink, cfg.Knobs)
finished := atomic.AddInt64(&atomicFinished, 1)
if backfillDec != nil {
backfillDec()
@@ -134,6 +135,7 @@ func (p *scanRequestScanner) exportSpan(
ctx context.Context,
span roachpb.Span,
ts hlc.Timestamp,
boundaryType jobspb.ResolvedSpan_BoundaryType,
withDiff bool,
sink kvevent.Writer,
knobs TestingKnobs,
@@ -191,7 +193,7 @@
if res.ResumeSpan != nil {
consumed := roachpb.Span{Key: remaining.Key, EndKey: res.ResumeSpan.Key}
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, boundaryType),
); err != nil {
return err
}
@@ -200,7 +202,7 @@
}
// p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(span, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(span, ts, boundaryType),
); err != nil {
return err
}
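
Putting the kv_feed.go and scanner.go changes together: the scan config now carries a resolved-span boundary type, so a feed that only performs an initial scan can have the scanner itself attach EXIT boundaries to the resolved spans it emits. A hedged sketch of that flow (illustrative types, not the actual CockroachDB API):

package main

import "fmt"

// boundaryType mirrors jobspb.ResolvedSpan_BoundaryType in spirit only.
type boundaryType int

const (
	boundaryNone boundaryType = iota
	boundaryExit
)

type span struct{ key, endKey string }

type resolvedEvent struct {
	sp       span
	boundary boundaryType
}

// scanConfig carries the boundary the scanner should attach to the resolved
// events it emits, the way the commit adds a Boundary field to scanConfig.
type scanConfig struct {
	spans    []span
	boundary boundaryType
}

func scan(cfg scanConfig, emit func(resolvedEvent)) {
	for _, sp := range cfg.spans {
		// ... export the span's data here, then report it resolved ...
		emit(resolvedEvent{sp: sp, boundary: cfg.boundary})
	}
}

func main() {
	// If the feed's end time equals its initial high water, the initial scan
	// is all there is, so resolved spans should carry an EXIT boundary.
	initialScanOnly := true
	b := boundaryNone
	if initialScanOnly {
		b = boundaryExit
	}
	scan(scanConfig{spans: []span{{"a", "b"}, {"b", "c"}}, boundary: b}, func(ev resolvedEvent) {
		fmt.Printf("resolved %v boundary=%v\n", ev.sp, ev.boundary)
	})
}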
