Merge #103602
103602: kvstreamer: improve avg response estimation for partial responses r=yuzefovich a=yuzefovich

This commit fixes the avg response estimator when it comes to handling
partial responses. A "partial response" arises when a single ScanRequest
is evaluated across multiple BatchRequests because the response
happened to be large enough to exceed the TargetBytes budget we used. For
example, if a ScanRequest needs to fetch 100KiB of data, but the
Streamer gives it only a 1KiB TargetBytes budget, then this ScanRequest
will be evaluated across 100 BatchRequests, and each BatchRequest will
contain a single "partial" response of 1KiB in size.

Previously, the avg response estimator would treat these partial
responses independently, so it would come up with an estimate of 1KiB
for that example. However, this is very suboptimal: the goal of the
estimator is to guess the footprint of the whole response to a
single-range request. As a result of the previous behavior, the streamer
could keep on paginating the ScanResponse in very small batches (with
the estimate never increasing), which would then lead to high latency
for query evaluation.

This commit fixes this problem by adjusting what the estimator includes
in the denominator for the average computation. In particular, this
commit makes it so that only "non-resume" responses are included in
that number. The idea is that when we receive the first paginated
response, we increase the counter of responses, but on all subsequent
"resume" responses we don't. This allows us to "update" the footprint
of the big ScanResponse that is being paginated across multiple
BatchRequests. In the example above, our estimate will be growing
exponentially, and as a result, instead of performing 100 BatchRequests,
we will now do only 7.
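
The effect on the example above can be reproduced with a small standalone
simulation. This is only a sketch, not the Streamer's code: the `estimator`
type and the `countBatchRequests` helper are hypothetical, and it assumes an
initial estimate of 1KiB, a 1.5 multiple applied on top of the average, and
rows of exactly 1KiB each.

```go
package main

import "fmt"

const (
	initialEstimate = 1 << 10 // assumption: initial estimate of 1KiB
	multiple        = 1.5     // assumption: multiple applied to the average
)

// estimator is a simplified, hypothetical stand-in for avgResponseEstimator.
type estimator struct {
	responseBytes float64
	denominator   float64
}

func (e *estimator) estimate() int64 {
	if e.denominator == 0 {
		return initialEstimate
	}
	return int64(e.responseBytes / e.denominator * multiple)
}

// countBatchRequests paginates one Scan of totalRows rows (rowSize bytes
// each) and returns how many BatchRequests that takes. With countResumes set,
// every partial response bumps the denominator (the previous behavior);
// otherwise only the first, "non-resume" response does.
func countBatchRequests(totalRows, rowSize int64, countResumes bool) int {
	var e estimator
	n := 0
	for rowsLeft := totalRows; rowsLeft > 0; n++ {
		rows := e.estimate() / rowSize
		if rows > rowsLeft {
			rows = rowsLeft
		}
		rowsLeft -= rows
		e.responseBytes += float64(rows * rowSize)
		if countResumes || n == 0 {
			e.denominator++
		}
	}
	return n
}

func main() {
	fmt.Println("previous accounting:", countBatchRequests(100, 1<<10, true))
	fmt.Println("new accounting:", countBatchRequests(100, 1<<10, false))
}
```

Under these assumptions the previous accounting never gets past one row per
BatchRequest, while the new accounting fetches 1, 1, 3, 7, 18, 45 and 25 rows.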

This requires tracking whether we have received a response to
a particular ScanRequest, and in order to optimize the space usage of
such tracking, a simple utility bitmap package is introduced.
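
To illustrate the tracking: one bit per enqueued request is enough to tell a
first response apart from a "resume" response. The sketch below is a
hypothetical standalone version; the `startedSet` type and the `countStarted`
helper are made up for illustration and are not the package added by this
commit.

```go
package main

import "fmt"

// startedSet is a hypothetical minimal bitmap with one bit per enqueued
// request.
type startedSet []byte

func newStartedSet(n int) startedSet { return make(startedSet, (n+7)/8) }

// markStarted sets bit i and reports whether it was previously unset, i.e.
// whether this is the first response for request i.
func (s startedSet) markStarted(i int) bool {
	mask := byte(1) << (i & 7)
	first := s[i>>3]&mask == 0
	s[i>>3] |= mask
	return first
}

// countStarted returns how many of the given request positions have just
// received their first response. Only these should bump the denominator of
// the average.
func countStarted(s startedSet, positions []int) int {
	n := 0
	for _, pos := range positions {
		if s.markStarted(pos) {
			n++
		}
	}
	return n
}

func main() {
	s := newStartedSet(10)
	// First BatchResponse: requests 0, 3 and 9 all get their first response.
	fmt.Println(countStarted(s, []int{0, 3, 9}))
	// Second BatchResponse: 3 and 9 are being resumed, only 4 is new.
	fmt.Println(countStarted(s, []int{3, 9, 4}))
}
```

The set has to outlive a single batch, since a resumed request must still be
recognized as started when its next partial response arrives.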

Impact on TPCH (average over 50 runs of `tpchvec/perf`):
```
Q1:	before: 3.21s	after: 3.23s	 0.79%
Q2:	before: 3.38s	after: 3.16s	 -6.42%
Q3:	before: 2.56s	after: 2.55s	 -0.11%
Q4:	before: 1.76s	after: 1.61s	 -8.41%
Q5:	before: 2.55s	after: 2.47s	 -3.37%
Q6:	before: 4.64s	after: 4.65s	 0.21%
Q7:	before: 5.89s	after: 5.56s	 -5.57%
Q8:	before: 1.09s	after: 1.07s	 -1.33%
Q9:	before: 5.61s	after: 5.55s	 -1.05%
Q10:	before: 2.21s	after: 2.09s	 -5.47%
Q11:	before: 0.97s	after: 0.94s	 -2.64%
Q12:	before: 4.88s	after: 4.43s	 -9.31%
Q13:	before: 1.15s	after: 1.01s	 -11.92%
Q14:	before: 0.45s	after: 0.45s	 1.16%
Q15:	before: 2.53s	after: 2.51s	 -0.58%
Q16:	before: 0.92s	after: 0.90s	 -2.38%
Q17:	before: 0.24s	after: 0.24s	 -0.58%
Q18:	before: 2.03s	after: 2.02s	 -0.46%
Q19:	before: 0.48s	after: 0.48s	 -0.99%
Q20:	before: 9.74s	after: 9.71s	 -0.38%
Q21:	before: 5.05s	after: 5.01s	 -0.81%
Q22:	before: 0.58s	after: 0.57s	 -0.38%
```

Addresses: #82164.
Fixes: #103586.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
craig[bot] and yuzefovich committed May 31, 2023
2 parents 906e7ad + cd8a11d commit 186317e
Showing 9 changed files with 227 additions and 27 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
@@ -598,6 +598,7 @@ ALL_TESTS = [
"//pkg/util/admission:admission_test",
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray_test",
+"//pkg/util/bitmap:bitmap_test",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache_test",
@@ -2147,6 +2148,8 @@ GO_TARGETS = [
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray",
"//pkg/util/bitarray:bitarray_test",
+"//pkg/util/bitmap:bitmap",
+"//pkg/util/bitmap:bitmap_test",
"//pkg/util/bufalloc:bufalloc",
"//pkg/util/buildutil:buildutil",
"//pkg/util/buildutil:buildutil_test",
@@ -3306,6 +3309,7 @@ GET_X_DATA_TARGETS = [
"//pkg/util/asciitsdb:get_x_data",
"//pkg/util/binfetcher:get_x_data",
"//pkg/util/bitarray:get_x_data",
+"//pkg/util/bitmap:get_x_data",
"//pkg/util/bufalloc:get_x_data",
"//pkg/util/buildutil:get_x_data",
"//pkg/util/bulk:get_x_data",
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -25,6 +25,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission",
"//pkg/util/admission/admissionpb",
+"//pkg/util/bitmap",
"//pkg/util/buildutil",
"//pkg/util/humanizeutil",
"//pkg/util/log",
45 changes: 35 additions & 10 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
@@ -19,7 +19,14 @@ type avgResponseEstimator struct {
// responseBytes tracks the total footprint of all responses that the
// Streamer has already received.
responseBytes float64
-numResponses float64
+// numRequestsStarted tracks the number of single-range requests that we
+// have started evaluating (regardless of the fact whether the evaluation is
+// finished or needs to be resumed). Any request included into a
+// BatchRequest can be "not started" if the BatchRequest's limits were
+// exhausted before the request's turn came. If the originally-enqueued
+// request spanned multiple ranges, then each single-range request is
+// tracked separately here.
+numRequestsStarted float64
}

const (
@@ -43,12 +50,29 @@ func (e *avgResponseEstimator) init(sv *settings.Values) {
e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv)
}

-// getAvgResponseSize returns the current estimate of a footprint of a single
-// response.
+// getAvgResponseSize returns the current estimate of a footprint of a response
+// to one single-range request.
+//
+// This means that if a ScanRequest spans multiple ranges, then this estimate
+// should be used for the TargetBytes parameter for each truncated single-range
+// ScanRequest independently (rather than the whole multi-range original one).
+// Also, it means that if one single-range ScanRequest needs to be paginated
+// across BatchRequests (i.e. there will be "resume" ScanRequests), then the
+// estimate includes the footprint of all those "resume" responses as well as of
+// the first "non-resume" response.
+// TODO(yuzefovich): we might want to have a separate estimate for Gets and
+// Scans.
func (e *avgResponseEstimator) getAvgResponseSize() int64 {
-if e.numResponses == 0 {
+if e.numRequestsStarted == 0 {
return initialAvgResponseSize
}
+// We're estimating the response size as the average over the received
+// responses. Importantly, we divide the total responses' footprint by the
+// number of "started" requests. This allows us to handle partial
+// ScanResponses that need to be resumed across BatchRequests without
+// double-counting them in the denominator of the average computation (which
+// would lead to artificially low estimate, see #103586 for an example).
+//
// Note that we're multiplying the average by a response size multiple for a
// couple of reasons:
//
@@ -77,13 +101,14 @@ func (e *avgResponseEstimator) getAvgResponseSize() int64 {
// responses, but it is likely to be suboptimal because it would be unfair
// to "large" batches that come in late (i.e. it would not be reactive
// enough). Consider using another function here.
-return int64(e.responseBytes / e.numResponses * e.avgResponseSizeMultiple)
+return int64(e.responseBytes / e.numRequestsStarted * e.avgResponseSizeMultiple)
}

-// update updates the actual information of the estimator based on numResponses
-// responses that took up responseBytes bytes in footprint and correspond to a
-// single BatchResponse.
-func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) {
+// update updates the actual information of the estimator based on responses
+// that took up responseBytes bytes in footprint and correspond to a single
+// BatchResponse. numRequestsStarted indicates the number of requests that were
+// just started in that BatchResponse.
+func (e *avgResponseEstimator) update(responseBytes int64, numRequestsStarted int) {
e.responseBytes += float64(responseBytes)
-e.numResponses += float64(numResponses)
+e.numRequestsStarted += float64(numRequestsStarted)
}
48 changes: 42 additions & 6 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator_test.go
@@ -19,29 +19,30 @@ import (
"github.com/stretchr/testify/require"
)

+func withMultiple(s int64) int64 {
+return int64(float64(s) * defaultAvgResponseSizeMultiple)
+}

func TestAvgResponseEstimator(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple}
-withMultiple := func(s int64) int64 {
-return int64(float64(s) * defaultAvgResponseSizeMultiple)
-}

// Before receiving any responses, we should be using the initial estimate.
require.Equal(t, int64(initialAvgResponseSize), e.getAvgResponseSize())

// Simulate receiving a single response.
firstResponseSize := int64(42)
-e.update(firstResponseSize, 1)
+e.update(firstResponseSize, 1 /* numRequestsStarted */)
// The estimate should now be the size of that single response times
// defaultAvgResponseSizeMultiple.
require.Equal(t, withMultiple(firstResponseSize), e.getAvgResponseSize())

// Simulate receiving 100 small BatchResponses.
smallResponseSize := int64(63)
for i := 0; i < 100; i++ {
-e.update(smallResponseSize*5, 5)
+e.update(smallResponseSize*5, 5 /* numRequestsStarted */)
}
// The estimate should now be pretty close to the size of a single response
// in the small BatchResponse (after adjusting with the multiple).
@@ -51,10 +52,45 @@ func TestAvgResponseEstimator(t *testing.T) {
// Now simulate receiving 10 large BatchResponses.
largeResponseSize := int64(17)
for i := 0; i < 10; i++ {
-e.update(largeResponseSize*1000, 1000)
+e.update(largeResponseSize*1000, 1000 /* numRequestsStarted */)
}
// The estimate should now be pretty close to the size of a single response
// in the large BatchResponse (after adjusting with the multiple).
diff = withMultiple(largeResponseSize) - e.getAvgResponseSize()
require.True(t, math.Abs(float64(diff))/float64(smallResponseSize) < 0.15)
}

+func TestAvgResponseSizeForPartialResponses(t *testing.T) {
+defer leaktest.AfterTest(t)()
+defer log.Scope(t).Close(t)
+
+e := avgResponseEstimator{avgResponseSizeMultiple: defaultAvgResponseSizeMultiple}
+// Simulate a ScanRequest that needs to fetch 100 rows, 1KiB each in size.
+const totalRows, rowSize = 100, 1 << 10
+rowsLeft := int64(totalRows)
+var batchRequestsCount int
+for ; rowsLeft > 0; batchRequestsCount++ {
+targetBytes := e.getAvgResponseSize()
+rowsReceived := targetBytes / rowSize
+if rowsReceived > rowsLeft {
+rowsReceived = rowsLeft
+}
+rowsLeft -= rowsReceived
+// Only the first BatchRequest starts evaluation of the ScanRequest.
+var numRequestsStarted int
+if batchRequestsCount == 0 {
+numRequestsStarted = 1
+}
+e.update(rowsReceived*rowSize, numRequestsStarted)
+}
+// We started with the TargetBytes equal to the initial estimate of 1KiB,
+// and then with each update the estimate should have grown. In particular,
+// we expect 7 BatchRequests total that fetch 1, 1, 3, 7, 18, 45, 25 rows
+// respectively (note that the growth is faster than 2x because we use 1.5
+// multiple on top of the average).
+require.Equal(t, 7, batchRequestsCount)
+// From the perspective of the response estimator, we received only one
+// response (that happened to be paginated across BatchRequests), so our
+// estimate should match exactly the total size.
+require.Equal(t, withMultiple(totalRows*rowSize), e.getAvgResponseSize())
+}
15 changes: 13 additions & 2 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
+"github.com/cockroachdb/cockroach/pkg/util/bitmap"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
@@ -75,15 +76,25 @@ type singleRangeBatch struct {
// subRequestIdx is only allocated in InOrder mode when
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int32
+// isScanStarted tracks whether we have already received at least one
+// response for the corresponding ScanRequest (i.e. whether the ScanRequest
+// has been started). In particular, if enqueuedReqs[i] is a
+// ScanRequest, then isScanStarted.IsSet(i) will return true once at least
+// one ScanResponse was received while evaluating enqueuedReqs[i].
+//
+// This bitmap is preserved and "accumulated" across singleRangeBatches.
+//
+// isScanStarted is only allocated if at least one Scan request was enqueued.
+isScanStarted *bitmap.Bitmap
// numGetsInReqs tracks the number of Get requests in reqs.
numGetsInReqs int64
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
// overheadAccountedFor tracks the memory reservation against the budget for
// the overhead of the reqs slice (i.e. of kvpb.RequestUnion objects) as
-// well as the positions and the subRequestIdx slices. Since we reuse these
-// slices for the resume requests, this can be released only when the
+// well as positions, subRequestIdx, and isScanStarted. Since we reuse these
+// things for the resume requests, this can be released only when the
// BatchResponse doesn't have any resume spans.
overheadAccountedFor int64
// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
36 changes: 27 additions & 9 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+"github.com/cockroachdb/cockroach/pkg/util/bitmap"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -599,14 +600,20 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
}
}

+var isScanStarted *bitmap.Bitmap
+if numScansInReqs > 0 {
+isScanStarted = bitmap.NewBitmap(len(reqs))
+}
numGetsInReqs := int64(len(singleRangeReqs)) - numScansInReqs
overheadAccountedFor := requestUnionSliceOverhead + requestUnionOverhead*int64(cap(singleRangeReqs)) + // reqs
intSliceOverhead + intSize*int64(cap(positions)) + // positions
-subRequestIdxOverhead // subRequestIdx
+subRequestIdxOverhead + // subRequestIdx
+isScanStarted.MemUsage() // isScanStarted
r := singleRangeBatch{
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
+isScanStarted: isScanStarted,
numGetsInReqs: numGetsInReqs,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
overheadAccountedFor: overheadAccountedFor,
@@ -1371,6 +1378,11 @@ type singleRangeBatchResponseFootprint struct {
// need to be created for Get and Scan responses, respectively.
numGetResults, numScanResults int
numIncompleteGets, numIncompleteScans int
+// numStartedScans indicates how many Scan requests were just started. If a
+// Scan response was received due to a Scan request that was the "resume"
+// request (i.e. the pagination of the previous request), it's not included
+// in this number.
+numStartedScans int
}

func (fp singleRangeBatchResponseFootprint) hasResults() bool {
@@ -1424,6 +1436,12 @@ func calculateFootprint(
if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
fp.responsesOverhead += scanResponseOverhead
fp.numScanResults++
+if pos := req.positions[i]; !req.isScanStarted.IsSet(pos) {
+// This is the first response to the enqueuedReqs[pos] Scan
+// request.
+fp.numStartedScans++
+req.isScanStarted.Set(pos)
+}
}
if scan.ResumeSpan != nil {
// This Scan wasn't completed.
@@ -1484,12 +1502,10 @@ func processSingleRangeResults(
defer s.budget.mu.Unlock()
s.mu.Lock()

-// TODO(yuzefovich): some of the responses might be partial, yet the
-// estimator doesn't distinguish the footprint of the full response vs
-// the partial one. Think more about this.
-s.mu.avgResponseEstimator.update(
-fp.memoryFootprintBytes, int64(fp.numGetResults+fp.numScanResults),
-)
+// Gets cannot be resumed, so we include all of them here, but Scans can be
+// resumed, so we only include Scans that were just started.
+numRequestsStarted := fp.numGetResults + fp.numStartedScans
+s.mu.avgResponseEstimator.update(fp.memoryFootprintBytes, numRequestsStarted)

// If we have any Scan results to create and the Scan requests can return
// multiple rows, we'll need to consult s.mu.numRangesPerScanRequest, so
@@ -1619,11 +1635,13 @@ func buildResumeSingleRangeBatch(
s *Streamer, req singleRangeBatch, br *kvpb.BatchResponse, fp singleRangeBatchResponseFootprint,
) (resumeReq singleRangeBatch) {
numIncompleteRequests := fp.numIncompleteGets + fp.numIncompleteScans
-// We have to allocate the new Get and Scan requests, but we can reuse the
-// reqs and the positions slices.
+// We have to allocate the new Get and Scan requests, but we can reuse reqs,
+// positions, and subRequestIdx slices.
resumeReq.reqs = req.reqs[:numIncompleteRequests]
resumeReq.positions = req.positions[:0]
resumeReq.subRequestIdx = req.subRequestIdx[:0]
+// isScanStarted actually needs to be preserved between singleRangeBatches.
+resumeReq.isScanStarted = req.isScanStarted
resumeReq.numGetsInReqs = int64(fp.numIncompleteGets)
// We've already reconciled the budget with the actual reservation for the
// requests with the ResumeSpans.
22 changes: 22 additions & 0 deletions pkg/util/bitmap/BUILD.bazel
@@ -0,0 +1,22 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "bitmap",
srcs = ["bitmap.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/bitmap",
visibility = ["//visibility:public"],
)

go_test(
name = "bitmap_test",
srcs = ["bitmap_test.go"],
args = ["-test.timeout=295s"],
embed = [":bitmap"],
deps = [
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
49 changes: 49 additions & 0 deletions pkg/util/bitmap/bitmap.go
@@ -0,0 +1,49 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package bitmap

import "unsafe"

// Bitmap is a helper struct that tracks whether a particular bit has been
// "set".
type Bitmap struct {
bits []byte
}

// NewBitmap returns a new Bitmap that supports [0, n) range of integers. n is
// expected to be positive.
func NewBitmap(n int) *Bitmap {
return &Bitmap{bits: make([]byte, (n-1)/8+1)}
}

// bitMask[i] is a byte with a single bit set at ith position.
var bitMask = [8]byte{0x1, 0x2, 0x4, 0x8, 0x10, 0x20, 0x40, 0x80}

// Set sets the ith bit in the Bitmap.
func (bm *Bitmap) Set(i int) {
bm.bits[i>>3] |= bitMask[i&7]
}

// IsSet returns true if the ith bit is set in the Bitmap.
func (bm *Bitmap) IsSet(i int) bool {
return bm.bits[i>>3]&bitMask[i&7] != 0
}

var bitmapOverhead = int64(unsafe.Sizeof(Bitmap{}))

// MemUsage returns the memory footprint of the Bitmap in bytes. Safe to be
// called on nil Bitmap.
func (bm *Bitmap) MemUsage() int64 {
if bm == nil {
return 0
}
return bitmapOverhead + int64(cap(bm.bits))
}
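
As a rough illustration of the space savings, the sizing expression used by
NewBitmap above can be evaluated on its own. The `bytesFor` helper below is
hypothetical and only mirrors that expression; the comparison with a `[]bool`
of the same length (one byte per element) is an observation about Go, not
something stated in this commit.

```go
package main

import "fmt"

// bytesFor mirrors the (n-1)/8+1 sizing expression from NewBitmap: the number
// of bytes needed to hold n bits, for positive n.
func bytesFor(n int) int {
	return (n-1)/8 + 1
}

func main() {
	for _, n := range []int{1, 8, 9, 64, 65} {
		// A []bool of length n would occupy n bytes of backing storage.
		fmt.Printf("n=%d: bitmap %d byte(s), []bool %d byte(s)\n", n, bytesFor(n), n)
	}
}
```

The expression rounds up to a whole byte, so 9 tracked requests need 2 bytes
and 65 need 9.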