kvcoord: MuxRangeFeed client uses 1 go routine per node #97957
Conversation
I need to review the mux client more thoroughly, but I'm all reviewed out for today. Flushing the comments I have so far.
@@ -173,6 +180,8 @@ func (ds *DistSender) RangeFeedSpans(
	for _, opt := range opts {
		opt.set(&cfg)
	}
	// TODO(yevgeniy): Drop withDiff argument in favor of explicit RangeFeedOption.
	cfg.withDiff = withDiff
nit: Can we just do this now?
	}
	}
	return ctx.Err()
}

type rangefeedErrorInfo struct {
	transient bool // true if error is transient and should be retried.
We never actually use this field for anything. Is the idea that errors must have an explicit action? If so, let's assert that either this or `restart` is set. Otherwise, let's drop it.
	}
	}
	return ctx.Err()
}

type rangefeedErrorInfo struct {
	transient bool // true if error is transient and should be retried.
	restart   bool // true if the rangefeed for this span needs to be restarted.
`restart` feels like a misnomer, since we'll restart the rangefeed even when this isn't set too. I think the crucial property here is that we refresh the range descriptors because the range structure has changed, so consider e.g. `refreshDescriptors` or `refreshSpan` or something that indicates this.
{
	ptr, exists := m.muxClients.Load(int64(nodeID))
	if !exists {
Why don't we call `LoadOrStore` immediately here? Is the future construction expensive enough to matter?
	recvErr = nil
}

if _, err := handleRangefeedError(ctx, recvErr); err != nil {
Shouldn't we properly handle cache eviction here, to avoid unnecessary cache invalidation on unrelated errors?
	client rpc.RestrictedInternalClient,
	nodeID roachpb.NodeID,
) error {
	conn *future.Future[connOrError],
I'm finding it really confusing that we're passing in both `client` and `conn` here. Consider lifting all of the stream setup stuff above here, and make it clear that `conn` is not an input per se but rather a result that will be populated from `client`. Maybe also consider using `stream` rather than `conn` for each individual multiplexed stream, since `conn` reads more like the underlying gRPC transport connection rather than the logical multiplexed stream (which logically wouldn't be possible since `client` depends on the transport connection).
No major issues at an initial glance, but let's resolve the comments first.
I'd encourage @aliher1911 to do another review pass over the mux rangefeed machinery as part of the upcoming rangefeed work, since he'll presumably be more intimately familiar with this area by then.
// must be protected by this mutex (streams itself is thread safe,
// the only reason why this mu is required to be held is to ensure
// correct synchronization between start of a new rangefeed feed, and
// mux node connection tear down).
Should we move `streams` under `mu` then?
	err error
}
active.release()
active.token.Evict(ctx)
Same comment as above, we should only evict this when we need to since it affects foreground tail latencies.
if err != nil {
	return err
}
return divideSpanOnRangeBoundaries(ctx, m.ds, rs, active.startAfter, m.startSingleRangeFeed)
We can avoid the meta iteration by checking `rangefeedErrorInfo.restart`.
if err := m.restartActiveRangeFeed(ctx, active, t.Error.GoError()); err != nil {
	return err
}
continue
Can you write up the overall lifecycle somewhere? It isn't immediately clear to me why it's safe to reuse `conn` and `conn.receiver` here.
}
case *kvpb.RangeFeedSSTable:
nit: stray case? Should we explicitly handle all event types here and error out on unknown events?
if timeutil.Now().Before(nextStuckCheck) {
	if threshold := stuckThreshold(); threshold > 0 {
		if err := conn.eachStream(func(id int64, a *activeMuxRangeFeed) error {
			if !a.startAfter.IsEmpty() && timeutil.Since(a.startAfter.GoTime()) > stuckThreshold() {
Won't this misfire if the `eventCh` send above is slow? Consider moving it higher up so that it primarily reacts to server events. Although I guess it doesn't matter all that much since we'll have head-of-line blocking between streams anyway, which can cause false positives.
Ack.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker)
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 106 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Should we move `streams` under `mu` then?
Possible; I had it under mu at some point (and used a regular map). I'm a bit concerned that we will have N goroutines (number of nodes) accessing this almost static map every time they receive an event (which is ... a lot of times). I think this is the exact case where IntMap should perform well, but I don't have hard data to back this up.
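For illustration, here is a minimal sketch (with made-up `lockedRegistry`/`syncMapRegistry` types, not the PR's actual code) of the tradeoff being weighed: a mutex-protected map serializes every per-event lookup, while a `sync.Map`-style structure lets N per-node goroutines read a mostly-static registry without contending on a single lock.

```go
package main

import (
	"fmt"
	"sync"
)

type activeStream struct {
	spanDesc string // placeholder for per-stream state
}

// lockedRegistry protects the map with a mutex; every lookup takes the lock.
type lockedRegistry struct {
	mu      sync.Mutex
	streams map[int64]*activeStream
}

func (r *lockedRegistry) get(id int64) (*activeStream, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	s, ok := r.streams[id]
	return s, ok
}

// syncMapRegistry uses sync.Map, which is optimized for read-mostly workloads
// with stable keys -- lookups do not contend on a single mutex.
type syncMapRegistry struct {
	streams sync.Map // map[int64]*activeStream
}

func (r *syncMapRegistry) get(id int64) (*activeStream, bool) {
	v, ok := r.streams.Load(id)
	if !ok {
		return nil, false
	}
	return v.(*activeStream), true
}

func main() {
	r := &syncMapRegistry{}
	r.streams.Store(int64(1), &activeStream{spanDesc: "/Table/100/1"})
	if s, ok := r.get(1); ok {
		fmt.Println("found stream for", s.spanDesc)
	}
	_ = &lockedRegistry{streams: map[int64]*activeStream{}}
}
```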
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 270 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Why don't we call `LoadOrStore` immediately here? Is the future construction expensive enough to matter?
You're right; future construction is no longer expensive.
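As a rough sketch of the `LoadOrStore`-up-front pattern under discussion (names like `nodeClient` and `clientForNode` are hypothetical, not the PR's API), the cheap placeholder is stored eagerly and the expensive work is deduplicated afterwards:

```go
package main

import (
	"fmt"
	"sync"
)

// nodeClient is a stand-in for a lazily-established per-node mux connection.
type nodeClient struct {
	nodeID    int64
	once      sync.Once // the expensive setup runs at most once
	connected bool
}

func (c *nodeClient) connect() {
	c.once.Do(func() { c.connected = true }) // real dialing would happen here
}

var clients sync.Map // map[int64]*nodeClient

// clientForNode returns the shared client for a node. LoadOrStore is called
// immediately with a cheap placeholder; the loser of a racing call simply
// discards its copy, avoiding a separate Load-then-Store race.
func clientForNode(nodeID int64) *nodeClient {
	actual, _ := clients.LoadOrStore(nodeID, &nodeClient{nodeID: nodeID})
	c := actual.(*nodeClient)
	c.connect()
	return c
}

func main() {
	a := clientForNode(1)
	b := clientForNode(1)
	fmt.Println(a == b, a.connected) // true true: both callers share one client
}
```

This only pays off when constructing the stored value is cheap, which is the point conceded above.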
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 298 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
I'm finding it really confusing that we're passing in both `client` and `conn` here. Consider lifting all of the stream setup stuff above here, and make it clear that `conn` is not an input per se but rather a result that will be populated from `client`. Maybe also consider using `stream` rather than `conn` for each individual multiplexed stream, since `conn` reads more like the underlying gRPC transport connection rather than the logical multiplexed stream (which logically wouldn't be possible since `client` depends on the transport connection).
I see your point re conn; renamed the struct and variables around "muxStream". Hopefully, this naming removes some of the ambiguity.
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 347 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Shouldn't we properly handle cache eviction here, to avoid unnecessary cache invalidation on unrelated errors?
Done.
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 415 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
nit: stray case? Should we explicitly handle all event types here and error out on unknown events?
Hmm.. regular rangefeed also has that -- that's where the code was copied from. I wonder why we had it there -- just ignoring it? That doesn't seem right. Perhaps it was added when you were adding these new event types, with the intention of the rangefeed library being the only consumer of those event types, and you added it to the dist sender rangefeed during implementation? Do you recall?
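For what it's worth, a hedged sketch of the "error out on unknown events" alternative might look like the following; the event types here are stand-ins rather than the actual `kvpb` definitions:

```go
package main

import (
	"errors"
	"fmt"
)

type event interface{ isEvent() }

type valueEvent struct{}
type checkpointEvent struct{}
type sstableEvent struct{}
type deleteRangeEvent struct{}

func (valueEvent) isEvent()       {}
func (checkpointEvent) isEvent()  {}
func (sstableEvent) isEvent()     {}
func (deleteRangeEvent) isEvent() {}

// handleEvent explicitly enumerates every event type it knows about and fails
// loudly on anything else, instead of silently ignoring it via a stray case.
func handleEvent(ev event) error {
	switch ev.(type) {
	case valueEvent, checkpointEvent:
		// deliver to the consumer (elided)
		return nil
	case sstableEvent, deleteRangeEvent:
		// deliver to the consumer as well; listed separately only to mirror
		// the structure being discussed above
		return nil
	default:
		return errors.New("unexpected rangefeed event type")
	}
}

func main() {
	fmt.Println(handleEvent(valueEvent{}), handleEvent(sstableEvent{}))
}
```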
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 429 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Can you write up the overall lifecycle somewhere? It isn't immediately clear to me why it's safe to reuse `conn` and `conn.receiver` here.
Good idea; added a large comment on the muxStream struct describing sender/receiver semantics.
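A loose sketch of the kind of sender/receiver contract such a comment could describe (the `muxStream` shown here is illustrative, not the PR's struct): `Send` is serialized because many goroutines start individual rangefeeds, while `Recv` is owned by the single per-node receive loop, which is what makes reuse across per-range restarts safe.

```go
package main

import "sync"

// muxStream wraps one bidirectional stream to a node. The documented contract:
//   - Send may be called from many goroutines starting individual rangefeeds,
//     so it is serialized with a mutex.
//   - Recv is consumed by exactly one per-node goroutine, so it needs no lock.
// Because restarting an individual range only issues another Send, the stream
// and its single receiver can be reused safely.
type muxStream struct {
	sendMu sync.Mutex
	send   func(req string) error // stand-in for stream.Send
	recv   func() (string, error) // stand-in for stream.Recv
}

func (s *muxStream) Send(req string) error {
	s.sendMu.Lock()
	defer s.sendMu.Unlock()
	return s.send(req)
}

// Recv must only be called from the single per-node receive loop.
func (s *muxStream) Recv() (string, error) { return s.recv() }

func main() {
	_ = &muxStream{
		send: func(string) error { return nil },
		recv: func() (string, error) { return "", nil },
	}
}
```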
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 448 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Won't this misfire if the `eventCh` send above is slow? Consider moving it higher up so that it primarily reacts to server events. Although I guess it doesn't matter all that much since we'll have head-of-line blocking between streams anyway, which can cause false positives.
I think this is sort of the problem with the existing stuck watcher mechanism. It really does depend on the event channel being responsive. I don't know -- perhaps even getting this in WITHOUT automatic restart might be beneficial. I don't think the current implementation was used too successfully either, since it can misfire when eventCh is slow.
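One possible shape for a stuck check that keys off the last event seen from the server rather than the `eventCh` send (purely a sketch; `streamState` and `stuckStreams` are made-up names):

```go
package main

import (
	"fmt"
	"time"
)

type streamState struct {
	// lastEvent is updated as soon as an event arrives from the server,
	// before any potentially slow delivery to the consumer channel.
	lastEvent time.Time
}

// stuckStreams returns the IDs of streams that have not produced an event
// within threshold. A zero threshold disables the check.
func stuckStreams(streams map[int64]*streamState, threshold time.Duration, now time.Time) []int64 {
	if threshold <= 0 {
		return nil
	}
	var stuck []int64
	for id, s := range streams {
		if !s.lastEvent.IsZero() && now.Sub(s.lastEvent) > threshold {
			stuck = append(stuck, id)
		}
	}
	return stuck
}

func main() {
	now := time.Now()
	streams := map[int64]*streamState{
		1: {lastEvent: now.Add(-10 * time.Minute)},
		2: {lastEvent: now.Add(-5 * time.Second)},
	}
	fmt.Println(stuckStreams(streams, time.Minute, now)) // [1]
}
```

Even this can still misfire under head-of-line blocking across streams, as noted above.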
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 472 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Same comment as above, we should only evict this when we need to since it affects foreground tail latencies.
Done.
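For context, the "evict only when needed" idea amounts to something like the following sketch, with stand-in error values and a hypothetical `cacheToken`; only errors that imply a stale descriptor trigger eviction, so unrelated transient errors do not force a fresh descriptor lookup on the foreground path:

```go
package main

import (
	"errors"
	"fmt"
)

var errRangeSplit = errors.New("range key mismatch: range has split")
var errSendTimeout = errors.New("send timed out") // transient, descriptor still valid

type cacheToken struct{ evicted bool }

func (t *cacheToken) Evict() { t.evicted = true }

// maybeEvict evicts the cached descriptor only when the error indicates the
// cached range structure is stale.
func maybeEvict(tok *cacheToken, err error) {
	if errors.Is(err, errRangeSplit) {
		tok.Evict()
	}
}

func main() {
	t1, t2 := &cacheToken{}, &cacheToken{}
	maybeEvict(t1, errRangeSplit)
	maybeEvict(t2, errSendTimeout)
	fmt.Println(t1.evicted, t2.evicted) // true false
}
```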
pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go
line 479 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
We can avoid the meta iteration by checking `rangefeedErrorInfo.restart`.
Done.
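Roughly, the resulting restart path looks like this sketch (illustrative names only): the meta iteration happens only when the error says the range structure changed.

```go
package main

import "fmt"

type errorInfo struct {
	resolveSpan bool // range structure changed; descriptors must be re-resolved
}

func restartFeed(info errorInfo, span string) {
	if info.resolveSpan {
		// expensive path: walk meta ranges and start a feed per sub-span
		fmt.Println("dividing", span, "on range boundaries")
		return
	}
	// cheap path: restart the same single-range feed, no meta iteration
	fmt.Println("restarting single-range feed for", span)
}

func main() {
	restartFeed(errorInfo{resolveSpan: true}, "/Table/100")
	restartFeed(errorInfo{resolveSpan: false}, "/Table/100/1")
}
```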
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
line 184 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
nit: Can we just do this now?
Okay; but done as a separate change in this PR -- there is a fair number of callers that needed to be updated.
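For readers unfamiliar with the pattern, a minimal sketch of the functional-option shape this refactor moves toward (names here are illustrative, not the exact `kvcoord` API): callers opt in via an option value instead of threading a positional boolean through every call site.

```go
package main

import "fmt"

type rangeFeedConfig struct {
	withDiff bool
}

type rangeFeedOption func(*rangeFeedConfig)

// withDiff asks the feed to include previous values alongside new ones.
func withDiff() rangeFeedOption {
	return func(cfg *rangeFeedConfig) { cfg.withDiff = true }
}

func rangeFeed(span string, opts ...rangeFeedOption) {
	var cfg rangeFeedConfig
	for _, opt := range opts {
		opt(&cfg)
	}
	fmt.Printf("rangefeed on %s, withDiff=%t\n", span, cfg.withDiff)
}

func main() {
	rangeFeed("/Table/100")             // default: no diff
	rangeFeed("/Table/100", withDiff()) // caller opts in explicitly
}
```

Callers that do not pass the option get the default behavior, which is what lets the stray boolean be dropped.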
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
line 497 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
We never actually use this field for anything. Is the idea that errors must have an explicit action? If so, let's assert that either this or `restart` is set. Otherwise, let's drop it.
Ack; I used to have a more complex setup, but now this field is not needed. Non-transient errors are indicated by returning an error from the handleRangefeedError function.
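In other words, the classifier returns hints for retryable errors and a non-nil error only for non-transient failures. A small sketch of that convention, with made-up error values and names:

```go
package main

import (
	"errors"
	"fmt"
)

var errRangeSplit = errors.New("range split")
var errAuth = errors.New("permission denied")

type errorInfo struct {
	resolveSpan bool // re-resolve descriptors before restarting
}

// classifyRangefeedError decides how to proceed after a per-range error.
func classifyRangefeedError(err error) (errorInfo, error) {
	switch {
	case err == nil:
		return errorInfo{}, nil // plain restart
	case errors.Is(err, errRangeSplit):
		return errorInfo{resolveSpan: true}, nil // restart after re-resolving
	case errors.Is(err, errAuth):
		return errorInfo{}, err // non-transient: propagate and stop the feed
	default:
		return errorInfo{}, nil
	}
}

func main() {
	info, err := classifyRangefeedError(errRangeSplit)
	fmt.Println(info.resolveSpan, err) // true <nil>
	_, err = classifyRangefeedError(errAuth)
	fmt.Println(err) // permission denied
}
```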
pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
line 498 at r4 (raw file):
`refreshDescriptors`
I'm going to go with `resolveSpan` instead -- because that's what happens.
Refactor RangeFeed call to take `kvcoord.WithDiff` option instead of stray boolean.

Epic: None
Release note: None
bors r-
Canceled.
Rewrite MuxRangeFeed client to use 1 Go routine per node, instead of 1 Go routine per range.

Prior to this change, MuxRangeFeed client was structured so that it was entirely compatible with the execution model of the regular range feed. As a result, 1 Go routine was used per range. This rewrite replaces the old implementation with an almost clean slate implementation which uses 1 Go routine per node. Where possible, relatively small and targeted modifications to the rangefeed library were made to extract common methods (such as range splitting).

The reduction in the number of Go routines created by rangefeed has a direct impact on cluster performance, and most importantly SQL latency. This is mostly due to the fact that with this PR, the number of Go routines started by MuxRangeFeed is down to 2 per range (on the rangefeed server side) vs 5 for the regular rangefeed. When running changefeeds against tables with 10s-100s of thousands of ranges, this significant difference in the Go routine count has a direct impact on Go scheduler latency, the number of runnable Go routines, and ultimately, on the SQL latency.

Epic: none
Release note (enterprise change): MuxRangeFeed client (enabled via `changefeed.mux_rangefeed.enabled` setting) is more efficient when running against large scale workloads.
Bors r+
Build failed (retrying...):
Bors r+
Already running a review
Bors r-
Canceled.
Bors r+
Build failed (retrying...):
Build failed:
Bors r+
Build succeeded:
Wonder if this build failure could be related, reproduces readily locally:
Possible. I'll investigate.
Thanks @erikgrinaker for bringing it up. We have, on occasion, seen this pop up. However, as far as mux rangefeed is concerned, I just reran this test over 8000 times with mux rangefeed -- without problems.
I was bisecting that failure independently (before seeing the thread above), and the result pointed to this PR. The failure reproduces pretty readily on master, and reverting this PR makes it go away. "Fail" and "go away" relative to the following simple stress command:
I got a repro on roachprod stress.
Recent changes to rangefeed library (cockroachdb#97957) introduced a silly bug (incorrect code completion/copy paste). Use correct timestamp when resuming range feed.

Issue: None
Epic: None
Release note: None
98522: ccl/oidcccl: support principal matching on list claims r=dhartunian a=cameronnunez

Previously, matching on ID token claims was not possible if the claim key specified had a corresponding value that was a list, not a string. With this change, matching can now occur on claims that are list valued in order to add login capabilities to DB Console. It is important to note that this change does NOT offer the user the ability to choose between possible matches; it simply selects the first match to log the user in. This change also adds more verbose logging about ID token details.

Epic: none
Fixes: #97301, #97468
Release note (enterprise change): The cluster setting `server.oidc_authentication.claim_json_key` for DB Console SSO now accepts list-valued token claims.
Release note (general change): Increasing the logging verbosity is more helpful with troubleshooting DB Console SSO issues.

98739: sql: simplify V23_1ExternalConnectionsTableHasOwnerIDColumn gating r=adityamaru a=andyyang890

Informs #87079

Release note: None

98892: kvcoord: Use correct timestamp when restarting range r=miretskiy a=miretskiy

Recent changes to rangefeed library (#97957) introduced a silly bug (incorrect code completion/copy paste). Use correct timestamp when resuming range feed.

Issue: None
Epic: None
Release note: None

Co-authored-by: Cameron Nunez <cameron@cockroachlabs.com>
Co-authored-by: Andy Yang <yang@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Rewrite MuxRangeFeed client to use 1 Go routine per node,
instead of 1 Go routine per range.
Prior to this change, MuxRangeFeed client was structured
so that it was entirely compatible with the execution
model of the regular range feed. As a result,
1 Go routine was used per range. This rewrite replaces
old implementation with an almost clean slate implementation
which uses 1 Go routine per node.
Where possible, relatively small and targeted modifications
to the rangefeed library were made to extract common methods
(such as range splitting).
The reduction in the number of Go routines created by rangefeed
has direct impact on the cluster performance, and most importantly
SQL latency. This is mostly due to the fact that with this PR,
the number of Go routines started by MuxRangeFeed is down to
2 per range (on the rangefeed server side) vs 5 for the regular
rangefeed. When running changefeeds against tables with
10s-100s of thousands of ranges, this significant difference
in the Go routine count has direct impact on Go scheduler latency,
the number of runnable Go routines, and ultimately, on the SQL
latency.
Epic: CRDB-25044
Release note (enterprise change): MuxRangeFeed client (enabled via
`changefeed.mux_rangefeed.enabled` setting) is more efficient when
running against large scale workloads.
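As a closing illustration of the model described above (a simplified sketch, not the PR's implementation): one receive loop per node demultiplexes events to per-range state by stream ID, so goroutine count scales with the number of nodes rather than the number of ranges.

```go
package main

import "fmt"

type muxEvent struct {
	streamID int64
	payload  string
}

type perRangeFeed struct{ span string }

// receiveLoop is the one goroutine per node: it pulls events off the shared
// stream and routes each to the owning per-range feed.
func receiveLoop(events <-chan muxEvent, feeds map[int64]*perRangeFeed) {
	for ev := range events {
		if feed, ok := feeds[ev.streamID]; ok {
			fmt.Printf("span %s got %q\n", feed.span, ev.payload)
		}
	}
}

func main() {
	feeds := map[int64]*perRangeFeed{
		1: {span: "/Table/100/1"},
		2: {span: "/Table/100/2"},
	}
	events := make(chan muxEvent, 2)
	events <- muxEvent{streamID: 1, payload: "checkpoint"}
	events <- muxEvent{streamID: 2, payload: "value"}
	close(events)
	receiveLoop(events, feeds) // single loop serves both ranges
}
```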