diff --git a/pkg/kv/kvclient/kvstreamer/BUILD.bazel b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
index 7f73a4818703..4852f6938efc 100644
--- a/pkg/kv/kvclient/kvstreamer/BUILD.bazel
+++ b/pkg/kv/kvclient/kvstreamer/BUILD.bazel
@@ -31,6 +31,7 @@ go_test(
     name = "kvstreamer_test",
     srcs = [
         "avg_response_estimator_test.go",
+        "large_keys_test.go",
         "main_test.go",
         "streamer_test.go",
     ],
@@ -45,12 +46,13 @@ go_test(
         "//pkg/security/securitytest",
         "//pkg/server",
         "//pkg/settings/cluster",
-        "//pkg/testutils",
+        "//pkg/sql",
         "//pkg/testutils/serverutils",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/mon",
         "//pkg/util/randutil",
+        "//pkg/util/tracing",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_dustin_go_humanize//:go-humanize",
         "@com_github_stretchr_testify//require",
diff --git a/pkg/kv/kvclient/kvstreamer/large_keys_test.go b/pkg/kv/kvclient/kvstreamer/large_keys_test.go
new file mode 100644
index 000000000000..5ce034202de5
--- /dev/null
+++ b/pkg/kv/kvclient/kvstreamer/large_keys_test.go
@@ -0,0 +1,189 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvstreamer_test
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvstreamer"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/dustin/go-humanize"
+	"github.com/stretchr/testify/require"
+)
+
+// TestLargeKeys verifies that the Streamer successfully completes the queries
+// when the keys to look up (i.e. the enqueued requests themselves) as well as
+// the looked-up rows are large. It additionally ensures that the Streamer
+// issues a reasonable number of KV requests.
+func TestLargeKeys(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testCases := []struct {
+		name  string
+		query string
+	}{
+		{
+			name:  "no ordering",
+			query: "SELECT * FROM foo@foo_attribute_idx WHERE attribute=1",
+		},
+	}
+
+	rng, _ := randutil.NewTestRand()
+	recCh := make(chan tracing.Recording, 1)
+	// We want to capture the trace of the query so that we can count how many
+	// KV requests the Streamer issued.
+	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			SQLExecutor: &sql.ExecutorTestingKnobs{
+				WithStatementTrace: func(trace tracing.Recording, stmt string) {
+					for _, tc := range testCases {
+						if tc.query == stmt {
+							recCh <- trace
+						}
+					}
+				},
+			},
+		},
+	})
+	ctx := context.Background()
+	defer s.Stopper().Stop(ctx)
+
+	// Lower the distsql_workmem limit so that we can operate with smaller
+	// blobs. Note that the joinReader in the row-by-row engine will override
+	// the limit if it is lower than 100KiB, so we cannot go lower than that
+	// here.
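+	// With the limit set to 100KiB, the index joiners will buffer input rows
+	// up to a quarter of it (25KiB), which informs the choice of blob sizes
+	// below.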
+	_, err := db.Exec("SET distsql_workmem='100KiB'")
+	require.NoError(t, err)
+	// In both engines, the index joiner will buffer input rows up to a
+	// quarter of the workmem limit, so we have several interesting options
+	// for the blob size:
+	// - around 5000 is interesting because multiple requests are enqueued at
+	// the same time and there might be parallelism going on within the
+	// Streamer.
+	// - 20000 is interesting because it doesn't exceed the buffer size, yet
+	// two rows with such blobs do exceed it. The index joiners are expected
+	// to process each row on its own.
+	// - 40000 is interesting because a single row already exceeds the buffer
+	// size.
+	for _, pkBlobSize := range []int{3000 + rng.Intn(4000), 20000, 40000} {
+		// useScans indicates whether we want Scan requests to be used by the
+		// Streamer (if we do, then we need to have multiple column families).
+		for _, useScans := range []bool{false, true} {
+			// onlyLarge determines whether only large blobs are inserted or a
+			// mix of large and small blobs.
+			for _, onlyLarge := range []bool{false, true} {
+				_, err = db.Exec("DROP TABLE IF EXISTS foo")
+				require.NoError(t, err)
+				// We set up such a table that contains two large columns, one
+				// of them being the primary key. The idea is that the test
+				// query will first read from the secondary index which would
+				// include only the PK blob, and that will be used to construct
+				// index join lookups (i.e. the PK blobs will be the enqueued
+				// requests for the Streamer) whereas the other blob will be
+				// part of the response.
+				var familiesSuffix string
+				// In order to use Scan requests we need to have multiple
+				// column families.
+				if useScans {
+					familiesSuffix = ", FAMILY (pk_blob, attribute, extra), FAMILY (blob)"
+				}
+				_, err = db.Exec(fmt.Sprintf(
+					`CREATE TABLE foo (
+						pk_blob STRING PRIMARY KEY, attribute INT8, extra INT8, blob STRING,
+						INDEX (attribute)%s
+					);`, familiesSuffix))
+				require.NoError(t, err)
+
+				// Insert some number of rows.
+				numRows := rng.Intn(10) + 10
+				for i := 0; i < numRows; i++ {
+					letter := string(byte('a') + byte(i))
+					valueSize := pkBlobSize
+					if !onlyLarge && rng.Float64() < 0.5 {
+						// If we're using a mix of large and small values, with
+						// 50% probability use a small value now.
+						valueSize = rng.Intn(10) + 1
+					}
+					// We randomize the value size for the 'blob' column to
+					// improve the test coverage.
+					blobSize := int(float64(valueSize)*2.0*rng.Float64()) + 1
+					_, err = db.Exec("INSERT INTO foo SELECT repeat($1, $2), 1, 1, repeat($1, $3);", letter, valueSize, blobSize)
+					require.NoError(t, err)
+				}
+
+				// Try two scenarios: one with a single range (so no parallelism
+				// within the Streamer) and another with a random number of
+				// ranges (which might add parallelism within the Streamer).
+				//
+				// Note that a single range scenario needs to be exercised first
+				// in order to reuse the same table without dropping it (we
+				// don't want to deal with merging ranges).
+				for _, newRangeProbability := range []float64{0, rng.Float64()} {
+					for i := 1; i < numRows; i++ {
+						if rng.Float64() < newRangeProbability {
+							// Create a new range.
+							letter := string(byte('a') + byte(i))
+							_, err = db.Exec("ALTER TABLE foo SPLIT AT VALUES ($1);", letter)
+							require.NoError(t, err)
+						}
+					}
+					// Populate the range cache.
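+					// (The full scan visits every range, so the queries below
+					// won't be disrupted by range cache misses after the
+					// splits above.)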
+					_, err = db.Exec("SELECT count(*) FROM foo")
+					require.NoError(t, err)
+
+					for _, vectorizeMode := range []string{"on", "off"} {
+						_, err = db.Exec("SET vectorize = " + vectorizeMode)
+						require.NoError(t, err)
+						for _, tc := range testCases {
+							t.Run(fmt.Sprintf(
+								"%s/size=%s/scans=%t/onlyLarge=%t/numRows=%d/newRangeProb=%.2f/vec=%s",
+								tc.name, humanize.Bytes(uint64(pkBlobSize)), useScans,
+								onlyLarge, numRows, newRangeProbability, vectorizeMode,
+							),
+								func(t *testing.T) {
+									_, err = db.Exec(tc.query)
+									require.NoError(t, err)
+									// Now examine the trace and count the async
+									// requests issued by the Streamer.
+									tr := <-recCh
+									var numStreamerRequests int
+									for _, rs := range tr {
+										if rs.Operation == kvstreamer.AsyncRequestOp {
+											numStreamerRequests++
+										}
+									}
+									// Assert that the number of requests is
+									// reasonable using the number of rows as
+									// the proxy for how many requests need to
+									// be issued. We expect some requests to
+									// come back empty because of a low initial
+									// TargetBytes limit, and some requests
+									// might get an empty result multiple times
+									// while we're figuring out the correct
+									// limit, so we use a 4x multiple on the
+									// number of rows.
+									require.Greater(t, 4*numRows, numStreamerRequests)
+								})
+						}
+					}
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 5be0801cc5d5..8990f7d5aa82 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -12,6 +12,7 @@ package kvstreamer

 import (
 	"context"
+	"fmt"
 	"runtime"
 	"sort"
 	"sync"
@@ -33,6 +34,9 @@ import (
 	"github.com/cockroachdb/errors"
 )

+// TODO(yuzefovich): remove this once the Streamer is stabilized.
+const debug = false
+
 // OperationMode describes the mode of operation of the Streamer.
 type OperationMode int

@@ -550,7 +554,16 @@ func (s *Streamer) Enqueue(
 	}

 	// Memory reservation was approved, so the requests are good to go.
-	s.mu.requestsToServe = requestsToServe
+	if debug {
+		fmt.Printf("enqueuing %s to serve\n", reqsToString(requestsToServe))
+	}
+	// Note that we do not just overwrite s.mu.requestsToServe because it is
+	// possible that the worker coordinator has not sliced off the issued
+	// requests yet. In other words, it is possible for a request to be
+	// issued, responded to, and returned to the client before the deferred
+	// function in issueRequestsForAsyncProcessing is executed.
+	// TODO(yuzefovich): clean this up.
+	s.mu.requestsToServe = append(s.mu.requestsToServe, requestsToServe...)
 	s.mu.hasWork.Signal()
 	return nil
 }
@@ -590,8 +603,22 @@ func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
 		s.mu.results = nil
 		allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests
 		if len(results) > 0 || allComplete || s.mu.err != nil {
+			if debug {
+				if len(results) > 0 {
+					fmt.Printf("returning %s to the client\n", resultsToString(results))
+				} else {
+					suffix := "all requests have been responded to"
+					if !allComplete {
+						suffix = fmt.Sprintf("%v", s.mu.err)
+					}
+					fmt.Printf("returning no results to the client because %s\n", suffix)
+				}
+			}
 			return results, s.mu.err
 		}
+		if debug {
+			fmt.Println("client blocking to wait for results")
+		}
 		s.mu.hasResults.Wait()
 		// Check whether the Streamer has been canceled or closed while we were
 		// waiting for the results.
@@ -696,6 +723,11 @@ type singleRangeBatch struct {
 	// reqsReservedBytes tracks the memory reservation against the budget for
 	// the memory usage of reqs.
 	reqsReservedBytes int64
+	// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
+	// this singleRangeBatch should be sent with in order for the response to
+	// not be empty. Note that TargetBytes of at least minTargetBytes is
+	// necessary but might not be sufficient for the response to be non-empty.
+	minTargetBytes int64
 }

 var _ sort.Interface = &singleRangeBatch{}
@@ -716,6 +748,28 @@ func (r *singleRangeBatch) Less(i, j int) bool {
 	return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0
 }

+func reqsToString(reqs []singleRangeBatch) string {
+	result := "requests for positions "
+	for i, r := range reqs {
+		if i > 0 {
+			result += ", "
+		}
+		result += fmt.Sprintf("%v", r.positions)
+	}
+	return result
+}
+
+func resultsToString(results []Result) string {
+	result := "results for positions "
+	for i, r := range results {
+		if i > 0 {
+			result += ", "
+		}
+		result += fmt.Sprintf("%d", r.position)
+	}
+	return result
+}
+
 type workerCoordinator struct {
 	s   *Streamer
 	txn *kv.Txn
@@ -751,19 +805,15 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
 			}
 		}

-		// At the moment, we're using a simple average which is suboptimal
-		// because it is not reactive enough to larger responses coming back
-		// later. As a result, we can have a degenerate behavior where many
-		// requests end up coming back empty. Furthermore, at the moment we're
-		// not incorporating the response size information from those empty
-		// requests.
-		//
-		// In order to work around this we'll just use a larger estimate for
-		// each response for now.
-		// TODO(yuzefovich): improve this.
-		avgResponseSize = 2 * avgResponseSize
-
-		shouldExit = w.waitUntilEnoughBudget(ctx, avgResponseSize)
+		// If we already have minTargetBytes set on the first request to be
+		// issued, then use that; otherwise, use the estimate.
+		atLeastBytes := requestsToServe[0].minTargetBytes
+		if atLeastBytes == 0 {
+			// TODO(yuzefovich): consider using a multiple of avgResponseSize
+			// here.
+			atLeastBytes = avgResponseSize
+		}
+		shouldExit = w.waitUntilEnoughBudget(ctx, atLeastBytes)
 		if shouldExit {
 			return
 		}
@@ -825,17 +875,16 @@ func (w *workerCoordinator) getRequestsLocked() (
 	return requestsToServe, avgResponseSize, shouldExit
 }

-// waitUntilEnoughBudget waits until there is enough budget to at least receive
-// one full response.
+// waitUntilEnoughBudget waits until at least atLeastBytes bytes are available
+// in the budget.
 //
 // A boolean that indicates whether the coordinator should exit is returned.
 func (w *workerCoordinator) waitUntilEnoughBudget(
-	ctx context.Context, avgResponseSize int64,
+	ctx context.Context, atLeastBytes int64,
 ) (shouldExit bool) {
 	w.s.budget.mu.Lock()
 	defer w.s.budget.mu.Unlock()
-	// TODO(yuzefovich): consider using a multiple of avgResponseSize here.
-	for w.s.budget.limitBytes-w.s.budget.mu.acc.Used() < avgResponseSize {
+	for w.s.budget.limitBytes-w.s.budget.mu.acc.Used() < atLeastBytes {
 		// There isn't enough budget at the moment. Check whether there are any
 		// requests in progress.
 		if w.s.getNumRequestsInProgress() == 0 {
@@ -843,6 +892,12 @@ func (w *workerCoordinator) waitUntilEnoughBudget(
 			// the budget.
 			return false
 		}
+		if debug {
+			fmt.Printf(
+				"waiting for budget to free up: atLeastBytes %d, available %d\n",
+				atLeastBytes, w.s.budget.limitBytes-w.s.budget.mu.acc.Used(),
+			)
+		}
 		// We have to wait for some budget.release() calls.
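+		// The waitForBudget condition variable is signaled by budget.release()
+		// calls as well as when the last request in flight exits (see
+		// asyncRequestCleanup), so we re-check the available budget on every
+		// wake-up.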
 		w.s.budget.mu.waitForBudget.Wait()
 		// Check if the Streamer has been canceled or closed while we were
@@ -880,11 +935,29 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
 	w.s.budget.mu.Lock()
 	defer w.s.budget.mu.Unlock()

+	if debug {
+		fmt.Printf(
+			"picked up %s to serve, available budget %d, "+
+				"num requests in progress %d, average response size %d\n",
+			reqsToString(requestsToServe), w.s.budget.limitBytes-w.s.budget.mu.acc.Used(),
+			w.s.getNumRequestsInProgress(), avgResponseSize,
+		)
+	}
+
 	headOfLine := w.s.getNumRequestsInProgress() == 0
 	var budgetIsExhausted bool
 	for numRequestsIssued < len(requestsToServe) && !budgetIsExhausted {
+		singleRangeReqs := requestsToServe[numRequestsIssued]
 		availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
-		if availableBudget < avgResponseSize {
+		// minAcceptableBudget is the minimum TargetBytes limit with which it
+		// makes sense to issue this request (if we issue the request with a
+		// smaller limit, then it's very likely to come back with an empty
+		// response).
+		minAcceptableBudget := singleRangeReqs.minTargetBytes
+		if minAcceptableBudget == 0 {
+			minAcceptableBudget = avgResponseSize
+		}
+		if availableBudget < minAcceptableBudget {
 			if !headOfLine {
 				// We don't have enough budget available to serve this request,
 				// and there are other requests in progress, so we'll wait for
@@ -901,7 +974,6 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
 				availableBudget = 1
 			}
 		}
-		singleRangeReqs := requestsToServe[numRequestsIssued]
 		// Calculate what TargetBytes limit to use for the BatchRequest that
 		// will be issued based on singleRangeReqs. We use the estimate to guess
 		// how much memory the response will need, and we reserve this
@@ -912,6 +984,12 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
 		// get a very large single row in response which will exceed this
 		// limit).
 		targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize
+		// Make sure that targetBytes is sufficient to receive a non-empty
+		// response. Our estimate might be an underestimate when responses vary
+		// significantly in size.
+		if targetBytes < singleRangeReqs.minTargetBytes {
+			targetBytes = singleRangeReqs.minTargetBytes
+		}
 		if targetBytes > availableBudget {
 			// The estimate tells us that we don't have enough budget to receive
 			// the full response; however, in order to utilize the available
@@ -954,6 +1032,12 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
 			// any more responses at the moment.
 			return err
 		}
+		if debug {
+			fmt.Printf(
+				"issuing an async request for positions %v, targetBytes=%d, headOfLine=%t\n",
+				singleRangeReqs.positions, targetBytes, headOfLine,
+			)
+		}
 		w.performRequestAsync(ctx, singleRangeReqs, targetBytes, headOfLine)
 		numRequestsIssued++
 		headOfLine = false
@@ -965,15 +1049,51 @@ func (w *workerCoordinator) addRequest(req singleRangeBatch) {
 	w.s.mu.Lock()
 	defer w.s.mu.Unlock()
+	if debug {
+		fmt.Printf("adding a request for positions %v to be served, minTargetBytes=%d\n", req.positions, req.minTargetBytes)
+	}
 	w.s.mu.requestsToServe = append(w.s.mu.requestsToServe, req)
 	w.s.mu.hasWork.Signal()
 }

-func (w *workerCoordinator) asyncRequestCleanup() {
+// budgetMuAlreadyLocked must be true if the caller is currently holding the
+// budget's mutex.
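+//
+// If it is false, the function acquires (and releases) the mutex itself; see
+// the comment in the body for why the mutex must be held while adjusting the
+// number of requests in flight.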
+func (w *workerCoordinator) asyncRequestCleanup(budgetMuAlreadyLocked bool) {
+	if !budgetMuAlreadyLocked {
+		// Since we're decrementing the number of requests in flight, we want to
+		// make sure that the budget's mutex is locked, and it currently isn't.
+		// This is needed so that if we signal the budget in
+		// adjustNumRequestsInFlight, the worker coordinator doesn't miss the
+		// signal.
+		//
+		// If we don't do this, then it is possible for the worker coordinator
+		// to be blocked forever in waitUntilEnoughBudget. Namely, the following
+		// sequence of events is possible:
+		// 1. the worker coordinator checks that there are some requests in
+		//    progress, then it goes to sleep before waiting on the
+		//    waitForBudget condition variable;
+		// 2. the last request in flight exits without creating any Results (so
+		//    that no Release() calls will happen in the future), it decrements
+		//    the number of requests in flight, signals the waitForBudget
+		//    condition variable, but nobody is waiting on that, so no
+		//    goroutine is woken up;
+		// 3. the worker coordinator wakes up and starts waiting on the
+		//    condition variable, forever.
+		// Acquiring the budget's mutex makes sure that such a sequence doesn't
+		// occur.
+		w.s.budget.mu.Lock()
+		defer w.s.budget.mu.Unlock()
+	} else {
+		w.s.budget.mu.AssertHeld()
+	}
 	w.s.adjustNumRequestsInFlight(-1 /* delta */)
 	w.s.waitGroup.Done()
 }

+// AsyncRequestOp is the operation name (in tracing) of all requests issued by
+// the Streamer asynchronously.
+const AsyncRequestOp = "streamer-lookup-async"
+
 // performRequestAsync dispatches the given single-range batch for evaluation
 // asynchronously. If the batch cannot be evaluated fully (due to exhausting its
 // memory limitBytes), the "resume" single-range batch will be added into
@@ -1001,13 +1121,13 @@ func (w *workerCoordinator) performRequestAsync(
 	if err := w.s.stopper.RunAsyncTaskEx(
 		ctx,
 		stop.TaskOpts{
-			TaskName:   "streamer-lookup-async",
+			TaskName:   AsyncRequestOp,
 			SpanOpt:    stop.ChildSpan,
 			Sem:        w.asyncSem,
 			WaitForSem: true,
 		},
 		func(ctx context.Context) {
-			defer w.asyncRequestCleanup()
+			defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */)
 			var ba roachpb.BatchRequest
 			ba.Header.WaitPolicy = w.lockWaitPolicy
 			ba.Header.TargetBytes = targetBytes
@@ -1123,7 +1243,7 @@ func (w *workerCoordinator) performRequestAsync(
 	}); err != nil {
 		// The new goroutine for the request wasn't spun up, so we have to
 		// perform the cleanup of this request ourselves.
-		w.asyncRequestCleanup()
+		w.asyncRequestCleanup(true /* budgetMuAlreadyLocked */)
 		w.s.setError(err)
 	}
 }
@@ -1246,25 +1366,23 @@ func (w *workerCoordinator) processSingleRangeResults(
 				newGet.union.Get = &newGet.req
 				resumeReq.reqs[resumeReqIdx].Value = &newGet.union
 				resumeReq.positions[resumeReqIdx] = req.positions[i]
+				if resumeReq.minTargetBytes == 0 {
+					resumeReq.minTargetBytes = get.ResumeNextBytes
+				}
 				resumeReqIdx++
 			} else {
 				// This Get was completed.
-				if get.Value != nil {
-					// Create a Result only for non-empty Get responses.
-					result := Result{
-						GetResp: get,
-						// This currently only works because all requests
-						// are unique.
-						EnqueueKeysSatisfied: []int{enqueueKey},
-						position:             req.positions[i],
-					}
-					result.memoryTok.streamer = w.s
-					result.memoryTok.toRelease = getResponseSize(get)
-					memoryTokensBytes += result.memoryTok.toRelease
-					results = append(results, result)
+				result := Result{
+					GetResp: get,
+					// This currently only works because all requests are
+					// unique.
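+					// (Hence the single-element slice: with unique
+					// requests, one response maps to exactly one enqueued
+					// request.)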
+					EnqueueKeysSatisfied: []int{enqueueKey},
+					position:             req.positions[i],
 				}
-				// Note that we count this Get response as complete regardless
-				// of the fact whether it is empty or not.
+				result.memoryTok.streamer = w.s
+				result.memoryTok.toRelease = getResponseSize(get)
+				memoryTokensBytes += result.memoryTok.toRelease
+				results = append(results, result)
 				numCompleteGetResponses++
 			}

@@ -1304,6 +1422,9 @@
 				newScan.union.Scan = &newScan.req
 				resumeReq.reqs[resumeReqIdx].Value = &newScan.union
 				resumeReq.positions[resumeReqIdx] = req.positions[i]
+				if resumeReq.minTargetBytes == 0 {
+					resumeReq.minTargetBytes = scan.ResumeNextBytes
+				}
 				resumeReqIdx++
 			}
 		}
@@ -1318,9 +1439,31 @@
 		}
 	}

-	w.finalizeSingleRangeResults(
-		results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses,
-	)
+	if len(results) > 0 {
+		w.finalizeSingleRangeResults(
+			results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses,
+		)
+	} else {
+		// We received an empty response.
+		if req.minTargetBytes != 0 {
+			// We have previously received an empty response for this request,
+			// and minTargetBytes wasn't sufficient. Make sure that
+			// minTargetBytes on the resume request has increased.
+			if resumeReq.minTargetBytes <= req.minTargetBytes {
+				// Since ResumeNextBytes is populated on a best-effort basis, we
+				// cannot rely on it to make progress, so we make sure that if
+				// minTargetBytes hasn't increased for the resume request, we
+				// use double the original target.
+				resumeReq.minTargetBytes = 2 * req.minTargetBytes
+			}
+		}
+		if debug {
+			fmt.Printf(
+				"request for positions %v came back empty, original minTargetBytes=%d, "+
+					"resumeReq.minTargetBytes=%d\n", req.positions, req.minTargetBytes, resumeReq.minTargetBytes,
+			)
+		}
+	}

 	// If we have any incomplete requests, add them back into the work
 	// pool.
@@ -1333,12 +1476,19 @@
 // singleRangeBatch. By "finalization" we mean setting Complete field of
 // ScanResp to correct value for all scan responses, updating the estimate of an
 // average response size, and telling the Streamer about these results.
+//
+// This method assumes that results is non-empty.
 func (w *workerCoordinator) finalizeSingleRangeResults(
 	results []Result,
 	actualMemoryReservation int64,
 	hasNonEmptyScanResponse bool,
 	numCompleteGetResponses int,
 ) {
+	if buildutil.CrdbTestBuild {
+		if len(results) == 0 {
+			panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results"))
+		}
+	}
 	w.s.mu.Lock()
 	defer w.s.mu.Unlock()

@@ -1380,15 +1530,11 @@
 	w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results)))
 	w.s.mu.numCompleteRequests += numCompleteResponses
 	w.s.mu.numUnreleasedResults += len(results)
-	w.s.mu.results = append(w.s.mu.results, results...)
-	if len(results) > 0 || numCompleteResponses > 0 {
-		// We want to signal the condition variable when either we have some
-		// results to return to the client or we received some empty responses.
-		// The latter is needed so that the client doesn't block forever
-		// thinking there are more requests in flight when, in fact, all
-		// responses have already come back empty.
-		w.s.mu.hasResults.Signal()
+	if debug {
+		fmt.Printf("created %s with total size %d\n", resultsToString(results), actualMemoryReservation)
 	}
+	w.s.mu.results = append(w.s.mu.results, results...)
+	w.s.mu.hasResults.Signal()
 }

 var zeroIntSlice []int
diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go
index 3005984aaebc..07ae67a17509 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -23,12 +23,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
-	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/errors"
 	"github.com/dustin/go-humanize"
 	"github.com/stretchr/testify/require"
@@ -91,12 +89,10 @@ func TestStreamerLimitations(t *testing.T) {
 		streamer := getStreamer()
 		defer streamer.Close()
 		streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
-		// Use a Scan request for this test case because Gets of non-existent
-		// keys aren't added to the results.
-		scan := roachpb.NewScan(roachpb.Key("key"), roachpb.Key("key1"), false /* forUpdate */)
+		get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
 		reqs := []roachpb.RequestUnion{{
-			Value: &roachpb.RequestUnion_Scan{
-				Scan: scan.(*roachpb.ScanRequest),
+			Value: &roachpb.RequestUnion_Get{
+				Get: get.(*roachpb.GetRequest),
 			},
 		}}
 		require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
@@ -120,77 +116,6 @@ func TestStreamerLimitations(t *testing.T) {
 	})
 }

-// TestLargeKeys verifies that the Streamer successfully completes the queries
-// when the keys to lookup are large (i.e. the enqueued requests themselves have
-// large memory footprint).
-func TestLargeKeys(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	rng, _ := randutil.NewTestRand()
-	s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
-	ctx := context.Background()
-	defer s.Stopper().Stop(ctx)
-
-	// Lower the distsql_workmem limit so that we can operate with smaller
-	// blobs. Note that the joinReader in the row-by-row engine will override
-	// the limit if it is lower than 100KiB, so we cannot go lower than that
-	// here.
-	_, err := db.Exec("SET distsql_workmem='100KiB'")
-	require.NoError(t, err)
-	// In both engines, the index joiner will buffer input rows up to a quarter
-	// of workmem limit, so we have a couple of interesting options for the blob
-	// size:
-	// - 20000 is interesting because it doesn't exceed the buffer size, yet two
-	// rows with such blobs do exceed it. The index joiners are expected to to
-	// process each row on its own.
-	// - 40000 is interesting because a single row already exceeds the buffer
-	// size.
-	for _, blobSize := range []int{20000, 40000} {
-		// onlyLarge determines whether only large blobs are inserted or a mix
-		// of large and small blobs.
-		for _, onlyLarge := range []bool{false, true} {
-			_, err = db.Exec("DROP TABLE IF EXISTS foo")
-			require.NoError(t, err)
-			// We set up such a table that contains two large columns, one of them
-			// being the primary key.
-			// The idea is that the query below will first read from the
-			// secondary index which would include only the PK blob, and that
-			// will be used to construct index join lookups (i.e. the PK blobs
-			// will be the enqueued requests for the Streamer) whereas the
-			// other blob will be part of the response.
-			_, err = db.Exec("CREATE TABLE foo (pk_blob STRING PRIMARY KEY, attribute INT, blob TEXT, INDEX(attribute))")
-			require.NoError(t, err)
-
-			// Insert a handful of rows.
-			numRows := rng.Intn(3) + 3
-			for i := 0; i < numRows; i++ {
-				letter := string(byte('a') + byte(i))
-				valueSize := blobSize
-				if !onlyLarge && rng.Float64() < 0.5 {
-					// If we're using a mix of large and small values, with 50%
-					// use a small value now.
-					valueSize = rng.Intn(10) + 1
-				}
-				_, err = db.Exec("INSERT INTO foo SELECT repeat($1, $2), 1, repeat($1, $2)", letter, valueSize)
-				require.NoError(t, err)
-			}
-
-			// Perform an index join so that the Streamer API is used.
-			query := "SELECT * FROM foo@foo_attribute_idx WHERE attribute=1"
-			testutils.RunTrueAndFalse(t, "vectorize", func(t *testing.T, vectorize bool) {
-				vectorizeMode := "off"
-				if vectorize {
-					vectorizeMode = "on"
-				}
-				_, err = db.Exec("SET vectorize = " + vectorizeMode)
-				require.NoError(t, err)
-				_, err = db.Exec(query)
-				require.NoError(t, err)
-			})
-		}
-	}
-}
-
 // TestStreamerBudgetErrorInEnqueue verifies the behavior of the Streamer in
 // Enqueue when its limit and/or root pool limit are exceeded. Additional tests
 // around the memory limit errors (when the responses exceed the limit) can be
@@ -215,7 +140,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
 	var get roachpb.GetRequest
 	var union roachpb.RequestUnion_Get
 	key := make([]byte, keySize+6)
-	key[0] = 190
+	key[0] = 240
 	key[1] = 137
 	key[2] = 18
 	for i := 0; i < keySize; i++ {
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index 397eca79f0b4..bb0cba71cc61 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -177,7 +177,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
 			// Because index joins discard input rows, we do not have to maintain a
 			// reference to input tuples after span generation. So, we can discard
 			// the input batch reference on each iteration.
-			endIdx := s.findEndIndex(len(spans) > 0)
+			endIdx := s.findEndIndex(rowCount > 0)
 			rowCount += endIdx - s.startIdx
 			s.spanAssembler.ConsumeBatch(s.batch, s.startIdx, endIdx)
 			s.startIdx = endIdx
diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go
index 0aeb1cf5fe32..a1b48946a9d7 100644
--- a/pkg/sql/row/kv_batch_streamer.go
+++ b/pkg/sql/row/kv_batch_streamer.go
@@ -88,19 +88,24 @@ func NewTxnKVStreamer(
 // GetResponses).
 func (f *TxnKVStreamer) proceedWithLastResult(
 	ctx context.Context,
-) (kvs []roachpb.KeyValue, batchResp []byte, err error) {
+) (skip bool, kvs []roachpb.KeyValue, batchResp []byte, err error) {
 	result := f.lastResultState.Result
 	if get := result.GetResp; get != nil {
 		if get.IntentValue != nil {
-			return nil, nil, errors.AssertionFailedf(
+			return false, nil, nil, errors.AssertionFailedf(
 				"unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue,
 			)
 		}
+		if get.Value == nil {
+			// Nothing found in this particular response, so we skip it.
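+			// (The Streamer now creates Results for empty Get responses too,
+			// see the change in processSingleRangeResults above, so the
+			// skipping has to happen here, at consumption time.)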
+			f.releaseLastResult(ctx)
+			return true, nil, nil, nil
+		}
 		pos := result.EnqueueKeysSatisfied[f.lastResultState.numEmitted]
 		origSpan := f.spans[pos]
 		f.lastResultState.numEmitted++
 		f.getResponseScratch[0] = roachpb.KeyValue{Key: origSpan.Key, Value: *get.Value}
-		return f.getResponseScratch[:], nil, nil
+		return false, f.getResponseScratch[:], nil, nil
 	}
 	scan := result.ScanResp
 	if len(scan.BatchResponses) > 0 {
@@ -111,7 +116,7 @@
 	}
 	// Note that scan.Rows and batchResp might be nil when the ScanResponse is
 	// empty, and the caller (the KVFetcher) will skip over it.
-	return scan.Rows, batchResp, nil
+	return false, scan.Rows, batchResp, nil
 }

 func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) {
@@ -137,7 +142,7 @@ func (f *TxnKVStreamer) nextBatch(
 	if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeysSatisfied) {
 		// Note that we should never get an error here since we're processing
 		// the same result again.
-		kvs, batchResp, err = f.proceedWithLastResult(ctx)
+		_, kvs, batchResp, err = f.proceedWithLastResult(ctx)
 		return true, kvs, batchResp, err
 	}

@@ -147,7 +152,7 @@
 	}

 	// Process the next result we have already received from the streamer.
-	if len(f.results) > 0 {
+	for len(f.results) > 0 {
 		// Peel off the next result and set it into lastResultState.
 		f.lastResultState.Result = f.results[0]
 		f.lastResultState.numEmitted = 0
@@ -156,7 +161,14 @@
 		// the next iteration.
 		f.results[0] = kvstreamer.Result{}
 		f.results = f.results[1:]
-		kvs, batchResp, err = f.proceedWithLastResult(ctx)
+		var skip bool
+		skip, kvs, batchResp, err = f.proceedWithLastResult(ctx)
+		if err != nil {
+			return false, nil, nil, err
+		}
+		if skip {
+			continue
+		}
 		return true, kvs, batchResp, err
 	}
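The empty-response handling in processSingleRangeResults guarantees monotonic growth of the TargetBytes floor even though the server's ResumeNextBytes hint is only best-effort. The standalone sketch below restates that growth rule in isolation; nextMinTargetBytes is a hypothetical helper name, not part of the Streamer's API, and it only assumes what the patch above establishes: prefer the hint, but double the previous minimum whenever the hint fails to increase it.

```go
package main

import "fmt"

// nextMinTargetBytes computes the minTargetBytes for a resume request after an
// empty response. It prefers the best-effort ResumeNextBytes hint, but
// guarantees forward progress by doubling the previous minimum whenever the
// hint does not increase it.
func nextMinTargetBytes(prevMinTargetBytes, resumeNextBytes int64) int64 {
	next := resumeNextBytes
	if prevMinTargetBytes != 0 && next <= prevMinTargetBytes {
		next = 2 * prevMinTargetBytes
	}
	return next
}

func main() {
	// A request keeps coming back empty while the server keeps reporting a
	// stale 100-byte hint: the limit still grows geometrically.
	min := int64(0)
	for i := 0; i < 4; i++ {
		min = nextMinTargetBytes(min, 100)
		fmt.Println(min) // 100, 200, 400, 800
	}
}
```

The doubling mirrors the `resumeReq.minTargetBytes = 2 * req.minTargetBytes` branch in the patch, which is what bounds the number of empty round trips and lets the test assert a 4x cap on Streamer requests per row.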