Skip to content

Commit

Permalink
kv,sql: integrate row-level TTL reads with CPU limiter
Browse files Browse the repository at this point in the history
Part of cockroachdb#98722.

We do it at two levels, each appearing prominently in CPU profiles:

- Down in KV, where we're handling batch requests issued as part of
  row-level TTL selects. This is gated by
  kvadmission.low_pri_read_elastic_control.enabled.
- Up in SQL, when handling KV responses to said batch requests. This is
  gated by sqladmission.low_pri_read_response_elastic_control.enabled.

Similar to backups, rangefeed initial scans, and changefeed event
processing, we've observed latency impact during CPU-intensive scans
issued as part of row-level TTL jobs. We know from before that the
existing slots based mechanism for CPU work can result in excessive
scheduling latency for high-pri work in the presence of lower-pri
work, affecting end-user latencies. This is because the slots mechanisms
aims for full utilization of the underlying resource, which is
incompatible with low scheduling latencies. This commit then tries to
control the total CPU% used by row-level TTL selects through the elastic
CPU limiter. For the KV work, this was trivial -- we already have
integrations at the batch request level and now we pick out requests
with priorities less than admissionpb.UserLowPri, which includes
admissionpb.TTLLowPri.

For the SQL portion of the work we introduce some minimal plumbing.
Where previously we sought admission in the SQL-KV response queues after
fetching each batch of KVs from KV as part of our volcano operator
iteration, we now incrementally acquire CPU nanos. We do this
specifically for row-level TTL work. Experimentally the CPU nanos we
acquire here map roughly to the CPU utilization due to SQL work for
row-level TTL selects.

(Note that we apply the elastic CPU limiter for all reads with priorities
less than admissionpb.UserPriLow. This is typically internally submitted
reads, and includes row-level TTL selects.)

Release note: None
  • Loading branch information
irfansharif committed Aug 22, 2023
1 parent a16ab19 commit e91550f
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 33 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
Expand Down Expand Up @@ -263,8 +264,12 @@ type DB struct {
// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
// placed here simply for plumbing convenience, as there is a diversity of
// SQL code that all uses kv.DB.
// TODO(sumeer): find a home for this in the SQL layer.
//
// TODO(sumeer,irfansharif): Find a home for these in the SQL layer.
// Especially SettingsValue.
SQLKVResponseAdmissionQ *admission.WorkQueue
AdmissionPacerFactory admission.PacerFactory
SettingsValues *settings.Values
}

// NonTransactionalSender returns a Sender that can be used for sending
Expand Down
88 changes: 74 additions & 14 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,44 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
},
)

// elasticCPUDurationPerInternalLowPriRead controls how many CPU tokens are
// allotted for each internally submitted low priority read request.
var elasticCPUDurationPerInternalLowPriRead = settings.RegisterDurationSetting(
settings.SystemOnly,
"kvadmission.elastic_cpu.duration_per_low_pri_read",
"controls how many CPU tokens are allotted for each internally submitted low priority read request",
10*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// internalLowPriReadElasticControlEnabled determines whether internally
// submitted low pri reads integrate with elastic CPU control.
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.low_pri_read_elastic_control.enabled",
"determines whether the internally submitted low priority reads reads integrate with elastic CPU control",
true,
)

// exportRequestElasticControlEnabled determines whether export requests
// integrate with elastic CPU control.
var exportRequestElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.export_request_elastic_control.enabled",
"determines whether the export requests integrate with elastic CPU control",
true,
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans. Only takes
// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
Expand Down Expand Up @@ -258,21 +296,42 @@ func (n *controllerImpl) AdmitKVWork(
}
}
if admissionEnabled {
if ba.IsSingleExportRequest() {
// Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on
// the same machine.
// - Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on the
// same machine.
// - We do the same for internally submitted low priority reads in
// general (notably, for KV work done on the behalf of row-level TTL
// reads). Everything admissionpb.UserLowPri and above uses the slots
// mechanism.
isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri
shouldUseElasticCPU :=
(exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
(internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead)

if shouldUseElasticCPU {
var admitDuration time.Duration
if ba.IsSingleExportRequest() {
admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV)
} else if isInternalLowPriRead {
admitDuration = elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV)
}

// TODO(irfansharif): For export requests it's possible to preempt,
// i.e. once the CPU slice is used up we terminate the work. We
// don't do this for the general case of low priority internal
// reads, so in some sense, the integration is incomplete. This is
// probably harmless.
elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
ctx, admitDuration, admissionInfo,
)
if err != nil {
return Handle{}, err
Expand All @@ -285,6 +344,7 @@ func (n *controllerImpl) AdmitKVWork(
}
}()
} else {
// Use the slots-based mechanism for everything else.
callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return Handle{}, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
dbCtx.Stopper = stopper
db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
db.SettingsValues = &cfg.Settings.SV

nlActive, nlRenewal := cfg.NodeLivenessDurations()
if knobs := cfg.TestingKnobs.NodeLiveness; knobs != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}
if args.Txn != nil {
fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader()
fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues
}
rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
}
Expand Down
160 changes: 146 additions & 14 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package row

import (
"context"
"fmt"
"sync/atomic"
"time"
"unsafe"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
Expand Down Expand Up @@ -57,6 +59,42 @@ var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue
1, /* metamorphicValue */
))

var logAdmissionPacerErr = log.Every(100 * time.Millisecond)

// elasticCPUDurationPerLowPriReadResponse controls how many CPU tokens are allotted
// each time we seek admission for response handling during internally submitted
// low priority reads (like row-level TTL selects).
var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting(
settings.SystemOnly,
"sqladmission.elastic_cpu.duration_per_low_pri_read_response",
"controls how many CPU tokens are allotted for handling responses for internally submitted low priority reads",
// NB: Experimentally, during TTL reads, we observed cumulative on-CPU time
// by SQL processors >> 100ms, over the course of a single select fetching
// many rows. So we pick a relative high duration here.
100*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// internalLowPriReadElasticControlEnabled determines whether the sql portion of
// internally submitted low-priority reads (like row-level TTL selects)
// integrate with elastic CPU control.
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"sqladmission.low_pri_read_response_elastic_control.enabled",
"determines whether the sql portion of internally submitted reads integrate with elastic CPU controller",
true,
)

// sendFunc is the function used to execute a KV batch; normally
// wraps (*client.Txn).Send.
type sendFunc func(
Expand Down Expand Up @@ -185,6 +223,7 @@ type txnKVFetcher struct {
// For request and response admission control.
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue
admissionPacer *admission.Pacer
}

var _ KVBatchFetcher = &txnKVFetcher{}
Expand Down Expand Up @@ -262,8 +301,13 @@ type newTxnKVFetcherArgs struct {
acc *mon.BoundAccount
forceProductionKVBatchSize bool
batchRequestsIssued *int64
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue

admission struct { // groups AC-related fields
requestHeader kvpb.AdmissionHeader
responseQ *admission.WorkQueue
pacerFactory admission.PacerFactory
settingsValues *settings.Values
}
}

// newTxnKVFetcherInternal initializes a txnKVFetcher.
Expand All @@ -282,9 +326,14 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher {
lockTimeout: args.lockTimeout,
acc: args.acc,
forceProductionKVBatchSize: args.forceProductionKVBatchSize,
requestAdmissionHeader: args.requestAdmissionHeader,
responseAdmissionQ: args.responseAdmissionQ,
}
requestAdmissionHeader: args.admission.requestHeader,
responseAdmissionQ: args.admission.responseQ,
}
f.maybeInitAdmissionPacer(
args.admission.requestHeader,
args.admission.pacerFactory,
args.admission.settingsValues,
)
f.kvBatchFetcherHelper.init(f.nextBatch, args.batchRequestsIssued)
return f
}
Expand All @@ -295,6 +344,42 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
f.sendFn = sendFn
f.requestAdmissionHeader = txn.AdmissionHeader()
f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ

if f.admissionPacer != nil {
f.admissionPacer.Close()
}
f.maybeInitAdmissionPacer(txn.AdmissionHeader(), txn.DB().AdmissionPacerFactory, txn.DB().SettingsValues)
}

// maybeInitAdmissionPacer selectively initializes an admission.Pacer for work
// done as part of internally submitted low-priority reads (like row-level TTL
// selects).
func (f *txnKVFetcher) maybeInitAdmissionPacer(
admissionHeader kvpb.AdmissionHeader, pacerFactory admission.PacerFactory, sv *settings.Values,
) {
if sv == nil {
// Only nil in tests and in SQL pods (we don't have admission pacing in
// the latter anyway).
return
}
admissionPri := admissionpb.WorkPriority(admissionHeader.Priority)
if internalLowPriReadElasticControlEnabled.Get(sv) &&
admissionPri < admissionpb.UserLowPri &&
pacerFactory != nil {

f.admissionPacer = pacerFactory.NewPacer(
elasticCPUDurationPerLowPriReadResponse.Get(sv),
admission.WorkInfo{
// NB: This is either code that runs in physically isolated SQL
// pods for secondary tenants, or for the system tenant, in
// nodes running colocated SQL+KV code where all SQL code is run
// on behalf of the one tenant. So from an AC perspective, the
// tenant ID we pass through here is irrelevant.
TenantID: roachpb.SystemTenantID,
Priority: admissionPri,
CreateTime: admissionHeader.CreateTime,
})
}
}

// SetupNextFetch sets up the Fetcher for the next set of spans.
Expand Down Expand Up @@ -533,16 +618,10 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
}
f.batchResponseAccountedFor = returnedBytes
}

// Do admission control after we've accounted for the response bytes.
if br != nil && f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err
}
if err := f.maybeAdmitBatchResponse(ctx, br); err != nil {
return err
}

f.batchIdx++
Expand Down Expand Up @@ -570,6 +649,58 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
return nil
}

func (f *txnKVFetcher) maybeAdmitBatchResponse(ctx context.Context, br *kvpb.BatchResponse) error {
if br == nil {
return nil // nothing to do
}

if f.admissionPacer != nil {
// If admissionPacer is initialized, we're using the elastic CPU control
// mechanism (the work is elastic in nature and using the slots based
// mechanism would permit high scheduling latencies). We want to limit
// the CPU% used by SQL during internally submitted reads, like
// row-level TTL selects. All that work happens on the same goroutine
// doing this fetch, so is accounted for when invoking .Pace() as we
// fetch KVs as part of our volcano operator iteration. See CPU profiles
// posted on #98722.
//
// TODO(irfansharif): At the time of writing, SELECTs done by the TTL
// job are not distributed at SQL level (since our DistSQL physical
// planning heuristics deems it not worthy of distribution), and with
// the local plan we only have a single goroutine (unless
// maybeParallelizeLocalScans splits up the single scan into multiple
// TableReader processors). This may change as part of
// https://github.com/cockroachdb/cockroach/issues/82164 where CPU
// intensive SQL work will happen on a different goroutine from the ones
// that evaluate the BatchRequests, so the integration is tricker there.
// If we're unable to integrate it well, we could disable usage of the
// streamer to preserve this current form of pacing.
//
// TODO(irfansharif): Add tests for the SELECT queries issued by the TTL
// to ensure that they have local plans with a single TableReader
// processor in multi-node clusters.
if err := f.admissionPacer.Pace(ctx); err != nil {
// We're unable to pace things automatically -- shout loudly
// semi-infrequently but don't fail the kv fetcher itself. At
// worst we'd be over-admitting.
if logAdmissionPacerErr.ShouldLog() {
log.Errorf(ctx, "automatic pacing: %v", err)
}
}
} else if f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err
}
}

return nil
}

// popBatch returns the 0th "batch" in a slice of "batches", as well as the rest
// of the slice of the "batches". It nils the pointer to the 0th element before
// reslicing the outer slice.
Expand Down Expand Up @@ -746,6 +877,7 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
// Close releases the resources of this txnKVFetcher.
func (f *txnKVFetcher) Close(ctx context.Context) {
f.reset(ctx)
f.admissionPacer.Close()
}

const requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{}))
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ func newTxnKVFetcher(
// In most cases, the txn is non-nil; however, in some code paths (e.g.
// when executing EXPLAIN (VEC)) it might be nil, so we need to have
// this check.
fetcherArgs.requestAdmissionHeader = txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = txn.AdmissionHeader()
fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = txn.DB().SettingsValues
}
return newTxnKVFetcherInternal(fetcherArgs)
}
Expand Down

0 comments on commit e91550f

Please sign in to comment.