Skip to content

Commit

Permalink
kv,sql: integrate row-level TTL reads with CPU limiter
Browse files Browse the repository at this point in the history
Part of cockroachdb#98722.

We do it at two levels, each appearing prominently in CPU profiles:
- Down in KV, where we're handling batch requests issued as part of
  row-level TTL selects. This is gated by
  kvadmission.ttl_read_elastic_control.enabled.
- Up in SQL, when handling KV responses to said batch requests. This is
  gated by sqladmission.ttl_read_response_elastic_control.enabled.

Similar to backups, rangefeed initial scans, and changefeed event
processing, we've observed latency impact during CPU-intensive scans
issued as part of row-level TTL jobs. We know from before that the
existing slots based mechanism for CPU work can result in excessive
scheduling latency for high-pri work in the presence of lower-pri work,
affecting end-user latencies. This commit then tries to control the
total CPU% used by row-level TTL selects through the elastic CPU
limiter. For the KV work, this was trivial -- we already have
integrations at the batch request level and now we pick out requests
with the admissionpb.TTLLowPri bit set.

For the SQL portion of the work we introduce some minimal plumbing.
Where previously we sought admission in the SQL-KV response queues after
fetching each batch of KVs from KV as part of our volcano operator
iteration, we now incrementally acquire CPU nanos. We do this
specifically for row-level TTL work. Experimentally the CPU nanos we
acquire here map roughly to the CPU utilization due to SQL work for
row-level TTL selects.

Release note: None
  • Loading branch information
irfansharif committed Aug 15, 2023
1 parent e2bf64f commit 96e69b9
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 31 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
Expand Down Expand Up @@ -263,8 +264,12 @@ type DB struct {
// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
// placed here simply for plumbing convenience, as there is a diversity of
// SQL code that all uses kv.DB.
// TODO(sumeer): find a home for this in the SQL layer.
//
// TODO(sumeer,irfansharif): Find a home for these in the SQL layer.
// Especially SettingsValue.
SQLKVResponseAdmissionQ *admission.WorkQueue
AdmissionPacerFactory admission.PacerFactory
SettingsValues *settings.Values
}

// NonTransactionalSender returns a Sender that can be used for sending
Expand Down
81 changes: 67 additions & 14 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,44 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
},
)

// elasticCPUDurationPerTTLRead controls how many CPU tokens are allotted
// for each ttl read request.
var elasticCPUDurationPerTTLRead = settings.RegisterDurationSetting(
settings.SystemOnly,
"kvadmission.elastic_cpu.duration_per_ttl_read",
"controls how many CPU tokens are allotted for each ttl read request",
10*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed per ttl read request is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed per ttl read request is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// ttlReadElasticControlEnabled determines whether ttl reads integrate with
// elastic CPU control.
var ttlReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.ttl_read_elastic_control.enabled",
"determines whether the kv portion of row-level ttl reads integrate with elastic CPU control",
true,
)

// exportRequestElasticControlEnabled determines whether export requests
// integrate with elastic CPU control.
var exportRequestElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.export_request_elastic_control.enabled",
"determines whether the export requests integrate with elastic CPU control",
true,
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans. Only takes
// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
Expand Down Expand Up @@ -342,21 +380,35 @@ func (n *controllerImpl) AdmitKVWork(
}
}
if admissionEnabled {
if ba.IsSingleExportRequest() {
// Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on
// the same machine.
// - Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on the
// same machine.
// - We do the same for KV work done on the behalf of row-level TTL
// reads.
isTTLRead := ba.IsReadOnly() && admissionInfo.Priority == admissionpb.TTLLowPri
shouldUseElasticCPU :=
(exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
(ttlReadElasticControlEnabled.Get(&n.settings.SV) && isTTLRead)

if shouldUseElasticCPU {
var admitDuration time.Duration
if ba.IsSingleExportRequest() {
admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV)
} else if isTTLRead {
admitDuration = elasticCPUDurationPerTTLRead.Get(&n.settings.SV)
}

elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
ctx, admitDuration, admissionInfo,
)
if err != nil {
return Handle{}, err
Expand All @@ -369,6 +421,7 @@ func (n *controllerImpl) AdmitKVWork(
}
}()
} else {
// Use the slots-based mechanism for everything else.
callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return Handle{}, err
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
admissionKnobs,
)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
db.SettingsValues = &cfg.Settings.SV
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
stopper.AddCloser(stop.CloserFn(func() {
goschedstats.UnregisterRunnableCountCallback(cbID)
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}
if args.Txn != nil {
fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader()
fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues
}
rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
}
Expand Down
114 changes: 102 additions & 12 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package row

import (
"context"
"fmt"
"sync/atomic"
"time"
"unsafe"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
Expand Down Expand Up @@ -57,6 +59,37 @@ var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue
1, /* metamorphicValue */
))

var logAdmissionPacerErr = log.Every(100 * time.Millisecond)

// elasticCPUDurationPerTTLRead controls how many CPU tokens are allotted
// each time we seek admission for response handling during row-level TTL reads.
var elasticCPUDurationPerTTLReadResponse = settings.RegisterDurationSetting(
settings.SystemOnly,
"sqladmission.elastic_cpu.duration_per_ttl_read_response",
"controls how many CPU tokens are allotted for handling responses for ttl reads",
100*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed per ttl read response is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed per ttl read response is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// ttlReadElasticControlEnabled determines whether the sql portion of row-level
// TTL reads integrate with elastic CPU control.
var ttlReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"sqladmission.ttl_read_response_elastic_control.enabled",
"determines whether the sql portion of row-level ttl reads integrate with elastic CPU control",
true,
)

// sendFunc is the function used to execute a KV batch; normally
// wraps (*kv.Txn).Send.
type sendFunc func(
Expand Down Expand Up @@ -186,6 +219,7 @@ type txnKVFetcher struct {
// For request and response admission control.
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue
admissionPacer *admission.Pacer
}

var _ KVBatchFetcher = &txnKVFetcher{}
Expand Down Expand Up @@ -264,8 +298,13 @@ type newTxnKVFetcherArgs struct {
forceProductionKVBatchSize bool
kvPairsRead *int64
batchRequestsIssued *int64
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue

admission struct { // groups AC-related fields
requestHeader kvpb.AdmissionHeader
responseQ *admission.WorkQueue
pacerFactory admission.PacerFactory
settingsValues *settings.Values
}
}

// newTxnKVFetcherInternal initializes a txnKVFetcher.
Expand All @@ -284,8 +323,27 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher {
lockTimeout: args.lockTimeout,
acc: args.acc,
forceProductionKVBatchSize: args.forceProductionKVBatchSize,
requestAdmissionHeader: args.requestAdmissionHeader,
responseAdmissionQ: args.responseAdmissionQ,
requestAdmissionHeader: args.admission.requestHeader,
responseAdmissionQ: args.admission.responseQ,
}

admissionPri := admissionpb.WorkPriority(args.admission.requestHeader.Priority)
if ttlReadElasticControlEnabled.Get(args.admission.settingsValues) &&
admissionPri == admissionpb.TTLLowPri &&
args.admission.pacerFactory != nil {

f.admissionPacer = args.admission.pacerFactory.NewPacer(
elasticCPUDurationPerTTLReadResponse.Get(args.admission.settingsValues),
admission.WorkInfo{
// NB: This is either code that runs in physically isolated SQL
// pods for secondary tenants, or for the system tenant, in
// nodes running colocated SQL+KV code where all SQL code is run
// on behalf of the one tenant. So from an AC perspective, the
// tenant ID we pass through here is irrelevant.
TenantID: roachpb.SystemTenantID,
Priority: admissionPri,
CreateTime: args.admission.requestHeader.CreateTime,
})
}
f.kvBatchFetcherHelper.init(f.nextBatch, args.kvPairsRead, args.batchRequestsIssued)
return f
Expand All @@ -297,6 +355,25 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
f.sendFn = sendFn
f.requestAdmissionHeader = txn.AdmissionHeader()
f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ

if f.admissionPacer != nil {
f.admissionPacer.Close()
}
admissionPri := admissionpb.WorkPriority(txn.AdmissionHeader().Priority)
if ttlReadElasticControlEnabled.Get(txn.DB().SettingsValues) &&
admissionPri == admissionpb.TTLLowPri &&
txn.DB().AdmissionPacerFactory != nil {

f.admissionPacer = txn.DB().AdmissionPacerFactory.NewPacer(
elasticCPUDurationPerTTLReadResponse.Get(txn.DB().SettingsValues),
admission.WorkInfo{
// NB: See comment above in newTxnKVFetcherInternal for why
// blindly passing in the system tenant ID is ok.
TenantID: roachpb.SystemTenantID,
Priority: admissionPri,
CreateTime: txn.AdmissionHeader().CreateTime,
})
}
}

// SetupNextFetch sets up the Fetcher for the next set of spans.
Expand Down Expand Up @@ -540,14 +617,26 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
f.batchResponseAccountedFor = returnedBytes
}
// Do admission control after we've accounted for the response bytes.
if br != nil && f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err

if br != nil {
if f.admissionPacer != nil {
if err := f.admissionPacer.Pace(ctx); err != nil {
// We're unable to pace things automatically -- shout loudly
// semi-infrequently but don't fail the kv fetcher itself. At
// worst we'd be over-admitting.
if logAdmissionPacerErr.ShouldLog() {
log.Errorf(ctx, "automatic pacing: %v", err)
}
}
} else if f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err
}
}
}

Expand Down Expand Up @@ -754,6 +843,7 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
// Close releases the resources of this txnKVFetcher.
func (f *txnKVFetcher) Close(ctx context.Context) {
f.reset(ctx)
f.admissionPacer.Close()
}

const requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{}))
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ func newTxnKVFetcher(
// In most cases, the txn is non-nil; however, in some code paths (e.g.
// when executing EXPLAIN (VEC)) it might be nil, so we need to have
// this check.
fetcherArgs.requestAdmissionHeader = txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = txn.AdmissionHeader()
fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = txn.DB().SettingsValues
}
return newTxnKVFetcherInternal(fetcherArgs)
}
Expand Down

0 comments on commit 96e69b9

Please sign in to comment.