Skip to content

Commit

Permalink
changefeedccl: fix job progress regressing
Browse files Browse the repository at this point in the history
When the ChangeFrontier processor for a changefeed is started up, it
creates a spanFrontier to track the overall progression for the feed.
Each watched span is set to the empty timestamp so we can tell the
difference between a rangefeed being behind and one still initializing.

However, rangefeed will happily return resolved timestamps less than the
one it was started with. If we feed this into the spanFrontier, it can
report a timestamp behind the one everything started up at. In the case
of a new changefeed, this is fine (though perhaps odd). In the case of a
changefeed being restarted from the job's high-water, this means we can
overwrite the high-water progress with the lower value, regressing it.
At the least, this is very surprising UX, but it's easy to argue that
it's a correctness issue.

Touches cockroachdb#36879

Release note (bug fix): Fixed a bug where `CHANGEFEED` job progress
would regress when the job was restarted.
  • Loading branch information
danhhz committed Apr 22, 2019
1 parent b46cf9e commit 253126e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ type changeFrontier struct {
// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
jobProgressedFn func(context.Context, jobs.HighWaterProgressedFn) error
// highWaterAtStart is the greater of the job high-water and the timestamp the
// CHANGEFEED statement was run at. It's used in an assertion that we never
// regress the job high-water.
highWaterAtStart hlc.Timestamp
// passthroughBuf, in some but not all flows, contains changed row data to
// pass through unchanged to the gateway node.
passthroughBuf encDatumRowBuffer
Expand Down Expand Up @@ -441,13 +445,19 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID)
if err != nil {
cf.MoveToDraining(err)
return ctx
}
cf.jobProgressedFn = job.HighWaterProgressed

p := job.Progress()
if ts := p.GetHighWater(); ts != nil {
cf.highWaterAtStart.Forward(*ts)
}
}

cf.metrics.mu.Lock()
Expand Down Expand Up @@ -535,6 +545,20 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
`unmarshalling resolved span: %x`, raw)
}

// Inserting a timestamp less than the one the changefeed flow started at
// could potentially regress the job progress. This is not expected, but it
// was a bug at one point, so assert to prevent regressions.
//
// TODO(dan): This is much more naturally expressed as an assert inside the
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
log.ReportOrPanic(cf.Ctx, &cf.flowCtx.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
log.Safe(resolved.Timestamp), resolved.Span, log.Safe(cf.highWaterAtStart))
return nil
}

frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
if frontierChanged {
newResolved := cf.sf.Frontier()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
return err
}
case *roachpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(rangeFeedStartTS) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them.
// Changefeeds don't care about these at all, so throw them out.
continue
}
if err := memBuf.AddResolved(ctx, t.Span, t.ResolvedTS); err != nil {
return err
}
Expand Down

0 comments on commit 253126e

Please sign in to comment.