Skip to content

Commit

Permalink
Merge pull request #37091 from danhhz/backport19.1-37009
Browse files (browse the repository at this point in the history)
release-19.1: changefeedccl: fix job progress regressing
  • Loading branch information
danhhz authored May 6, 2019
2 parents 16b13ad + 1d09910 commit 6ce75dc
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
24 changes: 24 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ type changeFrontier struct {
// jobProgressedFn, if non-nil, is called to checkpoint the changefeed's
// progress in the corresponding system job entry.
jobProgressedFn func(context.Context, jobs.HighWaterProgressedFn) error
// highWaterAtStart is the greater of the job high-water and the timestamp the
// CHANGEFEED statement was run at. It's used in an assertion that we never
// regress the job high-water.
highWaterAtStart hlc.Timestamp
// passthroughBuf, in some but not all flows, contains changed row data to
// pass through unchanged to the gateway node.
passthroughBuf encDatumRowBuffer
Expand Down Expand Up @@ -440,13 +444,19 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {
cf.sink = makeMetricsSink(cf.metrics, cf.sink)
cf.sink = &errorWrapperSink{wrapped: cf.sink}

cf.highWaterAtStart = cf.spec.Feed.StatementTime
if cf.spec.JobID != 0 {
job, err := cf.flowCtx.JobRegistry.LoadJob(ctx, cf.spec.JobID)
if err != nil {
cf.MoveToDraining(err)
return ctx
}
cf.jobProgressedFn = job.HighWaterProgressed

p := job.Progress()
if ts := p.GetHighWater(); ts != nil {
cf.highWaterAtStart.Forward(*ts)
}
}

cf.metrics.mu.Lock()
Expand Down Expand Up @@ -533,6 +543,20 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
return errors.Wrapf(err, `unmarshalling resolved span: %x`, raw)
}

// Inserting a timestamp less than the one the changefeed flow started at
// could potentially regress the job progress. This is not expected, but it
// was a bug at one point, so assert to prevent regressions.
//
// TODO(dan): This is much more naturally expressed as an assert inside the
// job progress update closure, but it currently doesn't pass along the info
// we'd need to do it that way.
if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(cf.highWaterAtStart) {
log.ReportOrPanic(cf.Ctx, &cf.flowCtx.Settings.SV,
`got a span level timestamp %s for %s that is less than the initial high-water %s`,
log.Safe(resolved.Timestamp), resolved.Span, log.Safe(cf.highWaterAtStart))
return nil
}

frontierChanged := cf.sf.Forward(resolved.Span, resolved.Timestamp)
if frontierChanged {
newResolved := cf.sf.Frontier()
Expand Down
11 changes: 9 additions & 2 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,15 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
// the faster-to-implement solution for now.
frontier := makeSpanFrontier(spans...)

rangeFeedStartTS := lastHighwater
for _, span := range p.spans {
req := &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: lastHighwater,
Timestamp: rangeFeedStartTS,
},
Span: span,
}
frontier.Forward(span, lastHighwater)
frontier.Forward(span, rangeFeedStartTS)
g.GoCtx(func(ctx context.Context) error {
return ds.RangeFeed(ctx, req, eventC)
})
Expand All @@ -302,6 +303,12 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
return err
}
case *roachpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(rangeFeedStartTS) {
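// RangeFeed happily forwards any closed timestamps it receives as
// soon as there are no outstanding intents under them, so a
// checkpoint's resolved timestamp can be below the timestamp this
// RangeFeed was started at. Changefeeds don't care about those
// earlier timestamps at all, so throw them out.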
continue
}
if err := memBuf.AddResolved(ctx, t.Span, t.ResolvedTS); err != nil {
return err
}
Expand Down

0 comments on commit 6ce75dc

Please sign in to comment.