diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index abaf65c9dcd5..439789a6155a 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -399,8 +399,9 @@ func runBackupProcessor( // We set the DistSender response target bytes field to a sentinel // value. The sentinel value of 1 forces the ExportRequest to paginate // after creating a single SST. - TargetBytes: 1, - Timestamp: span.end, + TargetBytes: 1, + Timestamp: span.end, + ReturnElasticCPUResumeSpans: true, } if priority { // This re-attempt is reading far enough in the past that we just want diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 27602393a50d..a999b74e731b 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -579,7 +579,10 @@ func sendExportRequestWithPriorityOverride( span roachpb.Span, startTS, endTS hlc.Timestamp, ) (kvpb.Response, error) { - header := kvpb.Header{Timestamp: endTS} + header := kvpb.Header{ + Timestamp: endTS, + ReturnElasticCPUResumeSpans: true, + } req := &kvpb.ExportRequest{ RequestHeader: kvpb.RequestHeaderFromSpan(span), StartTime: startTS, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index a0300f31ac5a..262bcca1382c 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1357,7 +1357,8 @@ func (ds *DistSender) divideAndSendBatchToRanges( }() canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 && - !ba.Header.ReturnOnRangeBoundary + !ba.Header.ReturnOnRangeBoundary && + !ba.Header.ReturnElasticCPUResumeSpans if ba.IsSingleCheckConsistencyRequest() { // Don't parallelize full checksum requests as they have to touch the // entirety of each replica of each range they touch. @@ -1447,7 +1448,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( ba.UpdateTxn(resp.reply.Txn) } - mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary + mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary || ba.ReturnElasticCPUResumeSpans // Check whether we've received enough responses to exit query loop. if mightStopEarly { var replyKeys int64 diff --git a/pkg/kv/kvclient/revision_reader.go b/pkg/kv/kvclient/revision_reader.go index 8724b4ebac89..3b6e8e84ad11 100644 --- a/pkg/kv/kvclient/revision_reader.go +++ b/pkg/kv/kvclient/revision_reader.go @@ -38,7 +38,10 @@ func GetAllRevisions( allRevs chan []VersionedValues, ) error { for { - header := kvpb.Header{Timestamp: endTime} + header := kvpb.Header{ + Timestamp: endTime, + ReturnElasticCPUResumeSpans: true, + } req := &kvpb.ExportRequest{ RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey}, StartTime: startTime, diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index d7b6ee6e7e78..c4d7436ad0e8 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -2635,6 +2635,15 @@ message Header { // each Scan and ReverseScan. sql.sqlbase.IndexFetchSpec index_fetch_spec = 29; + // ReturnElasticCPUResumeSpans, if set, indicates that the caller + // expects early-termination of requests based on the Elastic CPU + // limiter. + // + // Resume spans returned because of the underlying request being + // rate-limited by the ElasticCPU limiter will have a reason of + // RESUME_ELASTIC_CPU_LIMIT. + bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"]; + reserved 7, 10, 12, 14, 20; } diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index a66b5e087097..7d0d1155d4b6 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -150,9 +150,11 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/desctestutils", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/storage", @@ -165,6 +167,7 @@ go_test( "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/admission", "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index ef9bcce22193..cf6cbfbb985a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -238,7 +238,7 @@ func evalExport( // chance to move the goroutine off CPU allowing other processes to make // progress. The client is responsible for handling pagination of // ExportRequests. - if resumeInfo.CPUOverlimit { + if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans { // Note, since we have not exported any data we do not populate the // `Files` field of the ExportResponse. reply.ResumeSpan = &roachpb.Span{ @@ -248,14 +248,18 @@ func evalExport( reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT break } else { - // We should never come here. There should be no condition aside from - // resource constraints that results in an early exit without - // exporting any data. Regardless, if we have a resumeKey we - // immediately retry the ExportRequest from that key and timestamp - // onwards. - if !build.IsRelease() { - return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " + - "exporting any data for an unknown reason; programming error") + if !resumeInfo.CPUOverlimit { + // We should never come here. There should be no condition aside from + // resource constraints that results in an early exit without + // exporting any data. Regardless, if we have a resumeKey we + // immediately retry the ExportRequest from that key and timestamp + // onwards. + if !build.IsRelease() { + return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " + + "exporting any data for an unknown reason; programming error") + } else { + log.Warningf(ctx, "unexpected resume span from ExportRequest without exporting any data for an unknown reason: %v", resumeInfo) + } } start = resumeInfo.ResumeKey.Key resumeKeyTS = resumeInfo.ResumeKey.Timestamp @@ -303,14 +307,12 @@ func evalExport( // resuming our export from the resume key. This gives the scheduler a // chance to take the current goroutine off CPU and allow other processes to // progress. - if resumeInfo.CPUOverlimit { + if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans { if resumeInfo.ResumeKey.Key != nil { reply.ResumeSpan = &roachpb.Span{ Key: resumeInfo.ResumeKey.Key, EndKey: args.EndKey, } - // TODO(during review): Do we want to add another resume reason - // specifically for CPU preemption. reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT } break diff --git a/pkg/kv/kvserver/batcheval/cmd_export_test.go b/pkg/kv/kvserver/batcheval/cmd_export_test.go index c4440fa57408..87754c46a133 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_export_test.go @@ -19,22 +19,28 @@ import ( "sort" "strconv" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -415,6 +421,69 @@ INTO }) } +func TestExportRequestWithCPULimitResumeSpans(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + rng, _ := randutil.NewTestRand() + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + UseDatabase: "test", + Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error { + for _, ru := range request.Requests { + if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok { + h := admission.ElasticCPUWorkHandleFromContext(ctx) + if h == nil { + t.Fatalf("expected context to have CPU work handle") + } + h.TestingOverrideOverLimit(func() (bool, time.Duration) { + if rng.Float32() > 0.5 { + return true, 0 + } + return false, 0 + }) + } + } + return nil + }, + }}, + }, + }) + + defer tc.Stopper().Stop(context.Background()) + + s := tc.TenantOrServer(0) + sqlDB := tc.Conns[0] + db := sqlutils.MakeSQLRunner(sqlDB) + execCfg := s.ExecutorConfig().(sql.ExecutorConfig) + kvDB := s.DB() + + const ( + initRows = 1000 + splits = 100 + ) + db.Exec(t, "CREATE DATABASE IF NOT EXISTS test") + db.Exec(t, "CREATE TABLE test (k PRIMARY KEY) AS SELECT generate_series(1, $1)", initRows) + db.Exec(t, "ALTER TABLE test SPLIT AT (select i*10 from generate_series(1, $1) as i)", initRows/splits) + db.Exec(t, "ALTER TABLE test SCATTER") + + desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, execCfg.Codec, "test", "test") + span := desc.TableSpan(execCfg.Codec) + + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: span.Key, + EndKey: span.EndKey}, + } + header := kvpb.Header{ + ReturnElasticCPUResumeSpans: true, + } + _, err := kv.SendWrappedWith(ctx, kvDB.NonTransactionalSender(), header, req) + require.NoError(t, err.GoError()) +} + func TestExportGCThreshold(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 9780d502231c..a99d68f904fa 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -252,7 +252,8 @@ func getDescriptorsFromStoreForInterval( // Create an export request (1 kv call) for all descriptors for given // descriptor ID written during the interval [timestamp, endTimestamp). batchRequestHeader := kvpb.Header{ - Timestamp: upperBound.Prev(), + Timestamp: upperBound.Prev(), + ReturnElasticCPUResumeSpans: true, } descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id) // Unmarshal key span retrieved from export request to construct historical descs. diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index cb2840303a5b..aafb12fd7cd6 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -7563,6 +7563,10 @@ expires until the statement bundle is collected`, // specially in the future so as to allow the fingerprint to complete // in the face of intents. WaitPolicy: lock.WaitPolicy_Error, + // TODO(ssd): Setting this disables async sending in + // DistSender so it likely substantially impacts + // performance. + ReturnElasticCPUResumeSpans: true, } admissionHeader := kvpb.AdmissionHeader{ Priority: int32(admissionpb.BulkNormalPri),