Skip to content

Commit

Permalink
Merge #37009 #37030
Browse files Browse the repository at this point in the history
37009: changefeedccl: fix job progress regressing r=nvanbenschoten a=danhhz

When the ChangeFrontier processor for a changefeed is started up, it
creates a spanFrontier to track the overall progression for the feed.
Each watched span is set to the empty timestamp so we can tell the
difference between a rangefeed being behind and one still initializing.

However, rangefeed will happily return resolved timestamps less than the
one it was started with. If we feed these into the spanFrontier, the
frontier can report a timestamp behind the one everything started up at.
In the case of a new changefeed, this is fine (though perhaps odd). In the
case of a changefeed being restarted from the job's high-water, this means
we can overwrite the high-water progress with the lower value, regressing
it. At the least, this is very surprising UX, but it's easy to argue that
it's a correctness issue.

Touches #36879

Release note (bug fix): Fixed a bug where `CHANGEFEED` job progress
would regress when the job was restarted.

37030: changefeedccl: log more details when returning with non-retryable errors r=tbg a=danhhz

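The `%v` and `%+v` format verbs were swapped from what I intended between
the line that logs retryable errors and the line that logs non-retryable
errors. Non-retryable errors are now logged with the more detailed `%+v`
and retryable errors with the terser `%v`.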

Release note: None

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
  • Loading branch information
craig[bot] and danhhz committed Apr 23, 2019
3 parents ec4728a + 6ee3261 + ac74be9 commit 7424c53
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 145 deletions.
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ type changeFrontier struct {
// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
jobProgressedFn func(context.Context, jobs.HighWaterProgressedFn) error
// highWaterAtStart is the greater of the job high-water and the timestamp the
// CHANGEFEED statement was run at. It's used in an assertion that we never
// regress the job high-water.
highWaterAtStart hlc.Timestamp
// passthroughBuf, in some but not all flows, contains changed row data to
// pass through unchanged to the gateway node.
passthroughBuf encDatumRowBuffer
Expand Down Expand Up @@ -441,13 +445,19 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID)
if err != nil {
cf.MoveToDraining(err)
return ctx
}
cf.jobProgressedFn = job.HighWaterProgressed

p := job.Progress()
if ts := p.GetHighWater(); ts != nil {
cf.highWaterAtStart.Forward(*ts)
}
}

cf.metrics.mu.Lock()
Expand Down Expand Up @@ -535,6 +545,20 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
`unmarshalling resolved span: %x`, raw)
}

// Inserting a timestamp less than the one the changefeed flow started at
// could potentially regress the job progress. This is not expected, but it
// was a bug at one point, so assert to prevent regressions.
//
// TODO(dan): This is much more naturally expressed as an assert inside the
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
log.ReportOrPanic(cf.Ctx, &cf.flowCtx.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
log.Safe(resolved.Timestamp), resolved.Span, log.Safe(cf.highWaterAtStart))
return nil
}

frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
if frontierChanged {
newResolved := cf.sf.Frontier()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,11 @@ func (b *changefeedResumer) Resume(
return nil
}
if !IsRetryableError(err) {
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %v`, jobID, err)
log.Warningf(ctx, `CHANGEFEED job %d returning with error: %+v`, jobID, err)
return err
}

log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %+v`, jobID, err)
log.Warningf(ctx, `CHANGEFEED job %d encountered retryable error: %v`, jobID, err)
if metrics, ok := execCfg.JobRegistry.MetricsStruct().Changefeed.(*Metrics); ok {
metrics.ErrorRetries.Inc(1)
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,12 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
// the faster-to-implement solution for now.
frontier := makeSpanFrontier(spans...)

rangeFeedStartTS := lastHighwater
for _, span := range p.spans {
req := &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: lastHighwater,
},
Span: span,
}
frontier.Forward(span, lastHighwater)
span := span
frontier.Forward(span, rangeFeedStartTS)
g.GoCtx(func(ctx context.Context) error {
return ds.RangeFeed(ctx, req, eventC)
return ds.RangeFeed(ctx, span, rangeFeedStartTS, eventC)
})
}
g.GoCtx(func(ctx context.Context) error {
Expand All @@ -302,6 +298,12 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
return err
}
case *roachpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(rangeFeedStartTS) {
// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them, including
// ones below the timestamp the RangeFeed was started at. Changefeeds
// don't care about checkpoints below that start timestamp, so throw
// them out.
continue
}
if err := memBuf.AddResolved(ctx, t.Span, t.ResolvedTS); err != nil {
return err
}
Expand Down
11 changes: 7 additions & 4 deletions pkg/kv/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ type singleRangeInfo struct {
// RangeFeed divides a RangeFeed request on range boundaries and establishes a
// RangeFeed to each of the individual ranges. It streams back results on the
// provided channel.
//
// Note that the timestamps in RangeFeedCheckpoint events that are streamed back
// may be lower than the timestamp given here.
func (ds *DistSender) RangeFeed(
ctx context.Context, args *roachpb.RangeFeedRequest, eventCh chan<- *roachpb.RangeFeedEvent,
ctx context.Context, span roachpb.Span, ts hlc.Timestamp, eventCh chan<- *roachpb.RangeFeedEvent,
) error {
ctx = ds.AnnotateCtx(ctx)
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender")
defer sp.Finish()

startRKey, err := keys.Addr(args.Span.Key)
startRKey, err := keys.Addr(span.Key)
if err != nil {
return err
}
endRKey, err := keys.Addr(args.Span.EndKey)
endRKey, err := keys.Addr(span.EndKey)
if err != nil {
return err
}
Expand All @@ -76,7 +79,7 @@ func (ds *DistSender) RangeFeed(

// Kick off the initial set of ranges.
g.GoCtx(func(ctx context.Context) error {
return ds.divideAndSendRangeFeedToRanges(ctx, rs, args.Timestamp, rangeCh)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, ts, rangeCh)
})

return g.Wait()
Expand Down
Loading

0 comments on commit 7424c53

Please sign in to comment.