From 4451735852d8bc99a9108d92e5247c0d8b896342 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Tue, 22 Aug 2023 10:29:35 -0400 Subject: [PATCH] kv,sql: integrate row-level TTL reads with CPU limiter Part of #98722. We do it at two levels, each appearing prominently in CPU profiles: - Down in KV, where we're handling batch requests issued as part of row-level TTL selects. This is gated by kvadmission.low_pri_read_elastic_control.enabled. - Up in SQL, when handling KV responses to said batch requests. This is gated by sqladmission.low_pri_read_response_elastic_control.enabled. Similar to backups, rangefeed initial scans, and changefeed event processing, we've observed latency impact during CPU-intensive scans issued as part of row-level TTL jobs. We know from before that the existing slots based mechanism for CPU work can result in excessive scheduling latency for high-pri work in the presence of lower-pri work, affecting end-user latencies. This is because the slots mechanisms aims for full utilization of the underlying resource, which is incompatible with low scheduling latencies. This commit then tries to control the total CPU% used by row-level TTL selects through the elastic CPU limiter. For the KV work, this was trivial -- we already have integrations at the batch request level and now we pick out requests with priorities less than admissionpb.UserLowPri, which includes admissionpb.TTLLowPri. For the SQL portion of the work we introduce some minimal plumbing. Where previously we sought admission in the SQL-KV response queues after fetching each batch of KVs from KV as part of our volcano operator iteration, we now incrementally acquire CPU nanos. We do this specifically for row-level TTL work. Experimentally the CPU nanos we acquire here map roughly to the CPU utilization due to SQL work for row-level TTL selects. (Note that we apply the elastic CPU limiter for all reads with priorities less than admissionpb.UserPriLow. This is typically internally submitted reads, and includes row-level TTL selects.) Release note: None --- pkg/kv/db.go | 7 +- pkg/kv/kvserver/kvadmission/kvadmission.go | 78 +++++++++-- pkg/server/server.go | 2 + pkg/sql/row/fetcher.go | 6 +- pkg/sql/row/kv_batch_fetcher.go | 148 +++++++++++++++++++-- pkg/sql/row/kv_fetcher.go | 13 +- 6 files changed, 220 insertions(+), 34 deletions(-) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index e1ba42ad2164..e31bc1754fdf 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" @@ -263,8 +264,12 @@ type DB struct { // SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is // placed here simply for plumbing convenience, as there is a diversity of // SQL code that all uses kv.DB. - // TODO(sumeer): find a home for this in the SQL layer. + // + // TODO(sumeer,irfansharif): Find a home for these in the SQL layer. + // Especially SettingsValue. SQLKVResponseAdmissionQ *admission.WorkQueue + AdmissionPacerFactory admission.PacerFactory + SettingsValues *settings.Values } // NonTransactionalSender returns a Sender that can be used for sending diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 84ec04388f25..d1e2f4de8615 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -47,6 +47,34 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( settings.DurationInRange(admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration), ) +// elasticCPUDurationPerInternalLowPriRead controls how many CPU tokens are +// allotted for each internally submitted low priority read request. +var elasticCPUDurationPerInternalLowPriRead = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.elastic_cpu.duration_per_low_pri_read", + "controls how many CPU tokens are allotted for each internally submitted low priority read request", + 10*time.Millisecond, + settings.DurationInRange(admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration), +) + +// internalLowPriReadElasticControlEnabled determines whether internally +// submitted low pri reads integrate with elastic CPU control. +var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kvadmission.low_pri_read_elastic_control.enabled", + "determines whether the internally submitted low priority reads reads integrate with elastic CPU control", + true, +) + +// exportRequestElasticControlEnabled determines whether export requests +// integrate with elastic CPU control. +var exportRequestElasticControlEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kvadmission.export_request_elastic_control.enabled", + "determines whether the export requests integrate with elastic CPU control", + true, +) + // elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are // allotted for each unit of work during rangefeed catchup scans. Only takes // effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set. @@ -322,21 +350,42 @@ func (n *controllerImpl) AdmitKVWork( } } if admissionEnabled { - if ba.IsSingleExportRequest() { - // Backups generate batches with single export requests, which we - // admit through the elastic CPU work queue. We grant this - // CPU-intensive work a set amount of CPU time and expect it to - // terminate (cooperatively) once it exceeds its grant. The amount - // disbursed is 100ms, which we've experimentally found to be long - // enough to do enough useful work per-request while not causing too - // much in the way of scheduling delays on individual cores. Within - // admission control we have machinery that observes scheduling - // latencies periodically and reduces the total amount of CPU time - // handed out through this mechanism, as a way to provide latency - // isolation to non-elastic ("latency sensitive") work running on - // the same machine. + // - Backups generate batches with single export requests, which we + // admit through the elastic CPU work queue. We grant this + // CPU-intensive work a set amount of CPU time and expect it to + // terminate (cooperatively) once it exceeds its grant. The amount + // disbursed is 100ms, which we've experimentally found to be long + // enough to do enough useful work per-request while not causing too + // much in the way of scheduling delays on individual cores. Within + // admission control we have machinery that observes scheduling + // latencies periodically and reduces the total amount of CPU time + // handed out through this mechanism, as a way to provide latency + // isolation to non-elastic ("latency sensitive") work running on the + // same machine. + // - We do the same for internally submitted low priority reads in + // general (notably, for KV work done on the behalf of row-level TTL + // reads). Everything admissionpb.UserLowPri and above uses the slots + // mechanism. + isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri + shouldUseElasticCPU := + (exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) || + (internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead) + + if shouldUseElasticCPU { + var admitDuration time.Duration + if ba.IsSingleExportRequest() { + admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV) + } else if isInternalLowPriRead { + admitDuration = elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV) + } + + // TODO(irfansharif): For export requests it's possible to preempt, + // i.e. once the CPU slice is used up we terminate the work. We + // don't do this for the general case of low priority internal + // reads, so in some sense, the integration is incomplete. This is + // probably harmless. elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit( - ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, + ctx, admitDuration, admissionInfo, ) if err != nil { return Handle{}, err @@ -349,6 +398,7 @@ func (n *controllerImpl) AdmitKVWork( } }() } else { + // Use the slots-based mechanism for everything else. callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo) if err != nil { return Handle{}, err diff --git a/pkg/server/server.go b/pkg/server/server.go index 3e0722eaa9ec..743e9174bf81 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -563,6 +563,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf admissionKnobs, ) db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork) + db.AdmissionPacerFactory = gcoords.Elastic + db.SettingsValues = &cfg.Settings.SV cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad) stopper.AddCloser(stop.CloserFn(func() { goschedstats.UnregisterRunnableCountCallback(cbID) diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 97d8d44618a7..b2ed310d3ee9 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -448,8 +448,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { } if args.Txn != nil { fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued) - fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader() - fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ + fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader() + fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ + fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory + fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues } rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs)) } diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 832915b804f0..f47c1d822627 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb" @@ -57,6 +58,32 @@ var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue 1, /* metamorphicValue */ )) +var logAdmissionPacerErr = log.Every(100 * time.Millisecond) + +// elasticCPUDurationPerLowPriReadResponse controls how many CPU tokens are allotted +// each time we seek admission for response handling during internally submitted +// low priority reads (like row-level TTL selects). +var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting( + settings.SystemOnly, + "sqladmission.elastic_cpu.duration_per_low_pri_read_response", + "controls how many CPU tokens are allotted for handling responses for internally submitted low priority reads", + // NB: Experimentally, during TTL reads, we observed cumulative on-CPU time + // by SQL processors >> 100ms, over the course of a single select fetching + // many rows. So we pick a relative high duration here. + 100*time.Millisecond, + settings.DurationInRange(admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration), +) + +// internalLowPriReadElasticControlEnabled determines whether the sql portion of +// internally submitted low-priority reads (like row-level TTL selects) +// integrate with elastic CPU control. +var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "sqladmission.low_pri_read_response_elastic_control.enabled", + "determines whether the sql portion of internally submitted reads integrate with elastic CPU controller", + true, +) + // sendFunc is the function used to execute a KV batch; normally // wraps (*kv.Txn).Send. type sendFunc func( @@ -186,6 +213,7 @@ type txnKVFetcher struct { // For request and response admission control. requestAdmissionHeader kvpb.AdmissionHeader responseAdmissionQ *admission.WorkQueue + admissionPacer *admission.Pacer } var _ KVBatchFetcher = &txnKVFetcher{} @@ -264,8 +292,13 @@ type newTxnKVFetcherArgs struct { forceProductionKVBatchSize bool kvPairsRead *int64 batchRequestsIssued *int64 - requestAdmissionHeader kvpb.AdmissionHeader - responseAdmissionQ *admission.WorkQueue + + admission struct { // groups AC-related fields + requestHeader kvpb.AdmissionHeader + responseQ *admission.WorkQueue + pacerFactory admission.PacerFactory + settingsValues *settings.Values + } } // newTxnKVFetcherInternal initializes a txnKVFetcher. @@ -284,9 +317,15 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher { lockTimeout: args.lockTimeout, acc: args.acc, forceProductionKVBatchSize: args.forceProductionKVBatchSize, - requestAdmissionHeader: args.requestAdmissionHeader, - responseAdmissionQ: args.responseAdmissionQ, + requestAdmissionHeader: args.admission.requestHeader, + responseAdmissionQ: args.admission.responseQ, } + + f.maybeInitAdmissionPacer( + args.admission.requestHeader, + args.admission.pacerFactory, + args.admission.settingsValues, + ) f.kvBatchFetcherHelper.init(f.nextBatch, args.kvPairsRead, args.batchRequestsIssued) return f } @@ -297,6 +336,42 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) { f.sendFn = sendFn f.requestAdmissionHeader = txn.AdmissionHeader() f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ + + if f.admissionPacer != nil { + f.admissionPacer.Close() + } + f.maybeInitAdmissionPacer(txn.AdmissionHeader(), txn.DB().AdmissionPacerFactory, txn.DB().SettingsValues) +} + +// maybeInitAdmissionPacer selectively initializes an admission.Pacer for work +// done as part of internally submitted low-priority reads (like row-level TTL +// selects). +func (f *txnKVFetcher) maybeInitAdmissionPacer( + admissionHeader kvpb.AdmissionHeader, pacerFactory admission.PacerFactory, sv *settings.Values, +) { + if sv == nil { + // Only nil in tests and in SQL pods (we don't have admission pacing in + // the latter anyway). + return + } + admissionPri := admissionpb.WorkPriority(admissionHeader.Priority) + if internalLowPriReadElasticControlEnabled.Get(sv) && + admissionPri < admissionpb.UserLowPri && + pacerFactory != nil { + + f.admissionPacer = pacerFactory.NewPacer( + elasticCPUDurationPerLowPriReadResponse.Get(sv), + admission.WorkInfo{ + // NB: This is either code that runs in physically isolated SQL + // pods for secondary tenants, or for the system tenant, in + // nodes running colocated SQL+KV code where all SQL code is run + // on behalf of the one tenant. So from an AC perspective, the + // tenant ID we pass through here is irrelevant. + TenantID: roachpb.SystemTenantID, + Priority: admissionPri, + CreateTime: admissionHeader.CreateTime, + }) + } } // SetupNextFetch sets up the Fetcher for the next set of spans. @@ -539,16 +614,10 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { } f.batchResponseAccountedFor = returnedBytes } + // Do admission control after we've accounted for the response bytes. - if br != nil && f.responseAdmissionQ != nil { - responseAdmission := admission.WorkInfo{ - TenantID: roachpb.SystemTenantID, - Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority), - CreateTime: f.requestAdmissionHeader.CreateTime, - } - if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { - return err - } + if err := f.maybeAdmitBatchResponse(ctx, br); err != nil { + return err } f.batchIdx++ @@ -576,6 +645,58 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { return nil } +func (f *txnKVFetcher) maybeAdmitBatchResponse(ctx context.Context, br *kvpb.BatchResponse) error { + if br == nil { + return nil // nothing to do + } + + if f.admissionPacer != nil { + // If admissionPacer is initialized, we're using the elastic CPU control + // mechanism (the work is elastic in nature and using the slots based + // mechanism would permit high scheduling latencies). We want to limit + // the CPU% used by SQL during internally submitted reads, like + // row-level TTL selects. All that work happens on the same goroutine + // doing this fetch, so is accounted for when invoking .Pace() as we + // fetch KVs as part of our volcano operator iteration. See CPU profiles + // posted on #98722. + // + // TODO(irfansharif): At the time of writing, SELECTs done by the TTL + // job are not distributed at SQL level (since our DistSQL physical + // planning heuristics deems it not worthy of distribution), and with + // the local plan we only have a single goroutine (unless + // maybeParallelizeLocalScans splits up the single scan into multiple + // TableReader processors). This may change as part of + // https://github.com/cockroachdb/cockroach/issues/82164 where CPU + // intensive SQL work will happen on a different goroutine from the ones + // that evaluate the BatchRequests, so the integration is tricker there. + // If we're unable to integrate it well, we could disable usage of the + // streamer to preserve this current form of pacing. + // + // TODO(irfansharif): Add tests for the SELECT queries issued by the TTL + // to ensure that they have local plans with a single TableReader + // processor in multi-node clusters. + if err := f.admissionPacer.Pace(ctx); err != nil { + // We're unable to pace things automatically -- shout loudly + // semi-infrequently but don't fail the kv fetcher itself. At + // worst we'd be over-admitting. + if logAdmissionPacerErr.ShouldLog() { + log.Errorf(ctx, "automatic pacing: %v", err) + } + } + } else if f.responseAdmissionQ != nil { + responseAdmission := admission.WorkInfo{ + TenantID: roachpb.SystemTenantID, + Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority), + CreateTime: f.requestAdmissionHeader.CreateTime, + } + if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil { + return err + } + } + + return nil +} + // popBatch returns the 0th "batch" in a slice of "batches", as well as the rest // of the slice of the "batches". It nils the pointer to the 0th element before // reslicing the outer slice. @@ -754,6 +875,7 @@ func (f *txnKVFetcher) reset(ctx context.Context) { // Close releases the resources of this txnKVFetcher. func (f *txnKVFetcher) Close(ctx context.Context) { f.reset(ctx) + f.admissionPacer.Close() } const requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{})) diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go index 678cff19b49c..dc762e2c7828 100644 --- a/pkg/sql/row/kv_fetcher.go +++ b/pkg/sql/row/kv_fetcher.go @@ -87,7 +87,8 @@ func newTxnKVFetcher( return br, nil } } - return newTxnKVFetcherInternal(newTxnKVFetcherArgs{ + + fetcherArgs := newTxnKVFetcherArgs{ sendFn: sendFn, reverse: reverse, lockStrength: lockStrength, @@ -97,9 +98,13 @@ func newTxnKVFetcher( forceProductionKVBatchSize: forceProductionKVBatchSize, kvPairsRead: new(int64), batchRequestsIssued: &batchRequestsIssued, - requestAdmissionHeader: txn.AdmissionHeader(), - responseAdmissionQ: txn.DB().SQLKVResponseAdmissionQ, - }) + } + fetcherArgs.admission.requestHeader = txn.AdmissionHeader() + fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ + fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory + fetcherArgs.admission.settingsValues = txn.DB().SettingsValues + + return newTxnKVFetcherInternal(fetcherArgs) } // NewDirectKVBatchFetcher creates a new KVBatchFetcher that uses the