Skip to content

Commit

Permalink
kvcoord: Replace withDiff argument with WithDiff option
Browse files Browse the repository at this point in the history
Refactor RangeFeed call to take `kvcoord.WithDiff` option
instead of stray boolean.

Epic: None
Release note: None
  • Loading branch information
Yevgeniy Miretskiy committed Mar 16, 2023
1 parent 8c4018a commit 439b1e3
Show file tree
Hide file tree
Showing 11 changed files with 34 additions and 34 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ type rawEventFeed []kvpb.RangeFeedEvent
func (f rawEventFeed) run(
ctx context.Context,
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type rangeFeedConfig struct {
type rangefeedFactory func(
ctx context.Context,
spans []kvcoord.SpanTimePair,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error
Expand Down Expand Up @@ -79,9 +78,12 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang
if cfg.UseMux {
rfOpts = append(rfOpts, kvcoord.WithMuxRangeFeed())
}
if cfg.WithDiff {
rfOpts = append(rfOpts, kvcoord.WithDiff())
}

g.GoCtx(func(ctx context.Context) error {
return p(ctx, cfg.Spans, cfg.WithDiff, feed.eventC, rfOpts...)
return p(ctx, cfg.Spans, feed.eventC, rfOpts...)
})
return g.Wait()
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func WithSystemTablePriority() RangeFeedOption {
})
}

// WithDiff turns on "diff" option for the rangefeed.
func WithDiff() RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.withDiff = true
})
}

// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)

Expand All @@ -134,7 +141,6 @@ func (ds *DistSender) RangeFeed(
ctx context.Context,
spans []roachpb.Span,
startAfter hlc.Timestamp, // exclusive
withDiff bool,
eventCh chan<- RangeFeedMessage,
opts ...RangeFeedOption,
) error {
Expand All @@ -145,7 +151,7 @@ func (ds *DistSender) RangeFeed(
StartAfter: startAfter,
})
}
return ds.RangeFeedSpans(ctx, timedSpans, withDiff, eventCh, opts...)
return ds.RangeFeedSpans(ctx, timedSpans, eventCh, opts...)
}

// SpanTimePair is a pair of span along with its starting time. The starting
Expand All @@ -161,7 +167,6 @@ type SpanTimePair struct {
func (ds *DistSender) RangeFeedSpans(
ctx context.Context,
spans []SpanTimePair,
withDiff bool,
eventCh chan<- RangeFeedMessage,
opts ...RangeFeedOption,
) error {
Expand All @@ -178,7 +183,7 @@ func (ds *DistSender) RangeFeedSpans(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender")
defer sp.Finish()

rr := newRangeFeedRegistry(ctx, withDiff)
rr := newRangeFeedRegistry(ctx, cfg.withDiff)
ds.activeRangeFeeds.Store(rr, nil)
defer ds.activeRangeFeeds.Delete(rr)

Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ func TestDistSenderRangeFeedRetryOnTransportErrors(t *testing.T) {
if useMuxRangeFeed {
opts = append(opts, WithMuxRangeFeed())
}
err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{},
false, nil, opts...)
err := ds.RangeFeed(ctx, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, hlc.Timestamp{}, nil, opts...)
require.Error(t, err)
})
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ func (dbc *dbAdapter) RangeFeed(
ctx context.Context,
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
return dbc.distSender.RangeFeed(ctx, spans, startFrom, eventC, opts...)
}

// concurrentBoundAccount is a thread safe bound account.
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type DB interface {
ctx context.Context,
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error
Expand Down Expand Up @@ -299,6 +298,9 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
if useMuxRangeFeed {
rangefeedOpts = append(rangefeedOpts, kvcoord.WithMuxRangeFeed())
}
if f.withDiff {
rangefeedOpts = append(rangefeedOpts, kvcoord.WithDiff())
}

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
Expand All @@ -309,7 +311,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
start := timeutil.Now()

rangeFeedTask := func(ctx context.Context) error {
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...)
return f.client.RangeFeed(ctx, f.spans, ts, eventCh, rangefeedOpts...)
}
processEventsTask := func(ctx context.Context) error {
return f.processEvents(ctx, frontier, eventCh)
Expand Down
10 changes: 3 additions & 7 deletions pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type mockClient struct {
ctx context.Context,
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
) error

Expand All @@ -53,11 +52,10 @@ func (m *mockClient) RangeFeed(
ctx context.Context,
spans []roachpb.Span,
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return m.rangefeed(ctx, spans, startFrom, withDiff, eventC)
return m.rangefeed(ctx, spans, startFrom, eventC)
}

func (m *mockClient) Scan(
Expand Down Expand Up @@ -165,9 +163,8 @@ func TestRangeFeedMock(t *testing.T) {
return nil
},
rangefeed: func(
ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage,
ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage,
) error {
assert.False(t, withDiff) // it was not set
sendEvent := func(ts hlc.Timestamp) {
eventC <- kvcoord.RangeFeedMessage{
RangeFeedEvent: &kvpb.RangeFeedEvent{
Expand Down Expand Up @@ -269,9 +266,8 @@ func TestRangeFeedMock(t *testing.T) {
return nil
},
rangefeed: func(
ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage,
ctx context.Context, spans []roachpb.Span, startFrom hlc.Timestamp, eventC chan<- kvcoord.RangeFeedMessage,
) error {
assert.True(t, withDiff)
eventC <- kvcoord.RangeFeedMessage{
RangeFeedEvent: &kvpb.RangeFeedEvent{
Val: &kvpb.RangeFeedValue{
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func Watch(ctx context.Context, env *Env, dbs []*kv.DB, dataSpan roachpb.Span) (
w.mu.Unlock()

ds := dss[i]
err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, true /* withDiff */, eventC)
err := ds.RangeFeed(ctx, []roachpb.Span{dataSpan}, ts, eventC, kvcoord.WithDiff())
if isRetryableRangeFeedErr(err) {
log.Infof(ctx, "got retryable RangeFeed error: %+v", err)
continue
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/client_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
rangefeedErrChan := make(chan error, 1)
ctxToCancel, cancel := context.WithCancel(ctx)
go func() {
rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, false /* withDiff */, evChan)
rangefeedErrChan <- ds.RangeFeed(ctxToCancel, []roachpb.Span{descTableSpan}, startTS, evChan)
}()

// Note: 42 is a system descriptor.
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestRangefeedWorksOnSystemRangesUnconditionally(t *testing.T) {
})
evChan := make(chan kvcoord.RangeFeedMessage)
require.Regexp(t, `rangefeeds require the kv\.rangefeed.enabled setting`,
ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, false /* withDiff */, evChan))
ds.RangeFeed(ctx, []roachpb.Span{scratchSpan}, startTS, evChan))
})
}

Expand Down Expand Up @@ -191,7 +191,6 @@ func TestMergeOfRangeEventTableWhileRunningRangefeed(t *testing.T) {
rangefeedErrChan <- ds.RangeFeed(rangefeedCtx,
[]roachpb.Span{lhsRepl.Desc().RSpan().AsRawSpanWithNoLocals()},
start,
false, /* withDiff */
eventCh)
}()

Expand Down Expand Up @@ -256,7 +255,6 @@ func TestRangefeedIsRoutedToNonVoter(t *testing.T) {
rangefeedCtx,
[]roachpb.Span{desc.RSpan().AsRawSpanWithNoLocals()},
startTS,
false, /* withDiff */
eventCh,
)
}()
Expand Down Expand Up @@ -310,7 +308,7 @@ func TestRangefeedWorksOnLivenessRange(t *testing.T) {
eventC := make(chan kvcoord.RangeFeedMessage)
errC := make(chan error, 1)
go func() {
errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, false /* withDiff */, eventC)
errC <- ds.RangeFeed(ctx, []roachpb.Span{keys.NodeLivenessSpan}, startTS, eventC)
}()

// Wait for a liveness update.
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
span := roachpb.Span{
Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(),
}
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
}()
}

Expand Down Expand Up @@ -1309,7 +1309,7 @@ func TestRangefeedCheckpointsRecoverFromLeaseExpiration(t *testing.T) {
span := roachpb.Span{
Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey(),
}
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
}()

// Wait for a checkpoint above ts.
Expand Down Expand Up @@ -1490,7 +1490,7 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {
}
startRangefeed := func() {
span := rangefeedSpan
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, false /* withDiff */, rangeFeedCh)
rangeFeedErrC <- ds.RangeFeed(rangeFeedCtx, []roachpb.Span{span}, ts1, rangeFeedCh)
}

// Wait for a checkpoint above ts.
Expand Down

0 comments on commit 439b1e3

Please sign in to comment.