Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: deflake raceTransport after pass-by-ref BatchRequest #91031

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1934,13 +1934,11 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
func (ds *DistSender) sendToReplicas(
ctx context.Context, ba *roachpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool,
) (*roachpb.BatchResponse, error) {
desc := routing.Desc()
ba.RangeID = desc.RangeID

// If this request can be sent to a follower to perform a consistent follower
// read under the closed timestamp, promote its routing policy to NEAREST.
if ba.RoutingPolicy == roachpb.RoutingPolicy_LEASEHOLDER &&
CanSendToFollower(ds.logicalClusterID.Get(), ds.st, ds.clock, routing.ClosedTimestampPolicy(), ba) {
ba = ba.ShallowCopy()
ba.RoutingPolicy = roachpb.RoutingPolicy_NEAREST
}

Expand All @@ -1956,6 +1954,7 @@ func (ds *DistSender) sendToReplicas(
default:
log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy)
}
desc := routing.Desc()
leaseholder := routing.Leaseholder()
replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter)
if err != nil {
Expand Down Expand Up @@ -2057,6 +2056,10 @@ func (ds *DistSender) sendToReplicas(
}
}
prevReplica = curReplica

ba = ba.ShallowCopy()
ba.Replica = curReplica
ba.RangeID = desc.RangeID
// Communicate to the server the information our cache has about the
// range. If it's stale, the serve will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
Expand Down
2 changes: 0 additions & 2 deletions pkg/kv/kvclient/kvcoord/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ func (gt *grpcTransport) SendNext(
if err != nil {
return nil, err
}

ba.Replica = r
return gt.sendBatch(ctx, r.NodeID, iface, ba)
}

Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvclient/kvcoord/transport_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type raceTransport struct {
}

func (tr raceTransport) SendNext(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
// Make a copy of the requests slice, and shallow copies of the requests.
// The caller is allowed to mutate the request after the call returns. Since
Expand All @@ -69,10 +69,7 @@ func (tr raceTransport) SendNext(
}
ba.Requests = requestsCopy
select {
// We have a shallow copy here and so the top level scalar fields can't
// really race, but making more copies doesn't make anything more
// transparent, so from now on we operate on a pointer.
case incoming <- &ba:
case incoming <- ba:
default:
// Avoid slowing down the tests if we're backed up.
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/rpc/nodedialer/nodedialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,12 @@ type TracingInternalClient struct {

// Batch overrides the Batch RPC client method and fills in tracing information.
func (tic TracingInternalClient) Batch(
ctx context.Context, req *roachpb.BatchRequest, opts ...grpc.CallOption,
ctx context.Context, ba *roachpb.BatchRequest, opts ...grpc.CallOption,
) (*roachpb.BatchResponse, error) {
sp := tracing.SpanFromContext(ctx)
if sp != nil && !sp.IsNoop() {
req.TraceInfo = sp.Meta().ToProto()
ba = ba.ShallowCopy()
ba.TraceInfo = sp.Meta().ToProto()
}
return tic.InternalClient.Batch(ctx, req, opts...)
return tic.InternalClient.Batch(ctx, ba, opts...)
}