diff --git a/pkg/kv/db.go b/pkg/kv/db.go
index 9de451e26cbe..ff8f6fc76cfc 100644
--- a/pkg/kv/db.go
+++ b/pkg/kv/db.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
@@ -263,8 +264,12 @@ type DB struct {
 	// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
 	// placed here simply for plumbing convenience, as there is a diversity of
 	// SQL code that all uses kv.DB.
-	// TODO(sumeer): find a home for this in the SQL layer.
+	//
+	// TODO(sumeer,irfansharif): Find a home for these in the SQL layer.
+	// Especially SettingsValues.
 	SQLKVResponseAdmissionQ *admission.WorkQueue
+	AdmissionPacerFactory   admission.PacerFactory
+	SettingsValues          *settings.Values
 }
 
 // NonTransactionalSender returns a Sender that can be used for sending
diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go
index def2d71c9507..65ec8e5a6cab 100644
--- a/pkg/kv/kvserver/kvadmission/kvadmission.go
+++ b/pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -54,6 +54,44 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
 	},
 )
 
+// elasticCPUDurationPerInternalLowPriRead controls how many CPU tokens are
+// allotted for each internally submitted low priority read request.
+var elasticCPUDurationPerInternalLowPriRead = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"kvadmission.elastic_cpu.duration_per_low_pri_read",
+	"controls how many CPU tokens are allotted for each internally submitted low priority read request",
+	10*time.Millisecond,
+	func(duration time.Duration) error {
+		if duration < admission.MinElasticCPUDuration {
+			return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
+				admission.MinElasticCPUDuration, duration)
+		}
+		if duration > admission.MaxElasticCPUDuration {
+			return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
+				admission.MaxElasticCPUDuration, duration)
+		}
+		return nil
+	},
+)
+
+// internalLowPriReadElasticControlEnabled determines whether internally
+// submitted low pri reads integrate with elastic CPU control.
+var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kvadmission.low_pri_read_elastic_control.enabled",
+	"determines whether internally submitted low priority reads integrate with elastic CPU control",
+	true,
+)
+
+// exportRequestElasticControlEnabled determines whether export requests
+// integrate with elastic CPU control.
+var exportRequestElasticControlEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"kvadmission.export_request_elastic_control.enabled",
+	"determines whether export requests integrate with elastic CPU control",
+	true,
+)
+
 // elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
 // allotted for each unit of work during rangefeed catchup scans. Only takes
 // effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
@@ -258,21 +296,42 @@ func (n *controllerImpl) AdmitKVWork(
 		}
 	}
 	if admissionEnabled {
-		if ba.IsSingleExportRequest() {
-			// Backups generate batches with single export requests, which we
-			// admit through the elastic CPU work queue. We grant this
-			// CPU-intensive work a set amount of CPU time and expect it to
-			// terminate (cooperatively) once it exceeds its grant. The amount
-			// disbursed is 100ms, which we've experimentally found to be long
-			// enough to do enough useful work per-request while not causing too
-			// much in the way of scheduling delays on individual cores. Within
-			// admission control we have machinery that observes scheduling
-			// latencies periodically and reduces the total amount of CPU time
-			// handed out through this mechanism, as a way to provide latency
-			// isolation to non-elastic ("latency sensitive") work running on
-			// the same machine.
+		// - Backups generate batches with single export requests, which we
+		//   admit through the elastic CPU work queue. We grant this
+		//   CPU-intensive work a set amount of CPU time and expect it to
+		//   terminate (cooperatively) once it exceeds its grant. The amount
+		//   disbursed is 100ms, which we've experimentally found to be long
+		//   enough to do enough useful work per-request while not causing too
+		//   much in the way of scheduling delays on individual cores. Within
+		//   admission control we have machinery that observes scheduling
+		//   latencies periodically and reduces the total amount of CPU time
+		//   handed out through this mechanism, as a way to provide latency
+		//   isolation to non-elastic ("latency sensitive") work running on the
+		//   same machine.
+		// - We do the same for internally submitted low priority reads in
+		//   general (notably, for KV work done on behalf of row-level TTL
+		//   reads). Everything admissionpb.UserLowPri and above uses the slots
+		//   mechanism.
+		isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri
+		shouldUseElasticCPU :=
+			(exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
+				(internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead)
+
+		if shouldUseElasticCPU {
+			var admitDuration time.Duration
+			if ba.IsSingleExportRequest() {
+				admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV)
+			} else if isInternalLowPriRead {
+				admitDuration = elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV)
+			}
+
+			// TODO(irfansharif): For export requests it's possible to preempt,
+			// i.e. once the CPU slice is used up we terminate the work. We
+			// don't do this for the general case of low priority internal
+			// reads, so in some sense, the integration is incomplete. This is
+			// probably harmless.
 			elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
-				ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
+				ctx, admitDuration, admissionInfo,
 			)
 			if err != nil {
 				return Handle{}, err
@@ -285,6 +344,7 @@ func (n *controllerImpl) AdmitKVWork(
 			}
 			}()
 		} else {
+			// Use the slots-based mechanism for everything else.
 			callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
 			if err != nil {
 				return Handle{}, err
diff --git a/pkg/server/server.go b/pkg/server/server.go
index ea886556ea86..551fc7efb373 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -457,6 +457,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 	dbCtx.Stopper = stopper
 	db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)
 	db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
+	db.AdmissionPacerFactory = gcoords.Elastic
+	db.SettingsValues = &cfg.Settings.SV
 
 	nlActive, nlRenewal := cfg.NodeLivenessDurations()
 	if knobs := cfg.TestingKnobs.NodeLiveness; knobs != nil {
diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go
index 68b05491c357..73aed0dd7f29 100644
--- a/pkg/sql/row/fetcher.go
+++ b/pkg/sql/row/fetcher.go
@@ -436,8 +436,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
 	}
 	if args.Txn != nil {
 		fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
-		fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
-		fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
+		fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader()
+		fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ
+		fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory
+		fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues
 	}
 	rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
 }
diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go
index 09a6a6197d8a..3079caf99c10 100644
--- a/pkg/sql/row/kv_batch_fetcher.go
+++ b/pkg/sql/row/kv_batch_fetcher.go
@@ -12,6 +12,7 @@ package row
 
 import (
 	"context"
+	"fmt"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -21,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
@@ -57,6 +59,42 @@ var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue(
 	1, /* metamorphicValue */
 ))
 
+var logAdmissionPacerErr = log.Every(100 * time.Millisecond)
+
+// elasticCPUDurationPerLowPriReadResponse controls how many CPU tokens are allotted
+// each time we seek admission for response handling during internally submitted
+// low priority reads (like row-level TTL selects).
+var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting(
+	settings.SystemOnly,
+	"sqladmission.elastic_cpu.duration_per_low_pri_read_response",
+	"controls how many CPU tokens are allotted for handling responses for internally submitted low priority reads",
+	// NB: Experimentally, during TTL reads, we observed cumulative on-CPU time
+	// by SQL processors >> 100ms, over the course of a single select fetching
+	// many rows. So we pick a relatively high duration here.
+	100*time.Millisecond,
+	func(duration time.Duration) error {
+		if duration < admission.MinElasticCPUDuration {
+			return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
+				admission.MinElasticCPUDuration, duration)
+		}
+		if duration > admission.MaxElasticCPUDuration {
+			return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
+				admission.MaxElasticCPUDuration, duration)
+		}
+		return nil
+	},
+)
+
+// internalLowPriReadElasticControlEnabled determines whether the sql portion of
+// internally submitted low-priority reads (like row-level TTL selects)
+// integrates with elastic CPU control.
+var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
+	settings.SystemOnly,
+	"sqladmission.low_pri_read_response_elastic_control.enabled",
+	"determines whether the sql portion of internally submitted reads integrates with the elastic CPU controller",
+	true,
+)
+
 // sendFunc is the function used to execute a KV batch; normally
 // wraps (*client.Txn).Send.
 type sendFunc func(
@@ -185,6 +223,7 @@ type txnKVFetcher struct {
 	// For request and response admission control.
 	requestAdmissionHeader kvpb.AdmissionHeader
 	responseAdmissionQ     *admission.WorkQueue
+	admissionPacer         *admission.Pacer
 }
 
 var _ KVBatchFetcher = &txnKVFetcher{}
@@ -262,8 +301,13 @@ type newTxnKVFetcherArgs struct {
 	acc                        *mon.BoundAccount
 	forceProductionKVBatchSize bool
 	batchRequestsIssued        *int64
-	requestAdmissionHeader     kvpb.AdmissionHeader
-	responseAdmissionQ         *admission.WorkQueue
+
+	admission struct { // groups AC-related fields
+		requestHeader  kvpb.AdmissionHeader
+		responseQ      *admission.WorkQueue
+		pacerFactory   admission.PacerFactory
+		settingsValues *settings.Values
+	}
 }
 
 // newTxnKVFetcherInternal initializes a txnKVFetcher.
@@ -282,9 +326,14 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher {
 		lockTimeout:                args.lockTimeout,
 		acc:                        args.acc,
 		forceProductionKVBatchSize: args.forceProductionKVBatchSize,
-		requestAdmissionHeader:     args.requestAdmissionHeader,
-		responseAdmissionQ:         args.responseAdmissionQ,
-	}
+		requestAdmissionHeader:     args.admission.requestHeader,
+		responseAdmissionQ:         args.admission.responseQ,
+	}
+	f.maybeInitAdmissionPacer(
+		args.admission.requestHeader,
+		args.admission.pacerFactory,
+		args.admission.settingsValues,
+	)
 	f.kvBatchFetcherHelper.init(f.nextBatch, args.batchRequestsIssued)
 	return f
 }
@@ -295,6 +344,42 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
 	f.sendFn = sendFn
 	f.requestAdmissionHeader = txn.AdmissionHeader()
 	f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ
+
+	if f.admissionPacer != nil {
+		f.admissionPacer.Close()
+	}
+	f.maybeInitAdmissionPacer(txn.AdmissionHeader(), txn.DB().AdmissionPacerFactory, txn.DB().SettingsValues)
+}
+
+// maybeInitAdmissionPacer selectively initializes an admission.Pacer for work
+// done as part of internally submitted low-priority reads (like row-level TTL
+// selects).
+func (f *txnKVFetcher) maybeInitAdmissionPacer(
+	admissionHeader kvpb.AdmissionHeader, pacerFactory admission.PacerFactory, sv *settings.Values,
+) {
+	if sv == nil {
+		// Only nil in tests and in SQL pods (we don't have admission pacing in
+		// the latter anyway).
+		return
+	}
+	admissionPri := admissionpb.WorkPriority(admissionHeader.Priority)
+	if internalLowPriReadElasticControlEnabled.Get(sv) &&
+		admissionPri < admissionpb.UserLowPri &&
+		pacerFactory != nil {
+
+		f.admissionPacer = pacerFactory.NewPacer(
+			elasticCPUDurationPerLowPriReadResponse.Get(sv),
+			admission.WorkInfo{
+				// NB: This is either code that runs in physically isolated SQL
+				// pods for secondary tenants, or for the system tenant, in
+				// nodes running colocated SQL+KV code where all SQL code is run
+				// on behalf of the one tenant. So from an AC perspective, the
+				// tenant ID we pass through here is irrelevant.
+				TenantID:   roachpb.SystemTenantID,
+				Priority:   admissionPri,
+				CreateTime: admissionHeader.CreateTime,
+			})
+	}
 }
 
 // SetupNextFetch sets up the Fetcher for the next set of spans.
@@ -533,16 +618,10 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
 		}
 		f.batchResponseAccountedFor = returnedBytes
 	}
+
 	// Do admission control after we've accounted for the response bytes.
-	if br != nil && f.responseAdmissionQ != nil {
-		responseAdmission := admission.WorkInfo{
-			TenantID:   roachpb.SystemTenantID,
-			Priority:   admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
-			CreateTime: f.requestAdmissionHeader.CreateTime,
-		}
-		if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
-			return err
-		}
+	if err := f.maybeAdmitBatchResponse(ctx, br); err != nil {
+		return err
 	}
 
 	f.batchIdx++
@@ -570,6 +649,58 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
 	return nil
 }
 
+func (f *txnKVFetcher) maybeAdmitBatchResponse(ctx context.Context, br *kvpb.BatchResponse) error {
+	if br == nil {
+		return nil // nothing to do
+	}
+
+	if f.admissionPacer != nil {
+		// If admissionPacer is initialized, we're using the elastic CPU control
+		// mechanism (the work is elastic in nature and using the slots based
+		// mechanism would permit high scheduling latencies). We want to limit
+		// the CPU% used by SQL during internally submitted reads, like
+		// row-level TTL selects. All that work happens on the same goroutine
+		// doing this fetch, so is accounted for when invoking .Pace() as we
+		// fetch KVs as part of our volcano operator iteration. See CPU profiles
+		// posted on #98722.
+		//
+		// TODO(irfansharif): At the time of writing, SELECTs done by the TTL
+		// job are not distributed at SQL level (since our DistSQL physical
+		// planning heuristics deem it not worthy of distribution), and with
+		// the local plan we only have a single goroutine (unless
+		// maybeParallelizeLocalScans splits up the single scan into multiple
+		// TableReader processors). This may change as part of
+		// https://github.com/cockroachdb/cockroach/issues/82164 where
+		// CPU-intensive SQL work will happen on a different goroutine from the
+		// ones that evaluate the BatchRequests, so the integration is trickier
+		// there. If we're unable to integrate it well, we could disable usage
+		// of the streamer to preserve this current form of pacing.
+		//
+		// TODO(irfansharif): Add tests for the SELECT queries issued by the TTL
+		// job to ensure that they have local plans with a single TableReader
+		// processor in multi-node clusters.
+		if err := f.admissionPacer.Pace(ctx); err != nil {
+			// We're unable to pace things automatically -- shout loudly
+			// semi-infrequently but don't fail the kv fetcher itself. At
+			// worst we'd be over-admitting.
+			if logAdmissionPacerErr.ShouldLog() {
+				log.Errorf(ctx, "automatic pacing: %v", err)
+			}
+		}
+	} else if f.responseAdmissionQ != nil {
+		responseAdmission := admission.WorkInfo{
+			TenantID:   roachpb.SystemTenantID,
+			Priority:   admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
+			CreateTime: f.requestAdmissionHeader.CreateTime,
+		}
+		if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 // popBatch returns the 0th "batch" in a slice of "batches", as well as the rest
 // of the slice of the "batches". It nils the pointer to the 0th element before
 // reslicing the outer slice.
@@ -746,6 +877,7 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
 // Close releases the resources of this txnKVFetcher.
 func (f *txnKVFetcher) Close(ctx context.Context) {
 	f.reset(ctx)
+	f.admissionPacer.Close()
 }
 
 const requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{}))
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 0f498c4a3c25..1575d1459ce9 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -102,8 +102,10 @@ func newTxnKVFetcher(
 		// In most cases, the txn is non-nil; however, in some code paths (e.g.
 		// when executing EXPLAIN (VEC)) it might be nil, so we need to have
 		// this check.
-		fetcherArgs.requestAdmissionHeader = txn.AdmissionHeader()
-		fetcherArgs.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ
+		fetcherArgs.admission.requestHeader = txn.AdmissionHeader()
+		fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ
+		fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory
+		fetcherArgs.admission.settingsValues = txn.DB().SettingsValues
 	}
 	return newTxnKVFetcherInternal(fetcherArgs)
 }
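Reviewer note: the pattern the fetcher adopts above is "build a Pacer up front, call Pace() once per unit of response processing on the goroutine doing that work, treat pacing errors as non-fatal, and Close() on teardown". The sketch below only illustrates that call pattern; the `pacer` type, its fields, and `main` are hypothetical stand-ins (not the real admission package APIs), so it can be read and run outside the CockroachDB tree.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pacer is a stand-in for the admission.Pacer wired up in this diff: callers
// invoke Pace once per unit of work and Close when the consumer is torn down.
type pacer struct {
	unit      time.Duration                   // granted CPU slice per admission
	cpuUsed   time.Duration                   // CPU presumed consumed since last admission
	admission func(ctx context.Context) error // stand-in for queueing on the elastic CPU queue
}

// Pace is called once per unit of work; it only seeks admission again once the
// previously granted CPU slice is presumed used up.
func (p *pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil // pacing disabled; mirrors the nil-safe usage in the fetcher
	}
	p.cpuUsed += time.Millisecond // pretend each unit of work costs ~1ms of CPU
	if p.cpuUsed < p.unit {
		return nil // still within the granted slice, no queueing needed
	}
	p.cpuUsed = 0
	return p.admission(ctx) // block until re-admitted for another slice
}

// Close releases any resources held by the pacer (a no-op in this sketch).
func (p *pacer) Close() {}

func main() {
	ctx := context.Background()
	p := &pacer{
		unit: 100 * time.Millisecond, // cf. duration_per_low_pri_read_response
		admission: func(ctx context.Context) error {
			fmt.Println("re-admitted for another CPU slice")
			return nil
		},
	}
	defer p.Close()

	for i := 0; i < 250; i++ { // e.g. one iteration per fetched batch of KVs
		if err := p.Pace(ctx); err != nil {
			// Like maybeAdmitBatchResponse: log and continue rather than
			// failing the read; at worst we over-admit.
			fmt.Println("pacing error:", err)
		}
		// ... process the next batch ...
	}
}
```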