Skip to content

Commit

Permalink
kvserver: introduce cpu rebalancing
Browse files Browse the repository at this point in the history
This patch allows the store rebalancer to use CPU in place of QPS when
balancing load on a cluster. This patch adds `cpu` as an option with the
cluster setting:

`kv.allocator.load_based_rebalancing.objective`

When set to `cpu`, rather than `qps`. The store rebalancer will perform
a mostly identical function, however target balancing the sum of all
replica's cpu time on each store, rather than qps. The default remains
as `qps` here.

Similar to QPS, the rebalance threshold can be set to allow controlling
the range above and below the mean store CPU is considered imbalanced,
either overfull or underfull respectively:

`kv.allocator.cpu_rebalance_threshold`: 0.1

In order to manage with mixed versions during upgrade and some
architectures not supporting the cpu sampling method, a rebalance
objective manager is introduced in `rebalance_objective.go`. The manager
mediates access to the rebalance objective and overwrites it in cases
where the objective set in the cluster setting cannot be supported.

resolves: #95380

Release note (ops change) Add option to balance cpu time (cpu)
instead of queries per second (qps) among stores in a cluster. This is
done by setting `kv.allocator.load_based_rebalancing.objective='cpu'`.
`kv.allocator.cpu_rebalance_threshold` is also added, similar to
`kv.allocator.qps_rebalance_threshold` to control the target range for
store cpu above and below the cluster mean.
  • Loading branch information
kvoli committed Jan 27, 2023
1 parent c9ee697 commit c8cf548
Show file tree
Hide file tree
Showing 20 changed files with 852 additions and 259 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -297,4 +297,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 1000022.2-34 set the active cluster version in the format '<major>.<minor>'
version version 1000022.2-36 set the active cluster version in the format '<major>.<minor>'
4 changes: 3 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@
<tr><td><div id="setting-jobs-retention-time" class="anchored"><code>jobs.retention_time</code></div></td><td>duration</td><td><code>336h0m0s</code></td><td>the amount of time to retain records for completed jobs before</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-lease-rebalancing-enabled" class="anchored"><code>kv.allocator.load_based_lease_rebalancing.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing" class="anchored"><code>kv.allocator.load_based_rebalancing</code></div></td><td>enumeration</td><td><code>leases and replicas</code></td><td>whether to rebalance based on the distribution of load across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-objective" class="anchored"><code>kv.allocator.load_based_rebalancing.objective</code></div></td><td>enumeration</td><td><code>qps</code></td><td>what objective does the cluster use to rebalance; if set to `qps` the cluster will attempt to balance qps among stores, if set to `cpu` the cluster will attempt to balance cpu usage among stores [qps = 0, cpu = 1]</td></tr>
<tr><td><div id="setting-kv-allocator-load-based-rebalancing-interval" class="anchored"><code>kv.allocator.load_based_rebalancing_interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the rough interval at which each store will check for load-based lease / replica rebalancing opportunities</td></tr>
<tr><td><div id="setting-kv-allocator-qps-rebalance-threshold" class="anchored"><code>kv.allocator.qps_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-range-rebalance-threshold" class="anchored"><code>kv.allocator.range_rebalance_threshold</code></div></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store&#39;s range count can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-allocator-store-cpu-rebalance-threshold" class="anchored"><code>kv.allocator.store_cpu_rebalance_threshold</code></div></td><td>float</td><td><code>0.1</code></td><td>minimum fraction away from the mean a store&#39;s cpu usage can be before it is considered overfull or underfull</td></tr>
<tr><td><div id="setting-kv-bulk-io-write-max-rate" class="anchored"><code>kv.bulk_io_write.max_rate</code></div></td><td>byte size</td><td><code>1.0 TiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
<tr><td><div id="setting-kv-bulk-sst-max-allowed-overage" class="anchored"><code>kv.bulk_sst.max_allowed_overage</code></div></td><td>byte size</td><td><code>64 MiB</code></td><td>if positive, allowed size in excess of target size for SSTs from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
<tr><td><div id="setting-kv-bulk-sst-target-size" class="anchored"><code>kv.bulk_sst.target_size</code></div></td><td>byte size</td><td><code>16 MiB</code></td><td>target size for SSTs emitted from export requests; export requests (i.e. BACKUP) may buffer up to the sum of kv.bulk_sst.target_size and kv.bulk_sst.max_allowed_overage in memory</td></tr>
Expand Down Expand Up @@ -235,6 +237,6 @@
<tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td></tr>
<tr><td><div id="setting-trace-span-registry-enabled" class="anchored"><code>trace.span_registry.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://&lt;ui&gt;/#/debug/tracez</td></tr>
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-34</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000022.2-36</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td></tr>
</tbody>
</table>
9 changes: 8 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ const (
// columnar scans in the KV layer.
V23_1_KVDirectColumnarScans

// V23_1AllocatorCPUBalancing adds balancing CPU usage among stores using
// the allocator and store rebalancer. It assumes that at this version,
// stores now include their CPU in the StoreCapacity proto when gossiping.
V23_1AllocatorCPUBalancing
// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -702,7 +706,10 @@ var rawVersionsSingleton = keyedVersions{
Key: V23_1_KVDirectColumnarScans,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 34},
},

{
Key: V23_1AllocatorCPUBalancing,
Version: roachpb.Version{Major: 22, Minor: 2, Internal: 36},
},
// *************************************************
// Step (2): Add new versions here.
// Do not add new versions to a patch release.
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_library(
"raft_transport_metrics.go",
"raft_truncator_replica.go",
"range_log.go",
"rebalance_objective.go",
"replica.go",
"replica_app_batch.go",
"replica_application_cmd.go",
Expand Down Expand Up @@ -273,6 +274,7 @@ go_test(
"raft_transport_test.go",
"raft_transport_unit_test.go",
"range_log_test.go",
"rebalance_objective_test.go",
"replica_application_cmd_buf_test.go",
"replica_application_state_machine_test.go",
"replica_batch_updates_test.go",
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/allocator/allocatorimpl/threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func getLoadThreshold(dim load.Dimension, sv *settings.Values) float64 {
switch dim {
case load.Queries:
return allocator.QPSRebalanceThreshold.Get(sv)
case load.CPU:
return allocator.CPURebalanceThreshold.Get(sv)
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -51,6 +53,8 @@ func getLoadMinThreshold(dim load.Dimension) float64 {
switch dim {
case load.Queries:
return allocator.MinQPSThresholdDifference
case load.CPU:
return allocator.MinCPUThresholdDifference
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand All @@ -76,6 +80,8 @@ func getLoadRebalanceMinRequiredDiff(dim load.Dimension, sv *settings.Values) fl
switch dim {
case load.Queries:
return allocator.MinQPSDifferenceForTransfers.Get(sv)
case load.CPU:
return allocator.MinCPUDifferenceForTransfers
default:
panic(errors.AssertionFailedf("Unkown load dimension %d", dim))
}
Expand Down Expand Up @@ -117,3 +123,13 @@ func MakeQPSOnlyDim(v float64) load.Load {
dims[load.Queries] = v
return dims
}

// SetAllDims returns a load vector with all dimensions filled in with the
// value given.
func SetAllDims(v float64) load.Load {
dims := load.Vector{}
for i := range dims {
dims[i] = v
}
return dims
}
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/allocator/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,30 @@ const (
// lightly loaded clusters.
MinQPSThresholdDifference = 100

// MinCPUThresholdDifference is the minimum CPU difference from the cluster
// mean that this system should care about. The system won't attempt to
// take action if a store's CPU differs from the mean by less than this
// amount even if it is greater than the percentage threshold. This
// prevents too many lease transfers or range rebalances in lightly loaded
// clusters.
//
// NB: This represents 5% (1/20) utilization of 1 cpu on average. This
// number was arrived at from testing to minimize thrashing. This number is
// set independent of processor speed and assumes identical value of cpu
// time across all stores. i.e. all cpu's are identical.
MinCPUThresholdDifference = float64(50 * time.Millisecond)

// MinCPUDifferenceForTransfers is the minimum CPU difference that a
// store rebalncer would care about to reconcile (via lease or replica
// rebalancing) between any two stores.
//
// NB: This is set to be two times the minimum threshold that a store needs
// to be above or below the mean to be considered overfull or underfull
// respectively. This is to make lease transfers and replica rebalances
// less sensistive to jitters in any given workload by introducing
// additional friction before taking these actions.
MinCPUDifferenceForTransfers = 2 * MinCPUThresholdDifference

// defaultLoadBasedRebalancingInterval is how frequently to check the store-level
// balance of the cluster.
defaultLoadBasedRebalancingInterval = time.Minute
Expand Down Expand Up @@ -107,6 +131,27 @@ var QPSRebalanceThreshold = func() *settings.FloatSetting {
return s
}()

// CPURebalanceThreshold is the minimum ratio of a store's cpu time to the mean
// cpu time at which that store is considered overfull or underfull of cpu
// usage.
var CPURebalanceThreshold = func() *settings.FloatSetting {
s := settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.allocator.store_cpu_rebalance_threshold",
"minimum fraction away from the mean a store's cpu usage can be before it is considered overfull or underfull",
0.10,
settings.NonNegativeFloat,
func(f float64) error {
if f < 0.01 {
return errors.Errorf("cannot set kv.allocator.store_cpu_rebalance_threshold to less than 0.01")
}
return nil
},
)
s.SetVisibility(settings.Public)
return s
}()

// LoadBasedRebalanceInterval controls how frequently each store checks for
// load-base lease/replica rebalancing opportunties.
var LoadBasedRebalanceInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/allocator/load/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load",
visibility = ["//visibility:public"],
deps = ["//pkg/util/humanizeutil"],
)

get_x_data(name = "get_x_data")
13 changes: 12 additions & 1 deletion pkg/kv/kvserver/allocator/load/dimension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@

package load

import "fmt"
import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
)

// Dimension is a singe dimension of load that a component may track.
type Dimension int

const (
// Queries refers to the number of queries.
Queries Dimension = iota
// CPU refers to the cpu time (ns) used in processing.
CPU

nDimensionsTyped
nDimensions = int(nDimensionsTyped)
Expand All @@ -28,6 +35,8 @@ func (d Dimension) String() string {
switch d {
case Queries:
return "queries-per-second"
case CPU:
return "cpu-per-second"
default:
panic(fmt.Sprintf("cannot name: unknown dimension with ordinal %d", d))
}
Expand All @@ -38,6 +47,8 @@ func (d Dimension) Format(value float64) string {
switch d {
case Queries:
return fmt.Sprintf("%.1f", value)
case CPU:
return string(humanizeutil.Duration(time.Duration(int64(value))))
default:
panic(fmt.Sprintf("cannot format value: unknown dimension with ordinal %d", d))
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/allocator/load/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,29 @@ func ElementWiseProduct(a, b Load) Load {
return bimap(a, b, func(ai, bi float64) float64 { return ai * bi })
}

// Scale applies the factor given against every dimension.
func Scale(l Load, factor float64) Load {
return nmap(l, func(_ Dimension, li float64) float64 { return li * factor })
}

// Set returns a new Load with every dimension equal to the value given.
func Set(val float64) Load {
l := Vector{}
return nmap(l, func(_ Dimension, li float64) float64 { return val })
}

func bimap(a, b Load, op func(ai, bi float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(a.Dim(dim), b.Dim(dim))
}
return mapped
}

func nmap(l Load, op func(d Dimension, li float64) float64) Load {
mapped := Vector{}
for dim := Dimension(0); dim < Dimension(nDimensions); dim++ {
mapped[dim] = op(dim, l.Dim(dim))
}
return mapped
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/allocator/range_usage_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type RangeRequestLocalityInfo struct {
func (r RangeUsageInfo) Load() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
dims[load.CPU] = r.RequestCPUNanosPerSecond + r.RaftCPUNanosPerSecond
return dims
}

Expand All @@ -47,5 +48,11 @@ func (r RangeUsageInfo) Load() load.Load {
func (r RangeUsageInfo) TransferImpact() load.Load {
dims := load.Vector{}
dims[load.Queries] = r.QueriesPerSecond
// Only use the request recorded cpu. This assumes that all replicas will
// use the same amount of raft cpu - which may be dubious.
//
// TODO(kvoli): Look to separate out leaseholder vs replica cpu usage in
// accounting to account for follower reads if able.
dims[load.CPU] = r.RequestCPUNanosPerSecond
return dims
}
37 changes: 35 additions & 2 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,14 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
// network). We can't update the local store at this time.
return
}
// Only apply the raft cpu delta on rebalance. This estimate assumes that
// the raft cpu usage is approximately equal across replicas for a range.
switch changeType {
case roachpb.ADD_VOTER, roachpb.ADD_NON_VOTER:
detail.Desc.Capacity.RangeCount++
detail.Desc.Capacity.LogicalBytes += rangeUsageInfo.LogicalBytes
detail.Desc.Capacity.WritesPerSecond += rangeUsageInfo.WritesPerSecond
detail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond
case roachpb.REMOVE_VOTER, roachpb.REMOVE_NON_VOTER:
detail.Desc.Capacity.RangeCount--
if detail.Desc.Capacity.LogicalBytes <= rangeUsageInfo.LogicalBytes {
Expand All @@ -590,6 +593,11 @@ func (sp *StorePool) UpdateLocalStoreAfterRebalance(
} else {
detail.Desc.Capacity.WritesPerSecond -= rangeUsageInfo.WritesPerSecond
}
if detail.Desc.Capacity.CPUPerSecond <= rangeUsageInfo.RaftCPUNanosPerSecond {
detail.Desc.Capacity.CPUPerSecond = 0
} else {
detail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond
}
default:
return
}
Expand Down Expand Up @@ -618,17 +626,23 @@ func (sp *StorePool) UpdateLocalStoreAfterRelocate(
sp.DetailsMu.Lock()
defer sp.DetailsMu.Unlock()

// Only apply the raft cpu delta on rebalance. This estimate assumes that
// the raft cpu usage is approximately equal across replicas for a range.
// TODO(kvoli): Separate into LH vs Replica, similar to the comment on
// range_usage_info.
updateTargets := func(targets []roachpb.ReplicationTarget) {
for _, target := range targets {
if toDetail := sp.GetStoreDetailLocked(target.StoreID); toDetail != nil {
toDetail.Desc.Capacity.RangeCount++
toDetail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RaftCPUNanosPerSecond
}
}
}
updatePrevious := func(previous []roachpb.ReplicaDescriptor) {
for _, old := range previous {
if toDetail := sp.GetStoreDetailLocked(old.StoreID); toDetail != nil {
toDetail.Desc.Capacity.RangeCount--
toDetail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RaftCPUNanosPerSecond
}
}
}
Expand All @@ -655,13 +669,24 @@ func (sp *StorePool) UpdateLocalStoresAfterLeaseTransfer(
} else {
fromDetail.Desc.Capacity.QueriesPerSecond -= rangeUsageInfo.QueriesPerSecond
}
// Only apply the request cpu (leaseholder + follower-reads) delta on
// transfers. Note this does not correctly account for follower reads
// remaining on the prior leaseholder after lease transfer. Instead,
// only a cpu delta specific to the lease should be applied.
if fromDetail.Desc.Capacity.CPUPerSecond <= rangeUsageInfo.RequestCPUNanosPerSecond {
fromDetail.Desc.Capacity.CPUPerSecond = 0
} else {
fromDetail.Desc.Capacity.CPUPerSecond -= rangeUsageInfo.RequestCPUNanosPerSecond
}

sp.DetailsMu.StoreDetails[from] = &fromDetail
}

toDetail := *sp.GetStoreDetailLocked(to)
if toDetail.Desc != nil {
toDetail.Desc.Capacity.LeaseCount++
toDetail.Desc.Capacity.QueriesPerSecond += rangeUsageInfo.QueriesPerSecond
toDetail.Desc.Capacity.CPUPerSecond += rangeUsageInfo.RequestCPUNanosPerSecond
sp.DetailsMu.StoreDetails[to] = &toDetail
}
}
Expand Down Expand Up @@ -931,6 +956,10 @@ type StoreList struct {
// to be rebalance targets.
candidateLogicalBytes Stat

// CandidateCPU tracks store-cpu-per-second stats for Stores that are
// eligible to be rebalance targets.
CandidateCPU Stat

// CandidateQueriesPerSecond tracks queries-per-second stats for Stores that
// are eligible to be rebalance targets.
CandidateQueriesPerSecond Stat
Expand All @@ -957,29 +986,32 @@ func MakeStoreList(descriptors []roachpb.StoreDescriptor) StoreList {
sl.CandidateQueriesPerSecond.update(desc.Capacity.QueriesPerSecond)
sl.candidateWritesPerSecond.update(desc.Capacity.WritesPerSecond)
sl.CandidateL0Sublevels.update(float64(desc.Capacity.L0Sublevels))
sl.CandidateCPU.update(desc.Capacity.CPUPerSecond)
}
return sl
}

func (sl StoreList) String() string {
var buf bytes.Buffer
fmt.Fprintf(&buf,
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v",
" candidate: avg-ranges=%v avg-leases=%v avg-disk-usage=%v avg-queries-per-second=%v avg-store-cpu-per-second=%v",
sl.CandidateRanges.Mean,
sl.CandidateLeases.Mean,
humanizeutil.IBytes(int64(sl.candidateLogicalBytes.Mean)),
sl.CandidateQueriesPerSecond.Mean,
humanizeutil.Duration(time.Duration(int64(sl.CandidateCPU.Mean))),
)
if len(sl.Stores) > 0 {
fmt.Fprintf(&buf, "\n")
} else {
fmt.Fprintf(&buf, " <no candidates>")
}
for _, desc := range sl.Stores {
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f l0-sublevels=%d\n",
fmt.Fprintf(&buf, " %d: ranges=%d leases=%d disk-usage=%s queries-per-second=%.2f store-cpu-per-second=%s l0-sublevels=%d\n",
desc.StoreID, desc.Capacity.RangeCount,
desc.Capacity.LeaseCount, humanizeutil.IBytes(desc.Capacity.LogicalBytes),
desc.Capacity.QueriesPerSecond,
humanizeutil.Duration(time.Duration(int64(desc.Capacity.CPUPerSecond))),
desc.Capacity.L0Sublevels,
)
}
Expand All @@ -1006,6 +1038,7 @@ func (sl StoreList) ExcludeInvalid(constraints []roachpb.ConstraintsConjunction)
func (sl StoreList) LoadMeans() load.Load {
dims := load.Vector{}
dims[load.Queries] = sl.CandidateQueriesPerSecond.Mean
dims[load.CPU] = sl.CandidateCPU.Mean
return dims
}

Expand Down
Loading

0 comments on commit c8cf548

Please sign in to comment.