Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: changefeedccl: Fix initial scan checkpointing #97049

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ type changeAggregator struct {
// eventConsumer consumes the event.
eventConsumer eventConsumer

// lastFlush and flushFrequency keep track of the flush frequency.
lastFlush time.Time
flushFrequency time.Duration
lastHighWaterFlush time.Time // last time high watermark was checkpointed.
flushFrequency time.Duration // how often high watermark can be checkpointed.
lastSpanFlush time.Time // last time expensive, span based checkpoint was written.

// frontier keeps track of resolved timestamps for spans along with schema change
// boundary information.
Expand Down Expand Up @@ -291,6 +291,9 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.cancel()
return
}

// Generate the expensive checkpoint only after we have run for a while.
ca.lastSpanFlush = timeutil.Now()
}

func (ca *changeAggregator) startKVFeed(
Expand Down Expand Up @@ -321,7 +324,6 @@ func (ca *changeAggregator) startKVFeed(
// Trying to call MoveToDraining here is racy (`MoveToDraining called in
// state stateTrailingMeta`), so return the error via a channel.
ca.errCh <- kvfeed.Run(ctx, kvfeedCfg)
ca.cancel()
}); err != nil {
// If err != nil then the RunAsyncTask closure never ran, which means we
// need to manually close ca.kvFeedDoneCh so `(*changeAggregator).close`
Expand Down Expand Up @@ -407,6 +409,12 @@ func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err e
if err != nil {
return nil, err
}
if initialHighWater.IsEmpty() {
// If we are performing an initial scan, set the frontier's initialHighWater
// to the StatementTime -- this is the time at which we will be scanning spans.
// Spans that reach this time are eligible for checkpointing.
ca.frontier.initialHighWater = ca.spec.Feed.StatementTime
}

checkpointedSpanTs := ca.spec.Checkpoint.Timestamp

Expand Down Expand Up @@ -536,6 +544,13 @@ func (ca *changeAggregator) tick() error {
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
if resolved.Timestamp.IsEmpty() {
// @0.0 resolved timestamps could come in from rangefeed checkpoint.
// When rangefeed starts running, it emits @0.0 resolved timestamp.
// We don't care about those as far as checkpointing is concerned.
return nil
}

advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
Expand All @@ -544,18 +559,25 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE

checkpointFrontier := advanced &&
(forceFlush || timeutil.Since(ca.lastFlush) > ca.flushFrequency)
(forceFlush || timeutil.Since(ca.lastHighWaterFlush) > ca.flushFrequency)

if checkpointFrontier {
defer func() {
ca.lastHighWaterFlush = timeutil.Now()
}()
return ca.flushFrontier()
}

// At a lower frequency we checkpoint specific spans in the job progress,
// either in backfills or if the highwater mark is excessively lagging behind.
checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) ||
ca.frontier.hasLaggingSpans(ca.spec.Feed.StatementTime, &ca.flowCtx.Cfg.Settings.SV)) &&
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)
canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastSpanFlush)

if checkpointFrontier || checkpointSpans {
if checkpointSpans {
defer func() {
ca.lastFlush = timeutil.Now()
ca.lastSpanFlush = timeutil.Now()
}()
return ca.flushFrontier()
}
Expand Down Expand Up @@ -1032,7 +1054,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART) {
var err error
endTime := cf.spec.Feed.EndTime
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime.Next()) {
if endTime.IsEmpty() || endTime.Less(cf.frontier.boundaryTime) {
err = pgerror.Newf(pgcode.SchemaChangeOccurred,
"schema change occurred at %v", cf.frontier.boundaryTime.Next().AsOfSystemTime())

Expand Down
11 changes: 8 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (f *kvFeed) run(ctx context.Context) (err error) {

for i := 0; ; i++ {
initialScan := i == 0
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, rangeFeedResumeFrontier.Frontier())
initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
scannedSpans, scannedTS, err := f.scanIfShould(ctx, initialScan, initialScanOnly, rangeFeedResumeFrontier.Frontier())
if err != nil {
return err
}
Expand All @@ -266,7 +267,6 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
}
}

initialScanOnly := f.endTime.EqOrdering(f.initialHighWater)
if initialScanOnly {
if err := emitResolved(f.initialHighWater, jobspb.ResolvedSpan_EXIT); err != nil {
return err
Expand Down Expand Up @@ -354,7 +354,7 @@ func filterCheckpointSpans(spans []roachpb.Span, completed []roachpb.Span) []roa
}

func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
ctx context.Context, initialScan bool, initialScanOnly bool, highWater hlc.Timestamp,
) ([]roachpb.Span, hlc.Timestamp, error) {
scanTime := highWater.Next()

Expand Down Expand Up @@ -421,11 +421,16 @@ func (f *kvFeed) scanIfShould(
defer f.onBackfillCallback()()
}

boundaryType := jobspb.ResolvedSpan_NONE
if initialScanOnly {
boundaryType = jobspb.ResolvedSpan_EXIT
}
if err := f.scanner.Scan(ctx, f.writer, scanConfig{
Spans: spansToBackfill,
Timestamp: scanTime,
WithDiff: !isInitialScan && f.withDiff,
Knobs: f.knobs,
Boundary: boundaryType,
}); err != nil {
return nil, hlc.Timestamp{}, err
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type scanConfig struct {
Timestamp hlc.Timestamp
WithDiff bool
Knobs TestingKnobs
Boundary jobspb.ResolvedSpan_BoundaryType
}

type kvScanner interface {
Expand Down Expand Up @@ -116,7 +117,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer limAlloc.Release()
defer spanAlloc.Release(ctx)

err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.WithDiff, sink, cfg.Knobs)
err := p.exportSpan(ctx, span, cfg.Timestamp, cfg.Boundary, cfg.WithDiff, sink, cfg.Knobs)
finished := atomic.AddInt64(&atomicFinished, 1)
if backfillDec != nil {
backfillDec()
Expand All @@ -134,6 +135,7 @@ func (p *scanRequestScanner) exportSpan(
ctx context.Context,
span roachpb.Span,
ts hlc.Timestamp,
boundaryType jobspb.ResolvedSpan_BoundaryType,
withDiff bool,
sink kvevent.Writer,
knobs TestingKnobs,
Expand Down Expand Up @@ -191,7 +193,7 @@ func (p *scanRequestScanner) exportSpan(
if res.ResumeSpan != nil {
consumed := roachpb.Span{Key: remaining.Key, EndKey: res.ResumeSpan.Key}
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(consumed, ts, boundaryType),
); err != nil {
return err
}
Expand All @@ -200,7 +202,7 @@ func (p *scanRequestScanner) exportSpan(
}
// p.metrics.PollRequestNanosHist.RecordValue(scanDuration.Nanoseconds())
if err := sink.Add(
ctx, kvevent.NewBackfillResolvedEvent(span, ts, jobspb.ResolvedSpan_NONE),
ctx, kvevent.NewBackfillResolvedEvent(span, ts, boundaryType),
); err != nil {
return err
}
Expand Down