
Merge #108815
108815: kv,sql: integrate row-level TTL reads with CPU limiter r=irfansharif a=irfansharif

Part of #98722.

We integrate at two levels, each of which shows up prominently in CPU profiles:
- Down in KV, where we handle batch requests issued as part of row-level TTL selects. This is gated by kvadmission.low_pri_read_elastic_control.enabled.
- Up in SQL, when handling KV responses to said batch requests. This is gated by sqladmission.ttl_read_response_elastic_control.enabled.

Similar to backups, rangefeed initial scans, and changefeed event processing, we've observed latency impact during CPU-intensive scans issued as part of row-level TTL jobs. We know from prior work that the existing slots-based mechanism for CPU work can result in excessive scheduling latency for high-pri work in the presence of lower-pri work, affecting end-user latencies. This commit therefore controls the total CPU% used by row-level TTL selects through the elastic CPU limiter. For the KV work, this was trivial -- we already have integrations at the batch request level, and we now pick out requests with the admissionpb.TTLLowPri bit set.
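For reference, a condensed sketch of the KV-side gate (this mirrors the kvadmission.go hunk further down in this commit; it is illustrative rather than a verbatim excerpt):

    // Route internally submitted low-pri reads (which includes row-level TTL
    // selects) through the elastic CPU work queue; everything else keeps using
    // the slots-based KV queue.
    isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri
    shouldUseElasticCPU :=
      (exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
      (internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead)
    if shouldUseElasticCPU {
      // Low-pri reads are granted a fixed slice of CPU time per admission
      // (10ms by default, a cluster setting) and are expected to yield
      // cooperatively once the grant is used up.
      admitDuration := elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV)
      elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
        ctx, admitDuration, admissionInfo,
      )
      if err != nil {
        return Handle{}, err
      }
      _ = elasticWorkHandle // carried in the returned kvadmission.Handle
    } // else: fall through to the slots-based kvAdmissionQ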

For the SQL portion of the work we introduce some minimal plumbing. Where previously we sought admission in the SQL-KV response queues after fetching each batch of KVs from KV as part of our volcano operator iteration, we now incrementally acquire CPU nanos. We do this specifically for row-level TTL work. Experimentally the CPU nanos we acquire here map roughly to the CPU utilization due to SQL work for row-level TTL selects.
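Roughly, the SQL-side pacing looks like the sketch below. The fetcher plumbing is in the pkg/sql/row/fetcher.go hunk at the end of this excerpt, but the pacer usage itself lives in the txn KV fetcher, which isn't shown here; the NewPacer/Pace/Close signatures, the 100ms unit, and the nextKVBatch/processKVBatch helpers are assumptions for illustration only.

    // Built once per fetcher when the admission header indicates row-level TTL
    // work, using the factory plumbed through kv.DB (db.AdmissionPacerFactory).
    pacer := admissionPacerFactory.NewPacer(
      100*time.Millisecond, // assumed unit of CPU time requested per acquisition
      admission.WorkInfo{
        TenantID:   roachpb.SystemTenantID,
        Priority:   admissionpb.TTLLowPri,
        CreateTime: timeutil.Now().UnixNano(),
      },
    )
    defer pacer.Close()

    // Per KV batch response: instead of queueing in the SQL-KV response work
    // queue, block until we've incrementally acquired elastic CPU nanos.
    for {
      batch, ok := nextKVBatch()
      if !ok {
        break
      }
      if err := pacer.Pace(ctx); err != nil {
        return err
      }
      processKVBatch(batch)
    }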

Release note: None

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
craig[bot] and irfansharif committed Aug 22, 2023
2 parents c040b73 + 4451735 commit fd3a78b
Showing 13 changed files with 341 additions and 120 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/BUILD.bazel
@@ -17,6 +17,7 @@ go_library(
"admission_control_intent_resolution.go",
"admission_control_multi_store_overload.go",
"admission_control_multitenant_fairness.go",
"admission_control_row_level_ttl.go",
"admission_control_snapshot_overload.go",
"admission_control_tpcc_overload.go",
"allocation_bench.go",
@@ -140,7 +141,6 @@ go_library(
"restore.go",
"roachmart.go",
"roachtest.go",
"row_level_ttl.go",
"ruby_pg.go",
"ruby_pg_blocklist.go",
"rust_postgres.go",
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -30,6 +30,7 @@ func registerAdmission(r registry.Registry) {

registerElasticControlForBackups(r)
registerElasticControlForCDC(r)
registerElasticControlForRowLevelTTL(r)
registerMultiStoreOverload(r)
registerMultiTenantFairness(r)
registerSnapshotOverload(r)
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/admission_control_elastic_backup.go
@@ -70,9 +70,9 @@ func registerElasticControlForBackups(r registry.Registry) {
)

if t.SkipInit() {
t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, time.Minute))
t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, estimatedSetupTime))
} else {
t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, 10*time.Minute))
t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, estimatedSetupTime))
}

runTPCC(ctx, t, c, tpccOptions{
116 changes: 116 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_row_level_ttl.go
@@ -0,0 +1,116 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
  "context"
  "fmt"
  "time"

  "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
  "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
  "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
  "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
  "github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
  "github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

func registerElasticControlForRowLevelTTL(r registry.Registry) {
  const nodes = 7
  var clusterSpec = spec.CPU(4)
  r.Add(makeElasticControlRowLevelTTL(r.MakeClusterSpec(nodes, clusterSpec), false /* expiredRows */))
  r.Add(makeElasticControlRowLevelTTL(r.MakeClusterSpec(nodes, clusterSpec), true /* expiredRows */))
}

func makeElasticControlRowLevelTTL(spec spec.ClusterSpec, expiredRows bool) registry.TestSpec {
  return registry.TestSpec{
    Name:      fmt.Sprintf("admission-control/row-level-ttl/expired-rows=%t", expiredRows),
    Owner:     registry.OwnerAdmissionControl,
    Benchmark: true,
    Tags:      registry.Tags(`weekly`),
    Cluster:   spec,
    Leases:    registry.MetamorphicLeases,
    Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
      crdbNodes := c.Spec().NodeCount - 1
      workloadNode := crdbNodes + 1

      numWarehouses, activeWarehouses, workloadDuration, estimatedSetupTime := 1500, 100, 20*time.Minute, 20*time.Minute
      if c.IsLocal() {
        numWarehouses, activeWarehouses, workloadDuration, estimatedSetupTime = 1, 1, 3*time.Minute, 2*time.Minute
      }

      promCfg := &prometheus.Config{}
      promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
        WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
        WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
        WithGrafanaDashboard("https://go.crdb.dev/p/index-admission-control-grafana").
        WithScrapeConfigs(
          prometheus.MakeWorkloadScrapeConfig("workload", "/",
            makeWorkloadScrapeNodes(
              c.Node(workloadNode).InstallNodes()[0],
              []workloadInstance{{nodes: c.Node(workloadNode)}},
            ),
          ),
        )

      if t.SkipInit() {
        t.Status(fmt.Sprintf("running tpcc for %s (<%s)", workloadDuration, estimatedSetupTime))
      } else {
        t.Status(fmt.Sprintf("initializing + running tpcc for %s (<%s)", workloadDuration, estimatedSetupTime))
      }

      runTPCC(ctx, t, c, tpccOptions{
        Warehouses:         numWarehouses,
        Duration:           workloadDuration,
        SetupType:          usingImport,
        EstimatedSetupTime: estimatedSetupTime,
        // The expired-rows test will delete rows from the order_line table, so
        // the post run checks are expected to fail.
        SkipPostRunCheck: expiredRows,
        PrometheusConfig: promCfg,
        // We limit the number of workers because the default results in a lot
        // of connections which can lead to OOM issues (see #40566).
        ExtraRunArgs:                  fmt.Sprintf("--wait=false --tolerate-errors --max-rate=100 --active-warehouses=%d --workers=%d", activeWarehouses, numWarehouses),
        DisableDefaultScheduledBackup: true,
        During: func(ctx context.Context) error {
          cronOffset := 10
          if c.IsLocal() {
            cronOffset = 1
          }
          nowMinute := timeutil.Now().Minute()
          scheduledMinute := (nowMinute + cronOffset) % 60 // schedule the TTL cron job to kick off a few minutes after test start

          var expirationExpr string
          if expiredRows {
            expirationExpr = `'((ol_delivery_d::TIMESTAMP) + INTERVAL ''1 days'') AT TIME ZONE ''UTC'''`
          } else {
            // The TPCC fixtures have dates from 2006 for the ol_delivery_d column.
            expirationExpr = `'((ol_delivery_d::TIMESTAMP) + INTERVAL ''1000 years'') AT TIME ZONE ''UTC'''`
          }

          // NB: To verify that AC is working as expected, ensure
          // admission_scheduler_latency_listener_p99_nanos is around
          // 1ms, that sys_cpu_combined_percent_normalized doesn't hit
          // 100% (it stays around 60% at the time of writing) and
          // that admission_elastic_cpu_utilization >= 5%, showing
          // that we're acquiring elastic CPU tokens.

          ttlStatement := fmt.Sprintf(`
            ALTER TABLE tpcc.public.order_line SET (
              ttl_expiration_expression=%s,
              ttl_job_cron='%d * * * *'
            );`, expirationExpr, scheduledMinute)
          return runAndLogStmts(ctx, t, c, "enable-ttl", []string{ttlStatement})
        },
      })
    },
  }
}
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/registry.go
@@ -116,7 +116,6 @@ func RegisterTests(r registry.Registry) {
registerRestoreNodeShutdown(r)
registerRoachmart(r)
registerRoachtest(r)
registerRowLevelTTLDuringTPCC(r)
registerRubyPG(r)
registerRustPostgres(r)
registerSQLAlchemy(r)
81 changes: 0 additions & 81 deletions pkg/cmd/roachtest/tests/row_level_ttl.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/tpcc.go
@@ -217,7 +217,7 @@ func runTPCC(ctx context.Context, t test.Test, c cluster.Cluster, opts tpccOptio

var ep *tpccChaosEventProcessor
var promCfg *prometheus.Config
if !opts.DisablePrometheus {
if !opts.DisablePrometheus && !t.SkipInit() {
// TODO(irfansharif): Move this after the import step. The statistics
// during import itself is uninteresting and pollutes actual workload
// data.
7 changes: 6 additions & 1 deletion pkg/kv/db.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
@@ -263,8 +264,12 @@ type DB struct {
// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
// placed here simply for plumbing convenience, as there is a diversity of
// SQL code that all uses kv.DB.
// TODO(sumeer): find a home for this in the SQL layer.
//
// TODO(sumeer,irfansharif): Find a home for these in the SQL layer.
// Especially SettingsValue.
SQLKVResponseAdmissionQ *admission.WorkQueue
AdmissionPacerFactory admission.PacerFactory
SettingsValues *settings.Values
}

// NonTransactionalSender returns a Sender that can be used for sending
78 changes: 64 additions & 14 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -47,6 +47,34 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
settings.DurationInRange(admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration),
)

// elasticCPUDurationPerInternalLowPriRead controls how many CPU tokens are
// allotted for each internally submitted low priority read request.
var elasticCPUDurationPerInternalLowPriRead = settings.RegisterDurationSetting(
settings.SystemOnly,
"kvadmission.elastic_cpu.duration_per_low_pri_read",
"controls how many CPU tokens are allotted for each internally submitted low priority read request",
10*time.Millisecond,
settings.DurationInRange(admission.MinElasticCPUDuration, admission.MaxElasticCPUDuration),
)

// internalLowPriReadElasticControlEnabled determines whether internally
// submitted low pri reads integrate with elastic CPU control.
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.low_pri_read_elastic_control.enabled",
"determines whether the internally submitted low priority reads reads integrate with elastic CPU control",
true,
)

// exportRequestElasticControlEnabled determines whether export requests
// integrate with elastic CPU control.
var exportRequestElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.export_request_elastic_control.enabled",
"determines whether the export requests integrate with elastic CPU control",
true,
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans. Only takes
// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
@@ -322,21 +350,42 @@ func (n *controllerImpl) AdmitKVWork(
}
}
if admissionEnabled {
if ba.IsSingleExportRequest() {
// Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on
// the same machine.
// - Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on the
// same machine.
// - We do the same for internally submitted low priority reads in
// general (notably, for KV work done on the behalf of row-level TTL
// reads). Everything admissionpb.UserLowPri and above uses the slots
// mechanism.
isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri
shouldUseElasticCPU :=
(exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
(internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead)

if shouldUseElasticCPU {
var admitDuration time.Duration
if ba.IsSingleExportRequest() {
admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV)
} else if isInternalLowPriRead {
admitDuration = elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV)
}

// TODO(irfansharif): For export requests it's possible to preempt,
// i.e. once the CPU slice is used up we terminate the work. We
// don't do this for the general case of low priority internal
// reads, so in some sense, the integration is incomplete. This is
// probably harmless.
elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
ctx, admitDuration, admissionInfo,
)
if err != nil {
return Handle{}, err
Expand All @@ -349,6 +398,7 @@ func (n *controllerImpl) AdmitKVWork(
}
}()
} else {
// Use the slots-based mechanism for everything else.
callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return Handle{}, err
2 changes: 2 additions & 0 deletions pkg/server/server.go
@@ -563,6 +563,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf
admissionKnobs,
)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
db.SettingsValues = &cfg.Settings.SV
cbID := goschedstats.RegisterRunnableCountCallback(gcoords.Regular.CPULoad)
stopper.AddCloser(stop.CloserFn(func() {
goschedstats.UnregisterRunnableCountCallback(cbID)
6 changes: 4 additions & 2 deletions pkg/sql/row/fetcher.go
@@ -448,8 +448,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}
if args.Txn != nil {
fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader()
fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues
}
rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
}