Skip to content

Commit

Permalink
kv,roachpb: document rangefeed resolved timestamp guarantees
Browse files Browse the repository at this point in the history
I was surprised that rangefeeds would return resolved timestamps from
before the request timestamp and this expectation mismatch was a
component in a changefeed bug. It's not clear yet if the contract is
wrong or simply underdocumented, so document it louder for now and leave
a TODO to reconsider the contract when rangefeeds have more users.

Also a very small cleanup on the `(*DistSender).RangeFeed` api while I'm
in here.

Release note: None
  • Loading branch information
danhhz committed Apr 23, 2019
1 parent 253126e commit 6ee3261
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 143 deletions.
12 changes: 4 additions & 8 deletions pkg/ccl/changefeedccl/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,12 @@ func (p *poller) rangefeedImpl(ctx context.Context) error {
// the faster-to-implement solution for now.
frontier := makeSpanFrontier(spans...)

rangeFeedStartTS := lastHighwater
for _, span := range p.spans {
req := &roachpb.RangeFeedRequest{
Header: roachpb.Header{
Timestamp: lastHighwater,
},
Span: span,
}
frontier.Forward(span, lastHighwater)
span := span
frontier.Forward(span, rangeFeedStartTS)
g.GoCtx(func(ctx context.Context) error {
return ds.RangeFeed(ctx, req, eventC)
return ds.RangeFeed(ctx, span, rangeFeedStartTS, eventC)
})
}
g.GoCtx(func(ctx context.Context) error {
Expand Down
11 changes: 7 additions & 4 deletions pkg/kv/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,21 @@ type singleRangeInfo struct {
// RangeFeed divides a RangeFeed request on range boundaries and establishes a
// RangeFeed to each of the individual ranges. It streams back results on the
// provided channel.
//
// Note that the timestamps in RangeFeedCheckpoint events that are streamed back
// may be lower than the timestamp given here.
func (ds *DistSender) RangeFeed(
ctx context.Context, args *roachpb.RangeFeedRequest, eventCh chan<- *roachpb.RangeFeedEvent,
ctx context.Context, span roachpb.Span, ts hlc.Timestamp, eventCh chan<- *roachpb.RangeFeedEvent,
) error {
ctx = ds.AnnotateCtx(ctx)
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender")
defer sp.Finish()

startRKey, err := keys.Addr(args.Span.Key)
startRKey, err := keys.Addr(span.Key)
if err != nil {
return err
}
endRKey, err := keys.Addr(args.Span.EndKey)
endRKey, err := keys.Addr(span.EndKey)
if err != nil {
return err
}
Expand All @@ -76,7 +79,7 @@ func (ds *DistSender) RangeFeed(

// Kick off the initial set of ranges.
g.GoCtx(func(ctx context.Context) error {
return ds.divideAndSendRangeFeedToRanges(ctx, rs, args.Timestamp, rangeCh)
return ds.divideAndSendRangeFeedToRanges(ctx, rs, ts, rangeCh)
})

return g.Wait()
Expand Down
Loading

0 comments on commit 6ee3261

Please sign in to comment.