diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 5794d9c1cd50..d8c1bdc5f9c0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1934,13 +1934,11 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { func (ds *DistSender) sendToReplicas( ctx context.Context, ba *roachpb.BatchRequest, routing rangecache.EvictionToken, withCommit bool, ) (*roachpb.BatchResponse, error) { - desc := routing.Desc() - ba.RangeID = desc.RangeID - // If this request can be sent to a follower to perform a consistent follower // read under the closed timestamp, promote its routing policy to NEAREST. if ba.RoutingPolicy == roachpb.RoutingPolicy_LEASEHOLDER && CanSendToFollower(ds.logicalClusterID.Get(), ds.st, ds.clock, routing.ClosedTimestampPolicy(), ba) { + ba = ba.ShallowCopy() ba.RoutingPolicy = roachpb.RoutingPolicy_NEAREST } @@ -1956,6 +1954,7 @@ func (ds *DistSender) sendToReplicas( default: log.Fatalf(ctx, "unknown routing policy: %s", ba.RoutingPolicy) } + desc := routing.Desc() leaseholder := routing.Leaseholder() replicas, err := NewReplicaSlice(ctx, ds.nodeDescs, desc, leaseholder, replicaFilter) if err != nil { @@ -2057,6 +2056,10 @@ func (ds *DistSender) sendToReplicas( } } prevReplica = curReplica + + ba = ba.ShallowCopy() + ba.Replica = curReplica + ba.RangeID = desc.RangeID // Communicate to the server the information our cache has about the // range. If it's stale, the serve will return an update. ba.ClientRangeInfo = roachpb.ClientRangeInfo{ diff --git a/pkg/kv/kvclient/kvcoord/transport.go b/pkg/kv/kvclient/kvcoord/transport.go index 9397793ed432..ad074b811629 100644 --- a/pkg/kv/kvclient/kvcoord/transport.go +++ b/pkg/kv/kvclient/kvcoord/transport.go @@ -185,8 +185,6 @@ func (gt *grpcTransport) SendNext( if err != nil { return nil, err } - - ba.Replica = r return gt.sendBatch(ctx, r.NodeID, iface, ba) } diff --git a/pkg/kv/kvclient/kvcoord/transport_race.go b/pkg/kv/kvclient/kvcoord/transport_race.go index 9615535b2faa..86473b8bab72 100644 --- a/pkg/kv/kvclient/kvcoord/transport_race.go +++ b/pkg/kv/kvclient/kvcoord/transport_race.go @@ -54,7 +54,7 @@ type raceTransport struct { } func (tr raceTransport) SendNext( - ctx context.Context, ba roachpb.BatchRequest, + ctx context.Context, ba *roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { // Make a copy of the requests slice, and shallow copies of the requests. // The caller is allowed to mutate the request after the call returns. Since @@ -69,10 +69,7 @@ func (tr raceTransport) SendNext( } ba.Requests = requestsCopy select { - // We have a shallow copy here and so the top level scalar fields can't - // really race, but making more copies doesn't make anything more - // transparent, so from now on we operate on a pointer. - case incoming <- &ba: + case incoming <- ba: default: // Avoid slowing down the tests if we're backed up. } diff --git a/pkg/rpc/nodedialer/nodedialer.go b/pkg/rpc/nodedialer/nodedialer.go index 9eaeec2bc1c7..d7329017b92e 100644 --- a/pkg/rpc/nodedialer/nodedialer.go +++ b/pkg/rpc/nodedialer/nodedialer.go @@ -308,11 +308,12 @@ type TracingInternalClient struct { // Batch overrides the Batch RPC client method and fills in tracing information. func (tic TracingInternalClient) Batch( - ctx context.Context, req *roachpb.BatchRequest, opts ...grpc.CallOption, + ctx context.Context, ba *roachpb.BatchRequest, opts ...grpc.CallOption, ) (*roachpb.BatchResponse, error) { sp := tracing.SpanFromContext(ctx) if sp != nil && !sp.IsNoop() { - req.TraceInfo = sp.Meta().ToProto() + ba = ba.ShallowCopy() + ba.TraceInfo = sp.Meta().ToProto() } - return tic.InternalClient.Batch(ctx, req, opts...) + return tic.InternalClient.Batch(ctx, ba, opts...) }