Skip to content

Commit

Permalink
kvserver: delegate snapshots to followers
Browse files Browse the repository at this point in the history
Fixes: cockroachdb#42491

This commit allows a snapshot to be sent by a follower instead of the
leader of a range. The follower(s) are chosen based on locality to the
final recipient of the snapshot.  If the follower is not able to
quickly send the snapshot, the attempt is aborted and the leader sends
the snapshot instead.

By choosing a delegate rather than sending the snapshot directly, WAN
traffic can be minimized. Additionally the snapshot will likely be
delivered faster.

There are two settings that control this feature. The first,
`kv.snapshot_delegation.num_follower`, controls how many followers
the snapshot is attempted to be delegated through. If set to 0, then
snapshot delegation is disabled. The second,
`kv.snapshot_delegation_queue.enabled`, controls whether delegated
snapshots will queue on the delegate or return failure immediately. This
is useful to prevent a delegation request from spending a long time
waiting before it is sent.

Before the snapshot is sent from the follower checks are done to
verify that the delegate is able to send a snapshot that will be valid
for the recipient. If not the request is rerouted to the leader.

Release note (performance improvement): Adds delegated snapshots which
can reduce WAN traffic for snapshot movement.
  • Loading branch information
andrewbaptist committed Dec 30, 2022
1 parent e872ced commit 2f8de54
Show file tree
Hide file tree
Showing 20 changed files with 970 additions and 408 deletions.
3 changes: 2 additions & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@
<tr><td><div id="setting-kv-replica-circuit-breaker-slow-replication-threshold" class="anchored"><code>kv.replica_circuit_breaker.slow_replication_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td></tr>
<tr><td><div id="setting-kv-replica-stats-addsst-request-size-factor" class="anchored"><code>kv.replica_stats.addsst_request_size_factor</code></div></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td></tr>
<tr><td><div id="setting-kv-replication-reports-interval" class="anchored"><code>kv.replication_reports.interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td></tr>
<tr><td><div id="setting-kv-snapshot-delegation-enabled" class="anchored"><code>kv.snapshot_delegation.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><div id="setting-kv-snapshot-delegation-num-follower" class="anchored"><code>kv.snapshot_delegation.num_follower</code></div></td><td>integer</td><td><code>1</code></td><td>the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder</td></tr>
<tr><td><div id="setting-kv-snapshot-delegation-num-requests" class="anchored"><code>kv.snapshot_delegation.num_requests</code></div></td><td>integer</td><td><code>3</code></td><td>how many queued requests are allowed on a delegate before the request is rejected</td></tr>
<tr><td><div id="setting-kv-snapshot-rebalance-max-rate" class="anchored"><code>kv.snapshot_rebalance.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><div id="setting-kv-snapshot-recovery-max-rate" class="anchored"><code>kv.snapshot_recovery.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
Expand Down
91 changes: 53 additions & 38 deletions pkg/cmd/roachtest/tests/decommissionbench.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ const (

type decommissionBenchSpec struct {
nodes int
cpus int
warehouses int
load bool
noLoad bool
multistore bool
snapshotRate int
duration time.Duration

// Whether the test cluster nodes are in multiple regions.
multiregion bool

// When true, the test will attempt to stop the node prior to decommission.
whileDown bool

Expand All @@ -85,6 +87,10 @@ type decommissionBenchSpec struct {
// An override for the default timeout, if needed.
timeout time.Duration

// An override for the decommission node to make it choose a predictable node
// instead of a random node.
decommissionNode int

skip string
}

Expand All @@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) {
// Basic benchmark configurations, to be run nightly.
{
nodes: 4,
cpus: 16,
warehouses: 1000,
load: true,
},
{
nodes: 4,
cpus: 16,
warehouses: 1000,
load: true,
duration: 1 * time.Hour,
},
{
nodes: 4,
cpus: 16,
warehouses: 1000,
load: true,
whileDown: true,
},
{
nodes: 8,
cpus: 16,
warehouses: 3000,
load: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
timeout: 4 * time.Hour,
Expand All @@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Add a new node during decommission (no drain).
nodes: 8,
cpus: 16,
warehouses: 3000,
load: true,
whileUpreplicating: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
Expand All @@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Drain before decommission, without adding a new node.
nodes: 8,
cpus: 16,
warehouses: 3000,
load: true,
drainFirst: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
Expand All @@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) {
{
// Drain before decommission, and add a new node.
nodes: 8,
cpus: 16,
warehouses: 3000,
load: true,
whileUpreplicating: true,
drainFirst: true,
// This test can take nearly an hour to import and achieve balance, so
Expand All @@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) {
},
{
nodes: 4,
cpus: 16,
warehouses: 1000,
load: true,
drainFirst: true,
skip: manualBenchmarkingOnly,
},
{
nodes: 4,
cpus: 16,
warehouses: 1000,
load: true,
slowWrites: true,
skip: manualBenchmarkingOnly,
},
{
nodes: 8,
cpus: 16,
warehouses: 3000,
load: true,
slowWrites: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
Expand All @@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) {
},
{
nodes: 12,
cpus: 16,
warehouses: 3000,
load: true,
multistore: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
Expand All @@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) {
{
// Test to compare 12 4-store nodes vs 48 single-store nodes
nodes: 48,
cpus: 16,
warehouses: 3000,
load: true,
// This test can take nearly an hour to import and achieve balance, so
// we extend the timeout to let it complete.
timeout: 3 * time.Hour,
skip: manualBenchmarkingOnly,
},
{
// Multiregion decommission, and add a new node in the same region.
nodes: 6,
warehouses: 1000,
whileUpreplicating: true,
drainFirst: true,
multiregion: true,
decommissionNode: 2,
},
{
// Multiregion decommission, and add a new node in a different region.
nodes: 6,
warehouses: 1000,
whileUpreplicating: true,
drainFirst: true,
multiregion: true,
decommissionNode: 3,
},
} {
registerDecommissionBenchSpec(r, benchSpec)
}
Expand All @@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
}
extraNameParts := []string{""}
addlNodeCount := 0
specOptions := []spec.Option{spec.CPU(benchSpec.cpus)}
specOptions := []spec.Option{spec.CPU(16)}

if benchSpec.snapshotRate != 0 {
extraNameParts = append(extraNameParts,
Expand Down Expand Up @@ -251,19 +251,31 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
extraNameParts = append(extraNameParts, "while-upreplicating")
}

if !benchSpec.load {
if benchSpec.noLoad {
extraNameParts = append(extraNameParts, "no-load")
}

if benchSpec.slowWrites {
extraNameParts = append(extraNameParts, "hi-read-amp")
}

if benchSpec.decommissionNode != 0 {
extraNameParts = append(extraNameParts,
fmt.Sprintf("target=%d", benchSpec.decommissionNode))
}

if benchSpec.duration > 0 {
timeout = benchSpec.duration * 3
extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration))
}

if benchSpec.multiregion {
geoZones := []string{regionUsEast, regionUsWest, regionUsCentral}
specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ",")))
specOptions = append(specOptions, spec.Geo())
extraNameParts = append(extraNameParts, "multi-region")
}

// If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs.
noSkipFlag := os.Getenv(envDecommissionNoSkipFlag)
if noSkipFlag != "" {
Expand All @@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe
extraName := strings.Join(extraNameParts, "/")

r.Add(registry.TestSpec{
Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s",
benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName),
Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s",
benchSpec.nodes, benchSpec.warehouses, extraName),
Owner: registry.OwnerKV,
Cluster: r.MakeClusterSpec(
benchSpec.nodes+addlNodeCount+1,
Expand Down Expand Up @@ -471,7 +483,7 @@ func uploadPerfArtifacts(

// Get the workload perf artifacts and move them to the pinned node, so that
// they can be used to display the workload operation rates during decommission.
if benchSpec.load {
if !benchSpec.noLoad {
workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json")
localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json")
workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json")
Expand Down Expand Up @@ -595,7 +607,7 @@ func runDecommissionBench(
workloadCtx, workloadCancel := context.WithCancel(ctx)
m := c.NewMonitor(workloadCtx, crdbNodes)

if benchSpec.load {
if !benchSpec.noLoad {
m.Go(
func(ctx context.Context) error {
close(rampStarted)
Expand Down Expand Up @@ -642,7 +654,7 @@ func runDecommissionBench(

// If we are running a workload, wait until it has started and completed its
// ramp time before initiating a decommission.
if benchSpec.load {
if !benchSpec.noLoad {
<-rampStarted
t.Status("Waiting for workload to ramp up...")
select {
Expand All @@ -666,7 +678,7 @@ func runDecommissionBench(

m.ExpectDeath()
defer m.ResetDeaths()
err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate,
err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating,
true /* estimateDuration */, benchSpec.slowWrites, tickByName,
)
Expand Down Expand Up @@ -729,7 +741,7 @@ func runDecommissionBenchLong(
workloadCtx, workloadCancel := context.WithCancel(ctx)
m := c.NewMonitor(workloadCtx, crdbNodes)

if benchSpec.load {
if !benchSpec.noLoad {
m.Go(
func(ctx context.Context) error {
close(rampStarted)
Expand Down Expand Up @@ -773,7 +785,7 @@ func runDecommissionBenchLong(

// If we are running a workload, wait until it has started and completed its
// ramp time before initiating a decommission.
if benchSpec.load {
if !benchSpec.noLoad {
<-rampStarted
t.Status("Waiting for workload to ramp up...")
select {
Expand All @@ -786,7 +798,7 @@ func runDecommissionBenchLong(

for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; {
m.ExpectDeath()
err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate,
err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate,
benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating,
true /* estimateDuration */, benchSpec.slowWrites, tickByName,
)
Expand Down Expand Up @@ -827,12 +839,15 @@ func runSingleDecommission(
ctx context.Context,
h *decommTestHelper,
pinnedNode int,
target int,
targetLogicalNodeAtomic *uint32,
snapshotRateMb int,
stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool,
tickByName func(name string),
) error {
target := h.getRandNodeOtherThan(pinnedNode)
if target == 0 {
target = h.getRandNodeOtherThan(pinnedNode)
}
targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target)
if err != nil {
return err
Expand Down
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) {
return status == storeStatusAvailable, nil
}

// IsStoreHealthy returns whether we believe this store can serve requests
// reliably. A healthy store can be used for follower snapshot transmission or
// follower reads. A healthy store does not imply that replicas can be moved to
// this store.
func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool {
status, err := sp.storeStatus(storeID, sp.NodeLivenessFn)
if err != nil {
return false
}
switch status {
case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining:
return true
default:
return false
}
}

func (sp *StorePool) storeStatus(
storeID roachpb.StoreID, nl NodeLivenessFunc,
) (storeStatus, error) {
Expand Down Expand Up @@ -1235,6 +1252,24 @@ func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.Unresolve
return sp.gossip.GetNodeIDAddress(nodeID)
}

// GetLocalitiesPerReplica computes the localities for the provided replicas.
// It returns a map from the ReplicaDescriptor to the Locality of the Node.
func (sp *StorePool) GetLocalitiesPerReplica(
replicas ...roachpb.ReplicaDescriptor,
) map[roachpb.ReplicaID]roachpb.Locality {
sp.localitiesMu.RLock()
defer sp.localitiesMu.RUnlock()
localities := make(map[roachpb.ReplicaID]roachpb.Locality)
for _, replica := range replicas {
if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok {
localities[replica.ReplicaID] = locality.locality
} else {
localities[replica.ReplicaID] = roachpb.Locality{}
}
}
return localities
}

// GetNodeLocalityString returns the locality information for the given node
// in its string format.
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
Expand Down
Loading

0 comments on commit 2f8de54

Please sign in to comment.