Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Break gossip dependency #101096

Merged
merged 1 commit into from
Apr 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,8 @@ type distResolver struct {
func (r *distResolver) getRangesForSpans(
ctx context.Context, spans []roachpb.Span,
) ([]roachpb.Span, error) {
return kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
spans, _, err := kvfeed.AllRangeSpans(ctx, r.DistSender, spans)
return spans, err
}

func rebalanceSpanPartitions(
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql/covering",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
61 changes: 26 additions & 35 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/covering"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg

sender := p.db.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
spans, err := getSpansToProcess(ctx, distSender, cfg.Spans)
spans, numNodesHint, err := getRangesToProcess(ctx, distSender, cfg.Spans)
if err != nil {
return err
}
Expand All @@ -81,7 +82,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
defer backfillClear()
}

maxConcurrentScans := maxConcurrentScanRequests(p.gossip, &p.settings.SV)
maxConcurrentScans := maxConcurrentScanRequests(numNodesHint, &p.settings.SV)
exportLim := limit.MakeConcurrentRequestLimiter("changefeedScanRequestLimiter", maxConcurrentScans)

lastScanLimitUserSetting := changefeedbase.ScanRequestLimit.Get(&p.settings.SV)
Expand All @@ -95,7 +96,7 @@ func (p *scanRequestScanner) Scan(ctx context.Context, sink kvevent.Writer, cfg
// If the user defined scan request limit has changed, recalculate it
if currentUserScanLimit := changefeedbase.ScanRequestLimit.Get(&p.settings.SV); currentUserScanLimit != lastScanLimitUserSetting {
lastScanLimitUserSetting = currentUserScanLimit
exportLim.SetLimit(maxConcurrentScanRequests(p.gossip, &p.settings.SV))
exportLim.SetLimit(maxConcurrentScanRequests(numNodesHint, &p.settings.SV))
}

limAlloc, err := exportLim.Begin(ctx)
Expand Down Expand Up @@ -249,12 +250,14 @@ func (p *scanRequestScanner) exportSpan(
return nil
}

func getSpansToProcess(
// getRangesToProcess returns the list of ranges covering input list of spans.
// Returns the number of nodes that are leaseholders for those spans.
func getRangesToProcess(
ctx context.Context, ds *kvcoord.DistSender, targetSpans []roachpb.Span,
) ([]roachpb.Span, error) {
ranges, err := AllRangeSpans(ctx, ds, targetSpans)
) ([]roachpb.Span, int, error) {
ranges, numNodes, err := AllRangeSpans(ctx, ds, targetSpans)
if err != nil {
return nil, err
return nil, 0, err
}

type spanMarker struct{}
Expand Down Expand Up @@ -289,7 +292,7 @@ func getSpansToProcess(
}
requests = append(requests, roachpb.Span{Key: chunk.Start, EndKey: chunk.End})
}
return requests, nil
return requests, numNodes, nil
}

// slurpScanResponse iterates the ScanResponse and inserts the contained kvs into
Expand Down Expand Up @@ -319,66 +322,54 @@ func slurpScanResponse(
return nil
}

// AllRangeSpans returns the list of all ranges that for the specified list of spans.
// AllRangeSpans returns the list of all ranges that cover input spans along with the
// nodeCountHint indicating the number of nodes that host those ranges.
func AllRangeSpans(
ctx context.Context, ds *kvcoord.DistSender, spans []roachpb.Span,
) ([]roachpb.Span, error) {

) (_ []roachpb.Span, nodeCountHint int, _ error) {
ranges := make([]roachpb.Span, 0, len(spans))

it := kvcoord.MakeRangeIterator(ds)
var replicas util.FastIntMap

for i := range spans {
rSpan, err := keys.SpanAddr(spans[i])
if err != nil {
return nil, err
return nil, 0, err
}
for it.Seek(ctx, rSpan.Key, kvcoord.Ascending); ; it.Next(ctx) {
if !it.Valid() {
return nil, it.Error()
return nil, 0, it.Error()
}
ranges = append(ranges, roachpb.Span{
Key: it.Desc().StartKey.AsRawKey(), EndKey: it.Desc().EndKey.AsRawKey(),
})
for _, r := range it.Desc().InternalReplicas {
replicas.Set(int(r.NodeID), 0)
}
if !it.NeedAnother(rSpan) {
break
}
}
}

return ranges, nil
}

// clusterNodeCount returns the approximate number of nodes in the cluster.
func clusterNodeCount(gw gossip.OptionalGossip) int {
g, err := gw.OptionalErr(47971)
if err != nil {
// can't count nodes in tenants
return 1
}
var nodes int
_ = g.IterateInfos(gossip.KeyNodeDescPrefix, func(_ string, _ gossip.Info) error {
nodes++
return nil
})
return nodes
return ranges, replicas.Len(), nil
}

// maxConcurrentScanRequests returns the number of concurrent scan requests.
func maxConcurrentScanRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
func maxConcurrentScanRequests(numNodesHint int, sv *settings.Values) int {
// If the user specified ScanRequestLimit -- use that value.
if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
return int(max)
}
if numNodesHint < 1 {
return 1
}

// TODO(yevgeniy): Currently, issuing multiple concurrent updates scaled to the size of
// the cluster only make sense for the core change feeds. This configuration shoould
// be specified explicitly when creating scanner.
nodes := clusterNodeCount(gw)
// This is all hand-wavy: 3 per node used to be the default for a very long time.
// However, this could get out of hand if the clusters are large.
// So cap the max to an arbitrary value of a 100.
max := 3 * nodes
max := 3 * numNodesHint
if max > 100 {
max = 100
}
Expand Down