From d38de4e3864fb4aaeaf8ee5b1cecd7409e2482be Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 9 Feb 2022 09:03:43 -0800 Subject: [PATCH 1/4] kvstreamer: create Results for empty Get responses We recently merged a change to skip Get responses that came back empty (because there is no corresponding key found) in the `Streamer` which cleaned up some code. However, for the upcoming work on the `InOrder` mode the `Streamer` needs to know about such responses. We could introduce a separate tracking mechanism for empty Get responses and continue not emitting them, but that seems like unnecessary complication to the code, so this commit, instead, restores the creation of Results for empty Get responses. Additionally it fixes a nit in the test where we hard-code the table ID (table IDs recently have changed a bit). Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 49 ++++++++++----------- pkg/kv/kvclient/kvstreamer/streamer_test.go | 10 ++--- pkg/sql/row/kv_batch_streamer.go | 26 ++++++++--- 3 files changed, 46 insertions(+), 39 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 5be0801cc5d5..e111b0585e15 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -1249,22 +1249,17 @@ func (w *workerCoordinator) processSingleRangeResults( resumeReqIdx++ } else { // This Get was completed. - if get.Value != nil { - // Create a Result only for non-empty Get responses. - result := Result{ - GetResp: get, - // This currently only works because all requests - // are unique. - EnqueueKeysSatisfied: []int{enqueueKey}, - position: req.positions[i], - } - result.memoryTok.streamer = w.s - result.memoryTok.toRelease = getResponseSize(get) - memoryTokensBytes += result.memoryTok.toRelease - results = append(results, result) + result := Result{ + GetResp: get, + // This currently only works because all requests are + // unique. + EnqueueKeysSatisfied: []int{enqueueKey}, + position: req.positions[i], } - // Note that we count this Get response as complete regardless - // of the fact whether it is empty or not. + result.memoryTok.streamer = w.s + result.memoryTok.toRelease = getResponseSize(get) + memoryTokensBytes += result.memoryTok.toRelease + results = append(results, result) numCompleteGetResponses++ } @@ -1318,9 +1313,11 @@ func (w *workerCoordinator) processSingleRangeResults( } } - w.finalizeSingleRangeResults( - results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses, - ) + if len(results) > 0 { + w.finalizeSingleRangeResults( + results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses, + ) + } // If we have any incomplete requests, add them back into the work // pool. @@ -1333,12 +1330,19 @@ func (w *workerCoordinator) processSingleRangeResults( // singleRangeBatch. By "finalization" we mean setting Complete field of // ScanResp to correct value for all scan responses, updating the estimate of an // average response size, and telling the Streamer about these results. +// +// This method assumes that results has length greater than zero. 
func (w *workerCoordinator) finalizeSingleRangeResults( results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool, numCompleteGetResponses int, ) { + if buildutil.CrdbTestBuild { + if len(results) == 0 { + panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results")) + } + } w.s.mu.Lock() defer w.s.mu.Unlock() @@ -1381,14 +1385,7 @@ func (w *workerCoordinator) finalizeSingleRangeResults( w.s.mu.numCompleteRequests += numCompleteResponses w.s.mu.numUnreleasedResults += len(results) w.s.mu.results = append(w.s.mu.results, results...) - if len(results) > 0 || numCompleteResponses > 0 { - // We want to signal the condition variable when either we have some - // results to return to the client or we received some empty responses. - // The latter is needed so that the client doesn't block forever - // thinking there are more requests in flight when, in fact, all - // responses have already come back empty. - w.s.mu.hasResults.Signal() - } + w.s.mu.hasResults.Signal() } var zeroIntSlice []int diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 3005984aaebc..65ab114468f2 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -91,12 +91,10 @@ func TestStreamerLimitations(t *testing.T) { streamer := getStreamer() defer streamer.Close() streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */) - // Use a Scan request for this test case because Gets of non-existent - // keys aren't added to the results. - scan := roachpb.NewScan(roachpb.Key("key"), roachpb.Key("key1"), false /* forUpdate */) + get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */) reqs := []roachpb.RequestUnion{{ - Value: &roachpb.RequestUnion_Scan{ - Scan: scan.(*roachpb.ScanRequest), + Value: &roachpb.RequestUnion_Get{ + Get: get.(*roachpb.GetRequest), }, }} require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */)) @@ -215,7 +213,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) { var get roachpb.GetRequest var union roachpb.RequestUnion_Get key := make([]byte, keySize+6) - key[0] = 190 + key[0] = 240 key[1] = 137 key[2] = 18 for i := 0; i < keySize; i++ { diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go index 0aeb1cf5fe32..a1b48946a9d7 100644 --- a/pkg/sql/row/kv_batch_streamer.go +++ b/pkg/sql/row/kv_batch_streamer.go @@ -88,19 +88,24 @@ func NewTxnKVStreamer( // GetResponses). func (f *TxnKVStreamer) proceedWithLastResult( ctx context.Context, -) (kvs []roachpb.KeyValue, batchResp []byte, err error) { +) (skip bool, kvs []roachpb.KeyValue, batchResp []byte, err error) { result := f.lastResultState.Result if get := result.GetResp; get != nil { if get.IntentValue != nil { - return nil, nil, errors.AssertionFailedf( + return false, nil, nil, errors.AssertionFailedf( "unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue, ) } + if get.Value == nil { + // Nothing found in this particular response, so we skip it. 
+ f.releaseLastResult(ctx) + return true, nil, nil, nil + } pos := result.EnqueueKeysSatisfied[f.lastResultState.numEmitted] origSpan := f.spans[pos] f.lastResultState.numEmitted++ f.getResponseScratch[0] = roachpb.KeyValue{Key: origSpan.Key, Value: *get.Value} - return f.getResponseScratch[:], nil, nil + return false, f.getResponseScratch[:], nil, nil } scan := result.ScanResp if len(scan.BatchResponses) > 0 { @@ -111,7 +116,7 @@ func (f *TxnKVStreamer) proceedWithLastResult( } // Note that scan.Rows and batchResp might be nil when the ScanResponse is // empty, and the caller (the KVFetcher) will skip over it. - return scan.Rows, batchResp, nil + return false, scan.Rows, batchResp, nil } func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) { @@ -137,7 +142,7 @@ func (f *TxnKVStreamer) nextBatch( if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeysSatisfied) { // Note that we should never get an error here since we're processing // the same result again. - kvs, batchResp, err = f.proceedWithLastResult(ctx) + _, kvs, batchResp, err = f.proceedWithLastResult(ctx) return true, kvs, batchResp, err } @@ -147,7 +152,7 @@ } // Process the next result we have already received from the streamer. - if len(f.results) > 0 { + for len(f.results) > 0 { // Peel off the next result and set it into lastResultState. f.lastResultState.Result = f.results[0] f.lastResultState.numEmitted = 0 @@ -156,7 +161,14 @@ // the next iteration. f.results[0] = kvstreamer.Result{} f.results = f.results[1:] - kvs, batchResp, err = f.proceedWithLastResult(ctx) + var skip bool + skip, kvs, batchResp, err = f.proceedWithLastResult(ctx) + if err != nil { + return false, nil, nil, err + } + if skip { + continue + } return true, kvs, batchResp, err } From d4840b32dfffb1233e551060b12e5295e37c3561 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 17 Feb 2022 13:52:09 -0800 Subject: [PATCH 2/4] colfetcher: fix memory limiting in the ColIndexJoin The `ColIndexJoin` operator carefully measures the footprint of the input rows that it uses to generate lookup spans. It wants to find the largest number of rows whose total footprint does not exceed the target size (4MiB or a quarter of `workmem`, whichever is smaller), but it always includes at least one row in the "input batch". Previously, it could incorrectly include an extra row once the limit was reached because we incorrectly determined whether any rows were already included in the input batch (essentially, we always thought "no"). This commit fixes that problem. Release note: None --- pkg/sql/colfetcher/index_join.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go index 397eca79f0b4..bb0cba71cc61 100644 --- a/pkg/sql/colfetcher/index_join.go +++ b/pkg/sql/colfetcher/index_join.go @@ -177,7 +177,7 @@ func (s *ColIndexJoin) Next() coldata.Batch { // Because index joins discard input rows, we do not have to maintain a // reference to input tuples after span generation. So, we can discard // the input batch reference on each iteration.
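// As a rough illustration of the batching rule described in the commit
// message above (a sketch only, with hypothetical names, not the actual
// ColIndexJoin code): keep admitting input rows while the running footprint
// stays under the limit, and force-include a row only when nothing has been
// accumulated in the input batch yet.
func findEndIndexSketch(rowSizes []int64, startIdx int, limitBytes int64, hasRows bool) int {
	var footprint int64
	endIdx := startIdx
	for endIdx < len(rowSizes) {
		footprint += rowSizes[endIdx]
		if footprint > limitBytes && (hasRows || endIdx > startIdx) {
			// The limit is exceeded and at least one row is already accounted
			// for, so stop before this row. The fixed code passes rowCount > 0
			// for hasRows; the old code effectively always passed false, which
			// let one extra row slip in after the limit was reached.
			break
		}
		endIdx++
	}
	return endIdx
}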
- endIdx := s.findEndIndex(len(spans) > 0) + endIdx := s.findEndIndex(rowCount > 0) rowCount += endIdx - s.startIdx s.spanAssembler.ConsumeBatch(s.batch, s.startIdx, endIdx) s.startIdx = endIdx From 8f7a79b484c337c3b8ddd7b933e6945e616cc486 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 17 Feb 2022 20:06:34 -0800 Subject: [PATCH 3/4] kvstreamer: fix a couple of bugs This commit fixes a rare concurrency bug where the newly enqueued requests could be incorrectly sliced off. This could occur because it is possible for a request to be issued asynchronously, responded to, returned to the client before the deferred function in `issueRequestsForAsyncProcessing` is executed. For the time being the fix is to just append the newly-enqueued requests to the old ones, but the follow-up work on the `InOrder` mode will clean this up properly. Additionally, this commit fixes another deadlock that could occur whenever the last request in flight returns without creating any results and the worker coordinator is waiting for more budget at the same time. Previously, it was possible for the signal on the condition variable to be missed by the worker coordinator, so it could wait forever. There are no explicit regression tests for these bugs because they are exercised by the improved testing in the following commit. Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 44 +++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index e111b0585e15..c014b32d2ff5 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -550,7 +550,14 @@ func (s *Streamer) Enqueue( } // Memory reservation was approved, so the requests are good to go. - s.mu.requestsToServe = requestsToServe + // + // Note that we do not just overwrite s.mu.requestsToServe because it is + // possible that the worker coordinator has not sliced off the issued + // requests yet. In other words, it is possible for a request to be issued, + // responded to, returned to the client before the deferred function in + // issueRequestsForAsyncProcessing is executed. + // TODO(yuzefovich): clean this up. + s.mu.requestsToServe = append(s.mu.requestsToServe, requestsToServe...) s.mu.hasWork.Signal() return nil } @@ -969,7 +976,36 @@ func (w *workerCoordinator) addRequest(req singleRangeBatch) { w.s.mu.hasWork.Signal() } -func (w *workerCoordinator) asyncRequestCleanup() { +// budgetMuAlreadyLocked must be true if the caller is currently holding the +// budget's mutex. +func (w *workerCoordinator) asyncRequestCleanup(budgetMuAlreadyLocked bool) { + if !budgetMuAlreadyLocked { + // Since we're decrementing the number of requests in flight, we want to + // make sure that the budget's mutex is locked, and it currently isn't. + // This is needed so that if we signal the budget in + // adjustNumRequestsInFlight, the worker coordinator doesn't miss the + // signal. + // + // If we don't do this, then it is possible for the worker coordinator + // to be blocked forever in waitUntilEnoughBudget. Namely, the following + // sequence of events is possible: + // 1. the worker coordinator checks that there are some requests in + // progress, then it goes to sleep before waiting on waitForBudget + // condition variable; + // 2. 
the last request in flight exits without creating any Results (so + // that no Release() calls will happen in the future), it decrements + // the number of requests in flight, signals waitForBudget condition + // variable, but nobody is waiting on that, so no goroutine is woken + // up; + // 3. the worker coordinator wakes up and starts waiting on the + // condition variable, forever. + // Acquiring the budget's mutex makes sure that such a sequence doesn't + // occur. + w.s.budget.mu.Lock() + defer w.s.budget.mu.Unlock() + } else { + w.s.budget.mu.AssertHeld() + } w.s.adjustNumRequestsInFlight(-1 /* delta */) w.s.waitGroup.Done() } @@ -1007,7 +1043,7 @@ func (w *workerCoordinator) performRequestAsync( WaitForSem: true, }, func(ctx context.Context) { - defer w.asyncRequestCleanup() + defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */) var ba roachpb.BatchRequest ba.Header.WaitPolicy = w.lockWaitPolicy ba.Header.TargetBytes = targetBytes @@ -1123,7 +1159,7 @@ func (w *workerCoordinator) performRequestAsync( }); err != nil { // The new goroutine for the request wasn't spun up, so we have to // perform the cleanup of this request ourselves. - w.asyncRequestCleanup() + w.asyncRequestCleanup(true /* budgetMuAlreadyLocked */) w.s.setError(err) } } From beec1aec80c2afa7d1d845a43b08f2da0491e0fe Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Wed, 16 Feb 2022 20:06:34 -0800 Subject: [PATCH 4/4] kvstreamer: improve behavior when responses vary in size This commit improves the behavior of the `Streamer` when the responses vary significantly in size. Previously, we could get into a situation where the `Streamer` issues many requests that come back with empty responses because the `TargetBytes` limit is too small. This can happen when some small responses come back first, making our estimate of the average response size too small, and we would later use that estimate to set the `TargetBytes` limit on new requests. If those new requests have large responses, we would keep getting empty responses until a request becomes the "head-of-the-line" (we could issue hundreds of such requests that each receive an empty response). This behavior is now fixed by utilizing the `ResumeNextBytes` hint from the KV layer about the size of the next KV pair. The KV layer provides this hint on a best-effort basis whenever it cannot fulfill the request fully. If a particular request keeps getting an empty response, we ensure that it is re-issued with a larger `TargetBytes` limit each time. This commit additionally adds some `Printf` statements hidden behind a constant `debug=false` flag. I want to keep this in the code for the time being since it's been very helpful during debugging. Since it's a compile-time constant set to `false`, it should have no performance impact. In order to verify that the `Streamer` does, in fact, make a reasonable number of KV requests, an existing test has been refactored and extended. In particular, it now looks at the trace of the query and counts how many async requests are issued by the `Streamer`.
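As a rough sketch of the limit-growth policy described above (illustrative only, with hypothetical names, not the actual Streamer code):

    // nextMinTargetBytes returns the minimum TargetBytes limit to use when
    // re-issuing a request whose previous attempt came back empty.
    // resumeNextBytes is the best-effort hint from the KV layer (0 if the hint
    // was not provided) and prevMin is the minimum limit that the previous
    // attempt was issued with (0 on the first attempt).
    func nextMinTargetBytes(prevMin, resumeNextBytes int64) int64 {
        if resumeNextBytes > prevMin {
            // Trust the KV layer's hint about the size of the next KV pair.
            return resumeNextBytes
        }
        if prevMin > 0 {
            // The hint is missing or hasn't grown, so double the previous
            // limit to guarantee forward progress.
            return 2 * prevMin
        }
        // First attempt: no hint yet, so the caller falls back to the average
        // response size estimate.
        return 0
    }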
Release note: None --- pkg/kv/kvclient/kvstreamer/BUILD.bazel | 4 +- pkg/kv/kvclient/kvstreamer/large_keys_test.go | 189 ++++++++++++++++++ pkg/kv/kvclient/kvstreamer/streamer.go | 157 +++++++++++++-- pkg/kv/kvclient/kvstreamer/streamer_test.go | 73 ------- 4 files changed, 327 insertions(+), 96 deletions(-) create mode 100644 pkg/kv/kvclient/kvstreamer/large_keys_test.go diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel index 7f73a4818703..4852f6938efc 100644 --- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel +++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel @@ -31,6 +31,7 @@ go_test( name = "kvstreamer_test", srcs = [ "avg_response_estimator_test.go", + "large_keys_test.go", "main_test.go", "streamer_test.go", ], @@ -45,12 +46,13 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", - "//pkg/testutils", + "//pkg/sql", "//pkg/testutils/serverutils", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_dustin_go_humanize//:go-humanize", "@com_github_stretchr_testify//require", diff --git a/pkg/kv/kvclient/kvstreamer/large_keys_test.go b/pkg/kv/kvclient/kvstreamer/large_keys_test.go new file mode 100644 index 000000000000..5ce034202de5 --- /dev/null +++ b/pkg/kv/kvclient/kvstreamer/large_keys_test.go @@ -0,0 +1,189 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstreamer_test + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/dustin/go-humanize" + "github.com/stretchr/testify/require" +) + +// TestLargeKeys verifies that the Streamer successfully completes the queries +// when the keys to lookup (i.e. the enqueued requests themselves) as well as +// the looked up rows are large. It additionally ensures that the Streamer +// issues a reasonable number of the KV request. +func TestLargeKeys(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + name string + query string + }{ + { + name: "no ordering", + query: "SELECT * FROM foo@foo_attribute_idx WHERE attribute=1", + }, + } + + rng, _ := randutil.NewTestRand() + recCh := make(chan tracing.Recording, 1) + // We want to capture the trace of the query so that we can count how many + // KV requests the Streamer issued. 
+ s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + WithStatementTrace: func(trace tracing.Recording, stmt string) { + for _, tc := range testCases { + if tc.query == stmt { + recCh <- trace + } + } + }, + }, + }, + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Lower the distsql_workmem limit so that we can operate with smaller + // blobs. Note that the joinReader in the row-by-row engine will override + // the limit if it is lower than 100KiB, so we cannot go lower than that + // here. + _, err := db.Exec("SET distsql_workmem='100KiB'") + require.NoError(t, err) + // In both engines, the index joiner will buffer input rows up to a quarter + // of the workmem limit, so we have several interesting options for the blob + // size: + // - around 5000 is interesting because multiple requests are enqueued at + // the same time and there might be parallelism going on within the Streamer. + // - 20000 is interesting because it doesn't exceed the buffer size, yet two + // rows with such blobs do exceed it. The index joiners are expected to + // process each row on its own. + // - 40000 is interesting because a single row already exceeds the buffer + // size. + for _, pkBlobSize := range []int{3000 + rng.Intn(4000), 20000, 40000} { + // useScans indicates whether we want Scan requests to be used by the + // Streamer (if we do, then we need to have multiple column families). + for _, useScans := range []bool{false, true} { + // onlyLarge determines whether only large blobs are inserted or a + // mix of large and small blobs. + for _, onlyLarge := range []bool{false, true} { + _, err = db.Exec("DROP TABLE IF EXISTS foo") + require.NoError(t, err) + // We set up a table that contains two large columns, one + // of them being the primary key. The idea is that the test + // query will first read from the secondary index which would + // include only the PK blob, and that will be used to construct + // index join lookups (i.e. the PK blobs will be the enqueued + // requests for the Streamer) whereas the other blob will be + // part of the response. + var familiesSuffix string + // In order to use Scan requests we need to have multiple column + // families. + if useScans { + familiesSuffix = ", FAMILY (pk_blob, attribute, extra), FAMILY (blob)" + } + _, err = db.Exec(fmt.Sprintf( + `CREATE TABLE foo ( + pk_blob STRING PRIMARY KEY, attribute INT8, extra INT8, blob STRING, + INDEX (attribute)%s + );`, familiesSuffix)) + require.NoError(t, err) + + // Insert some number of rows. + numRows := rng.Intn(10) + 10 + for i := 0; i < numRows; i++ { + letter := string(byte('a') + byte(i)) + valueSize := pkBlobSize + if !onlyLarge && rng.Float64() < 0.5 { + // If we're using a mix of large and small values, with + // 50% probability use a small value now. + valueSize = rng.Intn(10) + 1 + } + // We randomize the value size for the 'blob' column to improve + // the test coverage. + blobSize := int(float64(valueSize)*2.0*rng.Float64()) + 1 + _, err = db.Exec("INSERT INTO foo SELECT repeat($1, $2), 1, 1, repeat($1, $3);", letter, valueSize, blobSize) + require.NoError(t, err) + } + + // Try two scenarios: one with a single range (so no parallelism + // within the Streamer) and another with a random number of + // ranges (which might add parallelism within the Streamer).
+ // + // Note that a single range scenario needs to be exercised first + // in order to reuse the same table without dropping it (we + // don't want to deal with merging ranges). + for _, newRangeProbability := range []float64{0, rng.Float64()} { + for i := 1; i < numRows; i++ { + if rng.Float64() < newRangeProbability { + // Create a new range. + letter := string(byte('a') + byte(i)) + _, err = db.Exec("ALTER TABLE foo SPLIT AT VALUES ($1);", letter) + require.NoError(t, err) + } + } + // Populate the range cache. + _, err = db.Exec("SELECT count(*) FROM foo") + require.NoError(t, err) + + for _, vectorizeMode := range []string{"on", "off"} { + _, err = db.Exec("SET vectorize = " + vectorizeMode) + require.NoError(t, err) + for _, tc := range testCases { + t.Run(fmt.Sprintf( + "%s/size=%s/scans=%t/onlyLarge=%t/numRows=%d/newRangeProb=%.2f/vec=%s", + tc.name, humanize.Bytes(uint64(pkBlobSize)), useScans, + onlyLarge, numRows, newRangeProbability, vectorizeMode, + ), + func(t *testing.T) { + _, err = db.Exec(tc.query) + require.NoError(t, err) + // Now examine the trace and count the async + // requests issued by the Streamer. + tr := <-recCh + var numStreamerRequests int + for _, rs := range tr { + if rs.Operation == kvstreamer.AsyncRequestOp { + numStreamerRequests++ + } + } + // Assert that the number of requests is + // reasonable using the number of rows as + // the proxy for how many requests need to + // be issued. We expect some requests to + // come back empty because of a low initial + // TargetBytes limit, some requests might + // get an empty result multiple times while + // we're figuring out the correct limit, so + // we use a 4x multiple on the number of + // rows. + require.Greater(t, 4*numRows, numStreamerRequests) + }) + } + } + } + } + } + } +} diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index c014b32d2ff5..8990f7d5aa82 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -12,6 +12,7 @@ package kvstreamer import ( "context" + "fmt" "runtime" "sort" "sync" @@ -33,6 +34,9 @@ import ( "github.com/cockroachdb/errors" ) +// TODO(yuzefovich): remove this once the Streamer is stabilized. +const debug = false + // OperationMode describes the mode of operation of the Streamer. type OperationMode int @@ -550,7 +554,9 @@ func (s *Streamer) Enqueue( } // Memory reservation was approved, so the requests are good to go. - // + if debug { + fmt.Printf("enqueuing %s to serve\n", reqsToString(requestsToServe)) + } // Note that we do not just overwrite s.mu.requestsToServe because it is // possible that the worker coordinator has not sliced off the issued // requests yet. In other words, it is possible for a request to be issued, @@ -597,8 +603,22 @@ func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) { s.mu.results = nil allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests if len(results) > 0 || allComplete || s.mu.err != nil { + if debug { + if len(results) > 0 { + fmt.Printf("returning %s to the client\n", resultsToString(results)) + } else { + suffix := "all requests have been responded to" + if !allComplete { + suffix = fmt.Sprintf("%v", s.mu.err) + } + fmt.Printf("returning no results to the client because %s\n", suffix) + } + } return results, s.mu.err } + if debug { + fmt.Println("client blocking to wait for results") + } s.mu.hasResults.Wait() // Check whether the Streamer has been canceled or closed while we were // waiting for the results. 
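The blocking in GetResults above, and the waitForBudget waiting discussed in the previous commit, both follow the standard condition-variable discipline: the waiter re-checks its predicate under the mutex every time it wakes up, and the producer mutates state and signals while holding that same mutex. A minimal, self-contained sketch of that pattern (illustrative only, not the Streamer's actual types or API):

    package main

    import (
        "fmt"
        "sync"
    )

    // resultsQueue is a toy stand-in for the Streamer's results buffer.
    type resultsQueue struct {
        mu         sync.Mutex
        hasResults *sync.Cond
        results    []int
        done       bool
    }

    func newResultsQueue() *resultsQueue {
        q := &resultsQueue{}
        q.hasResults = sync.NewCond(&q.mu)
        return q
    }

    // add appends a result and signals while holding the mutex, so the signal
    // cannot fall between the consumer's predicate check and its Wait call.
    func (q *resultsQueue) add(r int) {
        q.mu.Lock()
        defer q.mu.Unlock()
        q.results = append(q.results, r)
        q.hasResults.Signal()
    }

    // finish marks the queue as done and wakes up any waiter so that it can
    // observe done and return.
    func (q *resultsQueue) finish() {
        q.mu.Lock()
        defer q.mu.Unlock()
        q.done = true
        q.hasResults.Signal()
    }

    // get blocks until there is at least one result or the queue is done.
    func (q *resultsQueue) get() []int {
        q.mu.Lock()
        defer q.mu.Unlock()
        for len(q.results) == 0 && !q.done {
            q.hasResults.Wait()
        }
        res := q.results
        q.results = nil
        return res
    }

    func main() {
        q := newResultsQueue()
        go q.add(42)
        fmt.Println(q.get())
    }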
@@ -703,6 +723,11 @@ type singleRangeBatch struct { // reqsReservedBytes tracks the memory reservation against the budget for // the memory usage of reqs. reqsReservedBytes int64 + // minTargetBytes, if positive, indicates the minimum TargetBytes limit that + // this singleRangeBatch should be sent with in order for the response to + // not be empty. Note that TargetBytes of at least minTargetBytes is + // necessary but might not be sufficient for the response to be non-empty. + minTargetBytes int64 } var _ sort.Interface = &singleRangeBatch{} @@ -723,6 +748,28 @@ func (r *singleRangeBatch) Less(i, j int) bool { return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0 } +func reqsToString(reqs []singleRangeBatch) string { + result := "requests for positions " + for i, r := range reqs { + if i > 0 { + result += ", " + } + result += fmt.Sprintf("%v", r.positions) + } + return result +} + +func resultsToString(results []Result) string { + result := "results for positions " + for i, r := range results { + if i > 0 { + result += ", " + } + result += fmt.Sprintf("%d", r.position) + } + return result +} + type workerCoordinator struct { s *Streamer txn *kv.Txn @@ -758,19 +805,15 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) { } } - // At the moment, we're using a simple average which is suboptimal - // because it is not reactive enough to larger responses coming back - // later. As a result, we can have a degenerate behavior where many - // requests end up coming back empty. Furthermore, at the moment we're - // not incorporating the response size information from those empty - // requests. - // - // In order to work around this we'll just use a larger estimate for - // each response for now. - // TODO(yuzefovich): improve this. - avgResponseSize = 2 * avgResponseSize - - shouldExit = w.waitUntilEnoughBudget(ctx, avgResponseSize) + // If we already have minTargetBytes set on the first request to be + // issued, then use that; otherwise, use the estimate. + atLeastBytes := requestsToServe[0].minTargetBytes + if atLeastBytes == 0 { + // TODO(yuzefovich): consider using a multiple of avgResponseSize + // here. + atLeastBytes = avgResponseSize + } + shouldExit = w.waitUntilEnoughBudget(ctx, atLeastBytes) if shouldExit { return } @@ -832,17 +875,16 @@ func (w *workerCoordinator) getRequestsLocked() ( return requestsToServe, avgResponseSize, shouldExit } -// waitUntilEnoughBudget waits until there is enough budget to at least receive -// one full response. +// waitUntilEnoughBudget waits until atLeastBytes bytes is available in the +// budget. // // A boolean that indicates whether the coordinator should exit is returned. func (w *workerCoordinator) waitUntilEnoughBudget( - ctx context.Context, avgResponseSize int64, + ctx context.Context, atLeastBytes int64, ) (shouldExit bool) { w.s.budget.mu.Lock() defer w.s.budget.mu.Unlock() - // TODO(yuzefovich): consider using a multiple of avgResponseSize here. - for w.s.budget.limitBytes-w.s.budget.mu.acc.Used() < avgResponseSize { + for w.s.budget.limitBytes-w.s.budget.mu.acc.Used() < atLeastBytes { // There isn't enough budget at the moment. Check whether there are any // requests in progress. if w.s.getNumRequestsInProgress() == 0 { @@ -850,6 +892,12 @@ func (w *workerCoordinator) waitUntilEnoughBudget( // the budget. 
return false } + if debug { + fmt.Printf( + "waiting for budget to free up: atLeastBytes %d, available %d\n", + atLeastBytes, w.s.budget.limitBytes-w.s.budget.mu.acc.Used(), + ) + } // We have to wait for some budget.release() calls. w.s.budget.mu.waitForBudget.Wait() // Check if the Streamer has been canceled or closed while we were @@ -887,11 +935,29 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( w.s.budget.mu.Lock() defer w.s.budget.mu.Unlock() + if debug { + fmt.Printf( + "picked up %s to serve, available budget %d, "+ + "num requests in progress %d, average response size %d\n", + reqsToString(requestsToServe), w.s.budget.limitBytes-w.s.budget.mu.acc.Used(), + w.s.getNumRequestsInProgress(), avgResponseSize, + ) + } + headOfLine := w.s.getNumRequestsInProgress() == 0 var budgetIsExhausted bool for numRequestsIssued < len(requestsToServe) && !budgetIsExhausted { + singleRangeReqs := requestsToServe[numRequestsIssued] availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used() - if availableBudget < avgResponseSize { + // minAcceptableBudget is the minimum TargetBytes limit with which it + // makes sense to issue this request (if we issue the request with + // smaller limit, then it's very likely to come back with an empty + // response). + minAcceptableBudget := singleRangeReqs.minTargetBytes + if minAcceptableBudget == 0 { + minAcceptableBudget = avgResponseSize + } + if availableBudget < minAcceptableBudget { if !headOfLine { // We don't have enough budget available to serve this request, // and there are other requests in progress, so we'll wait for @@ -908,7 +974,6 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( availableBudget = 1 } } - singleRangeReqs := requestsToServe[numRequestsIssued] // Calculate what TargetBytes limit to use for the BatchRequest that // will be issued based on singleRangeReqs. We use the estimate to guess // how much memory the response will need, and we reserve this @@ -919,6 +984,12 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( // get a very large single row in response which will exceed this // limit). targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize + // Make sure that targetBytes is sufficient to receive non-empty + // response. Our estimate might be an under-estimate when responses vary + // significantly in size. + if targetBytes < singleRangeReqs.minTargetBytes { + targetBytes = singleRangeReqs.minTargetBytes + } if targetBytes > availableBudget { // The estimate tells us that we don't have enough budget to receive // the full response; however, in order to utilize the available @@ -961,6 +1032,12 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( // any more responses at the moment. 
return err } + if debug { + fmt.Printf( + "issuing an async request for positions %v, targetBytes=%d, headOfLine=%t\n", + singleRangeReqs.positions, targetBytes, headOfLine, + ) + } w.performRequestAsync(ctx, singleRangeReqs, targetBytes, headOfLine) numRequestsIssued++ headOfLine = false @@ -972,6 +1049,9 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( func (w *workerCoordinator) addRequest(req singleRangeBatch) { w.s.mu.Lock() defer w.s.mu.Unlock() + if debug { + fmt.Printf("adding a request for positions %v to be served, minTargetBytes=%d\n", req.positions, req.minTargetBytes) + } w.s.mu.requestsToServe = append(w.s.mu.requestsToServe, req) w.s.mu.hasWork.Signal() } @@ -1010,6 +1090,10 @@ func (w *workerCoordinator) asyncRequestCleanup(budgetMuAlreadyLocked bool) { w.s.waitGroup.Done() } +// AsyncRequestOp is the operation name (in tracing) of all requests issued by +// the Streamer asynchronously. +const AsyncRequestOp = "streamer-lookup-async" + // performRequestAsync dispatches the given single-range batch for evaluation // asynchronously. If the batch cannot be evaluated fully (due to exhausting its // memory limitBytes), the "resume" single-range batch will be added into @@ -1037,7 +1121,7 @@ func (w *workerCoordinator) performRequestAsync( if err := w.s.stopper.RunAsyncTaskEx( ctx, stop.TaskOpts{ - TaskName: "streamer-lookup-async", + TaskName: AsyncRequestOp, SpanOpt: stop.ChildSpan, Sem: w.asyncSem, WaitForSem: true, @@ -1282,6 +1366,9 @@ func (w *workerCoordinator) processSingleRangeResults( newGet.union.Get = &newGet.req resumeReq.reqs[resumeReqIdx].Value = &newGet.union resumeReq.positions[resumeReqIdx] = req.positions[i] + if resumeReq.minTargetBytes == 0 { + resumeReq.minTargetBytes = get.ResumeNextBytes + } resumeReqIdx++ } else { // This Get was completed. @@ -1335,6 +1422,9 @@ func (w *workerCoordinator) processSingleRangeResults( newScan.union.Scan = &newScan.req resumeReq.reqs[resumeReqIdx].Value = &newScan.union resumeReq.positions[resumeReqIdx] = req.positions[i] + if resumeReq.minTargetBytes == 0 { + resumeReq.minTargetBytes = scan.ResumeNextBytes + } resumeReqIdx++ } } @@ -1353,6 +1443,26 @@ func (w *workerCoordinator) processSingleRangeResults( w.finalizeSingleRangeResults( results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses, ) + } else { + // We received an empty response. + if req.minTargetBytes != 0 { + // We previously have already received an empty response for this + // request, and minTargetBytes wasn't sufficient. Make sure that + // minTargetBytes on the resume request has increased. + if resumeReq.minTargetBytes <= req.minTargetBytes { + // Since ResumeNextBytes is populated on a best-effort basis, we + // cannot rely on it to make progress, so we make sure that if + // minTargetBytes hasn't increased for the resume request, we + // use the double of the original target. 
+ resumeReq.minTargetBytes = 2 * req.minTargetBytes + } + } + if debug { + fmt.Printf( + "request for positions %v came back empty, original minTargetBytes=%d, "+ + "resumeReq.minTargetBytes=%d\n", req.positions, req.minTargetBytes, resumeReq.minTargetBytes, + ) + } } // If we have any incomplete requests, add them back into the work @@ -1420,6 +1530,9 @@ func (w *workerCoordinator) finalizeSingleRangeResults( w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) w.s.mu.numCompleteRequests += numCompleteResponses w.s.mu.numUnreleasedResults += len(results) + if debug { + fmt.Printf("created %s with total size %d\n", resultsToString(results), actualMemoryReservation) + } w.s.mu.results = append(w.s.mu.results, results...) w.s.mu.hasResults.Signal() } diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 65ab114468f2..07ae67a17509 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -23,12 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" @@ -118,77 +116,6 @@ func TestStreamerLimitations(t *testing.T) { }) } -// TestLargeKeys verifies that the Streamer successfully completes the queries -// when the keys to lookup are large (i.e. the enqueued requests themselves have -// large memory footprint). -func TestLargeKeys(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - rng, _ := randutil.NewTestRand() - s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) - ctx := context.Background() - defer s.Stopper().Stop(ctx) - - // Lower the distsql_workmem limit so that we can operate with smaller - // blobs. Note that the joinReader in the row-by-row engine will override - // the limit if it is lower than 100KiB, so we cannot go lower than that - // here. - _, err := db.Exec("SET distsql_workmem='100KiB'") - require.NoError(t, err) - // In both engines, the index joiner will buffer input rows up to a quarter - // of workmem limit, so we have a couple of interesting options for the blob - // size: - // - 20000 is interesting because it doesn't exceed the buffer size, yet two - // rows with such blobs do exceed it. The index joiners are expected to to - // process each row on its own. - // - 40000 is interesting because a single row already exceeds the buffer - // size. - for _, blobSize := range []int{20000, 40000} { - // onlyLarge determines whether only large blobs are inserted or a mix - // of large and small blobs. - for _, onlyLarge := range []bool{false, true} { - _, err = db.Exec("DROP TABLE IF EXISTS foo") - require.NoError(t, err) - // We set up such a table that contains two large columns, one of them - // being the primary key. The idea is that the query below will first - // read from the secondary index which would include only the PK blob, - // and that will be used to construct index join lookups (i.e. 
the PK - // blobs will be the enqueued requests for the Streamer) whereas the - // other blob will be part of the response. - _, err = db.Exec("CREATE TABLE foo (pk_blob STRING PRIMARY KEY, attribute INT, blob TEXT, INDEX(attribute))") - require.NoError(t, err) - - // Insert a handful of rows. - numRows := rng.Intn(3) + 3 - for i := 0; i < numRows; i++ { - letter := string(byte('a') + byte(i)) - valueSize := blobSize - if !onlyLarge && rng.Float64() < 0.5 { - // If we're using a mix of large and small values, with 50% - // use a small value now. - valueSize = rng.Intn(10) + 1 - } - _, err = db.Exec("INSERT INTO foo SELECT repeat($1, $2), 1, repeat($1, $2)", letter, valueSize) - require.NoError(t, err) - } - - // Perform an index join so that the Streamer API is used. - query := "SELECT * FROM foo@foo_attribute_idx WHERE attribute=1" - testutils.RunTrueAndFalse(t, "vectorize", func(t *testing.T, vectorize bool) { - vectorizeMode := "off" - if vectorize { - vectorizeMode = "on" - } - _, err = db.Exec("SET vectorize = " + vectorizeMode) - require.NoError(t, err) - _, err = db.Exec(query) - require.NoError(t, err) - }) - } - } -} - // TestStreamerBudgetErrorInEnqueue verifies the behavior of the Streamer in // Enqueue when its limit and/or root pool limit are exceeded. Additional tests // around the memory limit errors (when the responses exceed the limit) can be