Skip to content

Commit

Permalink
kv: pass BatchRequest by reference, not value
Browse files Browse the repository at this point in the history
This commit switches the sender interface from:
```go
type Sender interface {
	Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}
```
to
```go
type Sender interface {
	Send(context.Context, *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}
```

In doing so, the entire KV layer (client-side and server-side) switches from
passing `BatchRequest` objects by pointer instead of by value. These objects are
over 200 bytes in size and are passed to hundreds of functions throughout the KV
layer in service of a single request, so this is a dramatic reduction in memory
copies. Furthermore, these objects were often already escaping to the heap, so
this actually reduces heap allocations by being more deliberate about when we
heap allocate and when we perform a shallow clone.

\### Macro benchmarks
```
name                           old ops/s    new ops/s    delta
kv95/enc=false/nodes=3/cpu=32    120k ± 4%    122k ± 3%  +2.28%  (p=0.052 n=10+10)

name                           old avg(ms)  new avg(ms)  delta
kv95/enc=false/nodes=3/cpu=32    1.60 ± 0%    1.60 ± 0%    ~     (all equal)

name                           old p99(ms)  new p99(ms)  delta
kv95/enc=false/nodes=3/cpu=32    7.79 ± 4%    7.60 ± 0%  -2.44%  (p=0.013 n=10+8)
```

This commit is based on #86957. Together, they have the following impact on macro
benchmark performance:
```
name                           old ops/s    new ops/s    delta
kv95/enc=false/nodes=3/cpu=32    109k ±10%    122k ± 3%  +12.35%  (p=0.000 n=10+10)

name                           old avg(ms)  new avg(ms)  delta
kv95/enc=false/nodes=3/cpu=32    1.79 ±12%    1.60 ± 0%  -10.61%  (p=0.000 n=10+8)

name                           old p99(ms)  new p99(ms)  delta
kv95/enc=false/nodes=3/cpu=32    8.53 ±17%    7.60 ± 0%  -10.90%  (p=0.000 n=10+8)
```

\### Micro benchmarks
```
name                        old time/op    new time/op    delta
KV/Scan/Native/rows=1-10      17.1µs ± 1%    16.5µs ± 2%  -3.36%  (p=0.000 n=8+10)
KV/Update/Native/rows=1-10    66.9µs ± 2%    65.2µs ± 2%  -2.52%  (p=0.000 n=9+10)
KV/Insert/Native/rows=1-10    42.1µs ± 3%    41.3µs ± 3%  -1.95%  (p=0.015 n=10+10)
KV/Delete/Native/rows=1-10    41.9µs ± 3%    41.2µs ± 2%  -1.69%  (p=0.035 n=10+10)
KV/Insert/SQL/rows=1-10        125µs ± 2%     126µs ± 6%    ~     (p=0.631 n=10+10)
KV/Update/SQL/rows=1-10        171µs ± 4%     170µs ± 3%    ~     (p=0.579 n=10+10)
KV/Delete/SQL/rows=1-10        138µs ± 4%     138µs ± 4%    ~     (p=0.393 n=10+10)
KV/Scan/SQL/rows=1-10         94.4µs ± 4%    93.6µs ± 3%    ~     (p=0.579 n=10+10)

name                        old alloc/op   new alloc/op   delta
KV/Update/Native/rows=1-10    22.3kB ± 0%    21.6kB ± 0%  -2.83%  (p=0.000 n=9+10)
KV/Insert/Native/rows=1-10    15.8kB ± 0%    15.5kB ± 0%  -1.86%  (p=0.000 n=9+10)
KV/Delete/Native/rows=1-10    15.5kB ± 1%    15.2kB ± 0%  -1.83%  (p=0.000 n=10+10)
KV/Update/SQL/rows=1-10       51.6kB ± 0%    51.0kB ± 0%  -1.11%  (p=0.000 n=10+8)
KV/Insert/SQL/rows=1-10       44.8kB ± 0%    44.4kB ± 0%  -0.80%  (p=0.000 n=10+9)
KV/Scan/Native/rows=1-10      7.57kB ± 0%    7.52kB ± 0%  -0.70%  (p=0.000 n=9+10)
KV/Delete/SQL/rows=1-10       51.6kB ± 0%    51.3kB ± 0%  -0.54%  (p=0.000 n=10+10)
KV/Scan/SQL/rows=1-10         24.3kB ± 0%    24.3kB ± 0%  -0.14%  (p=0.009 n=8+10)

name                        old allocs/op  new allocs/op  delta
KV/Update/Native/rows=1-10       182 ± 0%       181 ± 0%  -0.89%  (p=0.000 n=9+10)
KV/Delete/Native/rows=1-10       127 ± 0%       126 ± 0%  -0.79%  (p=0.000 n=10+10)
KV/Insert/Native/rows=1-10       128 ± 0%       127 ± 0%  -0.78%  (p=0.000 n=9+10)
KV/Update/SQL/rows=1-10          521 ± 0%       519 ± 0%  -0.39%  (p=0.000 n=10+8)
KV/Delete/SQL/rows=1-10          386 ± 0%       385 ± 0%  -0.26%  (p=0.000 n=8+8)
KV/Insert/SQL/rows=1-10          359 ± 0%       359 ± 0%  -0.22%  (p=0.009 n=10+10)
KV/Scan/Native/rows=1-10        55.0 ± 0%      55.0 ± 0%    ~     (all equal)
KV/Scan/SQL/rows=1-10            281 ± 0%       281 ± 0%    ~     (all equal)
```

Release justification: None. Don't merge until after the branch cut.
  • Loading branch information
nvanbenschoten committed Oct 31, 2022
1 parent 3dd5e68 commit e822649
Show file tree
Hide file tree
Showing 127 changed files with 870 additions and 837 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_intents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) {

// Interceptor catches requests that cleanup transactions of size 1000 which are
// test data transactions. All other transaction commits pass though.
interceptor := func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
interceptor := func(ctx context.Context, req *roachpb.BatchRequest) *roachpb.Error {
endTxn := req.Requests[0].GetEndTxn()
if endTxn != nil && !endTxn.Commit && len(endTxn.LockSpans) == perTransactionRowCount {
// If this is a rollback of one the test's SQL transactions, allow the
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6170,7 +6170,7 @@ func TestRestoreErrorPropagates(t *testing.T) {
jobsTableKey := keys.SystemSQLCodec.TablePrefix(uint32(systemschema.JobsTable.GetID()))
var shouldFail, failures int64
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
TestingRequestFilter: func(ctx context.Context, ba *roachpb.BatchRequest) *roachpb.Error {
// Intercept Put and ConditionalPut requests to the jobs table
// and, if shouldFail is positive, increment failures and return an
// injected error.
Expand Down Expand Up @@ -6298,7 +6298,7 @@ func TestPaginatedBackupTenant(t *testing.T) {
r.EndKey.Equal(r.Key.PrefixEnd())
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
TestingRequestFilter: func(ctx context.Context, request *roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
if exportRequest, ok := ru.GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
Expand All @@ -6316,7 +6316,7 @@ func TestPaginatedBackupTenant(t *testing.T) {
}
return nil
},
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for i, ru := range br.Responses {
if exportRequest, ok := ba.Requests[i].GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
Expand Down Expand Up @@ -7156,7 +7156,7 @@ func TestClientDisconnect(t *testing.T) {
blockBackupOrRestore(ctx)
}}},
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
TestingResponseFilter: func(ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse:
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4830,7 +4830,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}
}
requestFilter = kvserverbase.ReplicaRequestFilter(func(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
if ba.Txn == nil || ba.Txn.Name != "changefeed backfill" {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/kvccl/kvfollowerreadsccl/followerreads.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func canSendToFollower(
st *cluster.Settings,
clock *hlc.Clock,
ctPolicy roachpb.RangeClosedTimestampPolicy,
ba roachpb.BatchRequest,
ba *roachpb.BatchRequest,
) bool {
return kvserver.BatchCanBeEvaluatedOnFollower(ba) &&
closedTimestampLikelySufficient(st, clock, ctPolicy, ba.RequiredFrontier()) &&
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,25 @@ func TestCanSendToFollower(t *testing.T) {
txn.GlobalUncertaintyLimit = ts
return txn
}
batch := func(txn *roachpb.Transaction, req roachpb.Request) roachpb.BatchRequest {
var ba roachpb.BatchRequest
batch := func(txn *roachpb.Transaction, req roachpb.Request) *roachpb.BatchRequest {
ba := &roachpb.BatchRequest{}
ba.Txn = txn
ba.Add(req)
return ba
}
withBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
withBatchTimestamp := func(ba *roachpb.BatchRequest, ts hlc.Timestamp) *roachpb.BatchRequest {
ba.Timestamp = ts
return ba
}
withServerSideBatchTimestamp := func(ba roachpb.BatchRequest, ts hlc.Timestamp) roachpb.BatchRequest {
withServerSideBatchTimestamp := func(ba *roachpb.BatchRequest, ts hlc.Timestamp) *roachpb.BatchRequest {
ba = withBatchTimestamp(ba, ts)
ba.TimestampFromServerClock = (*hlc.ClockTimestamp)(&ts)
return ba
}

testCases := []struct {
name string
ba roachpb.BatchRequest
ba *roachpb.BatchRequest
ctPolicy roachpb.RangeClosedTimestampPolicy
disabledEnterprise bool
disabledFollowerReads bool
Expand Down Expand Up @@ -441,11 +441,13 @@ func TestCanSendToFollower(t *testing.T) {
},
{
name: "non-enterprise",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
disabledEnterprise: true,
exp: false,
},
{
name: "follower reads disabled",
ba: withBatchTimestamp(batch(nil, &roachpb.GetRequest{}), stale),
disabledFollowerReads: true,
exp: false,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
},
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
TestingRequestFilter: func(_ context.Context, ba *roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
switch r := req.GetInner().(type) {
case *roachpb.RevertRangeRequest:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TestUnavailableZip(t *testing.T) {
close(closedCh)
unavailableCh.Store(closedCh)
knobs := &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, _ roachpb.BatchRequest) *roachpb.Error {
TestingRequestFilter: func(ctx context.Context, _ *roachpb.BatchRequest) *roachpb.Error {
select {
case <-unavailableCh.Load().(chan struct{}):
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions pkg/internal/client/requestbatcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ func (b *batch) rangeID() roachpb.RangeID {
return b.reqs[0].rangeID
}

func (b *batch) batchRequest(cfg *Config) roachpb.BatchRequest {
req := roachpb.BatchRequest{
func (b *batch) batchRequest(cfg *Config) *roachpb.BatchRequest {
req := &roachpb.BatchRequest{
// Preallocate the Requests slice.
Requests: make([]roachpb.RequestUnion, 0, len(b.reqs)),
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/internal/client/requestbatcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type batchResp struct {

type batchSend struct {
ctx context.Context
ba roachpb.BatchRequest
ba *roachpb.BatchRequest
respChan chan<- batchResp
}

type chanSender chan batchSend

func (c chanSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
respChan := make(chan batchResp, 1)
select {
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
s := <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 3)
br := makeResp(&s.ba, spanMap{
br := makeResp(s.ba, spanMap{
{"d", "g"}: {"d", "g"},
{"a", "d"}: {"c", "d"},
{"b", "m"}: {"c", "m"},
Expand All @@ -513,7 +513,7 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 3)
br = makeResp(&s.ba, spanMap{
br = makeResp(s.ba, spanMap{
{"d", "g"}: {"e", "g"},
{"c", "d"}: nilResumeSpan,
{"c", "m"}: {"e", "m"},
Expand All @@ -525,7 +525,7 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 2)
br = makeResp(&s.ba, spanMap{
br = makeResp(s.ba, spanMap{
{"e", "g"}: nilResumeSpan,
{"e", "m"}: {"h", "m"},
})
Expand All @@ -536,7 +536,7 @@ func TestMaxKeysPerBatchReq(t *testing.T) {
s = <-sc
assert.Equal(t, int64(5), s.ba.MaxSpanRequestKeys)
assert.Len(t, s.ba.Requests, 1)
br = makeResp(&s.ba, spanMap{
br = makeResp(s.ba, spanMap{
{"h", "m"}: nilResumeSpan,
})
s.respChan <- batchResp{br: br}
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2728,7 +2728,7 @@ func TestStartableJobTxnRetry(t *testing.T) {
haveInjectedRetry := false
params := base.TestServerArgs{}
params.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, r roachpb.BatchRequest) *roachpb.Error {
TestingRequestFilter: func(ctx context.Context, r *roachpb.BatchRequest) *roachpb.Error {
if r.Txn == nil || r.Txn.Name != txnName {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (b *SSTBatcher) addSSTable(
req.SSTTimestampToRequestTimestamp = batchTS
}

ba := roachpb.BatchRequest{
ba := &roachpb.BatchRequest{
Header: roachpb.Header{Timestamp: batchTS, ClientRangeInfo: roachpb.ClientRangeInfo{ExplicitlyRequested: true}},
AdmissionHeader: roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func TestReadConsistencyTypes(t *testing.T) {
// Mock out DistSender's sender function to check the read consistency for
// outgoing BatchRequests and return an empty reply.
factory := kv.NonTransactionalFactoryFunc(
func(_ context.Context, ba roachpb.BatchRequest,
func(_ context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.ReadConsistency != rc {
return nil, roachpb.NewErrorf("BatchRequest has unexpected ReadConsistency %s", ba.ReadConsistency)
Expand Down Expand Up @@ -908,7 +908,7 @@ func TestNodeIDAndObservedTimestamps(t *testing.T) {
// Mock out sender function to check that created transactions
// have the observed timestamp set for the configured node ID.
factory := kv.MakeMockTxnSenderFactory(
func(_ context.Context, _ *roachpb.Transaction, ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
func(_ context.Context, _ *roachpb.Transaction, ba *roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
return ba.CreateReply(), nil
})

Expand Down Expand Up @@ -1086,7 +1086,7 @@ func TestRollbackWithCanceledContextInsidious(t *testing.T) {
key := roachpb.Key("a")
ctx, cancel := context.WithCancel(context.Background())
var rollbacks int
storeKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
storeKnobs.TestingRequestFilter = func(_ context.Context, ba *roachpb.BatchRequest) *roachpb.Error {
if !ba.IsSingleEndTxnRequest() {
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ var _ Sender = &CrossRangeTxnWrapperSender{}

// Send implements the Sender interface.
func (s *CrossRangeTxnWrapperSender) Send(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
if ba.Txn != nil {
log.Fatalf(ctx, "CrossRangeTxnWrapperSender can't handle transactional requests")
Expand Down Expand Up @@ -824,7 +824,7 @@ func sendAndFill(ctx context.Context, send SenderFunc, b *Batch) error {
// fails. But send() also returns its own errors, so there's some dancing
// here to do because we want to run fillResults() so that the individual
// result gets initialized with an error from the corresponding call.
var ba roachpb.BatchRequest
ba := &roachpb.BatchRequest{}
ba.Requests = b.reqs
ba.Header = b.Header
ba.AdmissionHeader = b.AdmissionHeader
Expand Down Expand Up @@ -965,14 +965,14 @@ func runTxn(ctx context.Context, txn *Txn, retryable func(context.Context, *Txn)
// send runs the specified calls synchronously in a single batch and returns
// any errors. Returns (nil, nil) for an empty batch.
func (db *DB) send(
ctx context.Context, ba roachpb.BatchRequest,
ctx context.Context, ba *roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
return db.sendUsingSender(ctx, ba, db.NonTransactionalSender())
}

// sendUsingSender uses the specified sender to send the batch request.
func (db *DB) sendUsingSender(
ctx context.Context, ba roachpb.BatchRequest, sender Sender,
ctx context.Context, ba *roachpb.BatchRequest, sender Sender,
) (*roachpb.BatchResponse, *roachpb.Error) {
if len(ba.Requests) == 0 {
return nil, nil
Expand Down
Loading

0 comments on commit e822649

Please sign in to comment.