From 2f8de54187b8ef05c75bf3f9d60609e21382933d Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Wed, 28 Sep 2022 09:12:23 -0400
Subject: [PATCH] kvserver: delegate snapshots to followers

Fixes: #42491

This commit allows a snapshot to be sent by a follower instead of the leader
of a range. The follower(s) are chosen based on their locality relative to the
final recipient of the snapshot. If the follower is not able to quickly send
the snapshot, the attempt is aborted and the leader sends the snapshot
instead. By choosing a delegate rather than sending the snapshot directly, WAN
traffic can be minimized. Additionally, the snapshot will likely be delivered
faster.

There are two settings that control this feature. The first,
`kv.snapshot_delegation.num_follower`, controls how many follower delegates
are tried when sending a snapshot. If set to 0, snapshot delegation is
disabled. The second, `kv.snapshot_delegation.num_requests`, controls how many
requests may queue on a delegate before further delegation requests are
rejected immediately. This prevents a delegation request from waiting a long
time on the delegate before it is sent.

Before the snapshot is sent from the follower, checks are done to verify that
the delegate is able to send a snapshot that will be valid for the recipient.
If not, the request is rerouted to the leader.

Release note (performance improvement): Adds delegated snapshots, which can
reduce WAN traffic for snapshot movement.
---
 docs/generated/settings/settings.html         |   3 +-
 pkg/cmd/roachtest/tests/decommissionbench.go  |  91 ++--
 .../allocator/storepool/store_pool.go         |  35 ++
 pkg/kv/kvserver/client_raft_helpers_test.go   |  63 ++-
 pkg/kv/kvserver/client_raft_test.go           |   6 +-
 pkg/kv/kvserver/helpers_test.go               |   2 +-
 pkg/kv/kvserver/kvserverpb/raft.proto         |  27 +-
 pkg/kv/kvserver/multiqueue/BUILD.bazel        |   4 +-
 pkg/kv/kvserver/multiqueue/multi_queue.go     |  44 +-
 .../kvserver/multiqueue/multi_queue_test.go   | 143 ++++--
 pkg/kv/kvserver/raft_snapshot_queue.go        |   2 +-
 pkg/kv/kvserver/raft_transport.go             |  59 +--
 pkg/kv/kvserver/raft_transport_test.go        |   8 +-
 pkg/kv/kvserver/replica_command.go            | 465 +++++++++++++-----
 pkg/kv/kvserver/replica_learner_test.go       | 204 +++++++-
 pkg/kv/kvserver/storage_services.proto        |  12 +-
 pkg/kv/kvserver/store_raft.go                 |  39 +-
 pkg/kv/kvserver/store_snapshot.go             | 116 +----
 pkg/kv/kvserver/store_test.go                 |  50 +-
 pkg/kv/kvserver/testing_knobs.go              |   5 +-
 20 files changed, 970 insertions(+), 408 deletions(-)

diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 8da607270ef1..2e4eb22d12a5 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -60,7 +60,8 @@
 kv.replica_circuit_breaker.slow_replication_threshold | duration | 1m0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)
 kv.replica_stats.addsst_request_size_factor | integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
 kv.replication_reports.interval | duration | 1m0s | the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
-kv.snapshot_delegation.enabled | boolean | false | set to true to allow snapshots from follower replicas
+kv.snapshot_delegation.num_follower | integer | 1 | the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder
+kv.snapshot_delegation.num_requests | integer | 3 | how many queued requests are allowed on a delegate before the request is rejected
 kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots
 kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots
kv.transaction.max_intents_bytes
integer4194304maximum number of bytes used to track locks in transactions diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 35c0c030146a..0c7ca78704af 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -61,13 +61,15 @@ const ( type decommissionBenchSpec struct { nodes int - cpus int warehouses int - load bool + noLoad bool multistore bool snapshotRate int duration time.Duration + // Whether the test cluster nodes are in multiple regions. + multiregion bool + // When true, the test will attempt to stop the node prior to decommission. whileDown bool @@ -85,6 +87,10 @@ type decommissionBenchSpec struct { // An override for the default timeout, if needed. timeout time.Duration + // An override for the decommission node to make it choose a predictable node + // instead of a random node. + decommissionNode int + skip string } @@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) { // Basic benchmark configurations, to be run nightly. { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, duration: 1 * time.Hour, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, whileDown: true, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. timeout: 4 * time.Hour, @@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) { { // Add a new node during decommission (no drain). nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, without adding a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, and add a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so @@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, drainFirst: true, skip: manualBenchmarkingOnly, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, slowWrites: true, skip: manualBenchmarkingOnly, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, slowWrites: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 12, - cpus: 16, warehouses: 3000, - load: true, multistore: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) { { // Test to compare 12 4-store nodes vs 48 single-store nodes nodes: 48, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. 
timeout: 3 * time.Hour, skip: manualBenchmarkingOnly, }, + { + // Multiregion decommission, and add a new node in the same region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 2, + }, + { + // Multiregion decommission, and add a new node in a different region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 3, + }, } { registerDecommissionBenchSpec(r, benchSpec) } @@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe } extraNameParts := []string{""} addlNodeCount := 0 - specOptions := []spec.Option{spec.CPU(benchSpec.cpus)} + specOptions := []spec.Option{spec.CPU(16)} if benchSpec.snapshotRate != 0 { extraNameParts = append(extraNameParts, @@ -251,7 +251,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "while-upreplicating") } - if !benchSpec.load { + if benchSpec.noLoad { extraNameParts = append(extraNameParts, "no-load") } @@ -259,11 +259,23 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "hi-read-amp") } + if benchSpec.decommissionNode != 0 { + extraNameParts = append(extraNameParts, + fmt.Sprintf("target=%d", benchSpec.decommissionNode)) + } + if benchSpec.duration > 0 { timeout = benchSpec.duration * 3 extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration)) } + if benchSpec.multiregion { + geoZones := []string{regionUsEast, regionUsWest, regionUsCentral} + specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ","))) + specOptions = append(specOptions, spec.Geo()) + extraNameParts = append(extraNameParts, "multi-region") + } + // If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs. noSkipFlag := os.Getenv(envDecommissionNoSkipFlag) if noSkipFlag != "" { @@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraName := strings.Join(extraNameParts, "/") r.Add(registry.TestSpec{ - Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s", - benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName), + Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s", + benchSpec.nodes, benchSpec.warehouses, extraName), Owner: registry.OwnerKV, Cluster: r.MakeClusterSpec( benchSpec.nodes+addlNodeCount+1, @@ -471,7 +483,7 @@ func uploadPerfArtifacts( // Get the workload perf artifacts and move them to the pinned node, so that // they can be used to display the workload operation rates during decommission. - if benchSpec.load { + if !benchSpec.noLoad { workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") @@ -595,7 +607,7 @@ func runDecommissionBench( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -642,7 +654,7 @@ func runDecommissionBench( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. 
- if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -666,7 +678,7 @@ func runDecommissionBench( m.ExpectDeath() defer m.ResetDeaths() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -729,7 +741,7 @@ func runDecommissionBenchLong( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -773,7 +785,7 @@ func runDecommissionBenchLong( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. - if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -786,7 +798,7 @@ func runDecommissionBenchLong( for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { m.ExpectDeath() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -827,12 +839,15 @@ func runSingleDecommission( ctx context.Context, h *decommTestHelper, pinnedNode int, + target int, targetLogicalNodeAtomic *uint32, snapshotRateMb int, stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool, tickByName func(name string), ) error { - target := h.getRandNodeOtherThan(pinnedNode) + if target == 0 { + target = h.getRandNodeOtherThan(pinnedNode) + } targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target) if err != nil { return err diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 2f693cbb92f0..bc1459ee8b1b 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -825,6 +825,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { return status == storeStatusAvailable, nil } +// IsStoreHealthy returns whether we believe this store can serve requests +// reliably. A healthy store can be used for follower snapshot transmission or +// follower reads. A healthy store does not imply that replicas can be moved to +// this store. +func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool { + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) + if err != nil { + return false + } + switch status { + case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining: + return true + default: + return false + } +} + func (sp *StorePool) storeStatus( storeID roachpb.StoreID, nl NodeLivenessFunc, ) (storeStatus, error) { @@ -1235,6 +1252,24 @@ func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.Unresolve return sp.gossip.GetNodeIDAddress(nodeID) } +// GetLocalitiesPerReplica computes the localities for the provided replicas. +// It returns a map from the ReplicaDescriptor to the Locality of the Node. 
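+// The returned map is keyed by ReplicaID; replicas whose node locality is not
+// known to the store pool are mapped to an empty Locality.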
+func (sp *StorePool) GetLocalitiesPerReplica( + replicas ...roachpb.ReplicaDescriptor, +) map[roachpb.ReplicaID]roachpb.Locality { + sp.localitiesMu.RLock() + defer sp.localitiesMu.RUnlock() + localities := make(map[roachpb.ReplicaID]roachpb.Locality) + for _, replica := range replicas { + if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok { + localities[replica.ReplicaID] = locality.locality + } else { + localities[replica.ReplicaID] = roachpb.Locality{} + } + } + return localities +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 84a1fff66b2d..4432598fe2a6 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -31,8 +31,9 @@ type unreliableRaftHandlerFuncs struct { dropReq func(*kvserverpb.RaftMessageRequest) bool dropHB func(*kvserverpb.RaftHeartbeat) bool dropResp func(*kvserverpb.RaftMessageResponse) bool - // snapErr defaults to returning nil. - snapErr func(*kvserverpb.SnapshotRequest_Header) error + // snapErr and delegateErr default to returning nil. + snapErr func(*kvserverpb.SnapshotRequest_Header) error + delegateErr func(request *kvserverpb.DelegateSnapshotRequest) error } func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { @@ -134,6 +135,17 @@ func (h *unreliableRaftHandler) HandleSnapshot( return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) } +func (h *unreliableRaftHandler) HandleDelegatedSnapshot( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { + if req.RangeID == h.rangeID && h.delegateErr != nil { + if err := h.delegateErr(req); err != nil { + return nil, err + } + } + return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req) +} + // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. 
type testClusterStoreRaftMessageHandler struct { @@ -181,15 +193,13 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( } func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { store, err := h.getStore() if err != nil { - return err + return nil, err } - return store.HandleDelegatedSnapshot(ctx, req, stream) + return store.HandleDelegatedSnapshot(ctx, req) } // testClusterPartitionedRange is a convenient abstraction to create a range on a node @@ -198,7 +208,7 @@ type testClusterPartitionedRange struct { rangeID roachpb.RangeID mu struct { syncutil.RWMutex - partitionedNode int + partitionedNodeIdx int partitioned bool partitionedReplicas map[roachpb.ReplicaID]bool } @@ -232,7 +242,7 @@ func setupPartitionedRange( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { @@ -243,14 +253,14 @@ func setupPartitionedRange( storeIdx: i, }) } - return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNodeIdx, activated, handlers, funcs) } func setupPartitionedRangeWithHandlers( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, handlers []kvserver.RaftMessageHandler, funcs unreliableRaftHandlerFuncs, @@ -260,9 +270,9 @@ func setupPartitionedRangeWithHandlers( handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated - pr.mu.partitionedNode = partitionedNode + pr.mu.partitionedNodeIdx = partitionedNodeIdx if replicaID == 0 { - ts := tc.Servers[partitionedNode] + ts := tc.Servers[partitionedNodeIdx] store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) if err != nil { return nil, err @@ -294,8 +304,8 @@ func setupPartitionedRangeWithHandlers( pr.mu.RLock() defer pr.mu.RUnlock() return pr.mu.partitioned && - (s == pr.mu.partitionedNode || - req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNode)+1) + (s == pr.mu.partitionedNodeIdx || + req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) } } if h.dropHB == nil { @@ -305,12 +315,21 @@ func setupPartitionedRangeWithHandlers( if !pr.mu.partitioned { return false } - if s == partitionedNode { + if s == partitionedNodeIdx { return true } return pr.mu.partitionedReplicas[hb.FromReplicaID] } } + if h.dropResp == nil { + h.dropResp = func(resp *kvserverpb.RaftMessageResponse) bool { + pr.mu.RLock() + defer pr.mu.RUnlock() + return pr.mu.partitioned && + (s == pr.mu.partitionedNodeIdx || + resp.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) + } + } if h.snapErr == nil { h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { pr.mu.RLock() @@ -324,6 +343,16 @@ func setupPartitionedRangeWithHandlers( return nil } } + if h.delegateErr == nil { + h.delegateErr = func(resp *kvserverpb.DelegateSnapshotRequest) error { + pr.mu.RLock() + defer pr.mu.RUnlock() + if pr.mu.partitionedReplicas[resp.DelegatedSender.ReplicaID] { + return errors.New("partitioned") + } + return nil + } + } pr.handlers = 
append(pr.handlers, h) tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 13c5e159c997..32a93f74fb46 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3573,10 +3573,8 @@ func (errorChannelTestHandler) HandleSnapshot( } func (errorChannelTestHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { panic("unimplemented") } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 54f9c7b3a95e..cdd6cba1d2a0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -203,7 +203,7 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro // ReservationCount counts the number of outstanding reservations that are not // running. func (s *Store) ReservationCount() int { - return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len() + return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.AvailableLen() } // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..3bab2ab829db 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -14,7 +14,6 @@ option go_package = "kvserverpb"; import "errorspb/errors.proto"; import "roachpb/errors.proto"; -import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; @@ -221,7 +220,7 @@ message SnapshotRequest { Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; bool final = 4; @@ -272,23 +271,31 @@ message DelegateSnapshotRequest { roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false]; // The priority of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Priority priority = 5; - // The sending queue's name. - SnapshotRequest.QueueName sender_queue_name = 9; - - // The sending queue's priority. - double sender_queue_priority = 10; - // The type of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Type type = 6; // The Raft term of the coordinator (in most cases the leaseholder) replica. // The term is used during snapshot receiving to reject messages from an older term. uint64 term = 7; - // The truncated state of the Raft log on the coordinator replica. - roachpb.RaftTruncatedState truncated_state = 8; + // The first index of the Raft log on the coordinator replica. + uint64 first_index = 8; + + // The sending queue's name. + SnapshotRequest.QueueName sender_queue_name = 9; + + // The sending queue's priority. + double sender_queue_priority = 10; + + // The generation of the leaseholders descriptor. + uint64 descriptor_generation = 11 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeGeneration"]; + + // Max queue length on the delegate before this request is rejected. 
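+  // A value of -1 means the delegate should queue the request regardless of
+  // how many snapshots it already has waiting to send.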
+ int64 queue_on_delegate_len = 12; } message DelegateSnapshotResponse { diff --git a/pkg/kv/kvserver/multiqueue/BUILD.bazel b/pkg/kv/kvserver/multiqueue/BUILD.bazel index 5c7a1642b937..14dbae432c80 100644 --- a/pkg/kv/kvserver/multiqueue/BUILD.bazel +++ b/pkg/kv/kvserver/multiqueue/BUILD.bazel @@ -8,14 +8,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], ) go_test( name = "multiqueue_test", + size = "small", srcs = ["multi_queue_test.go"], - args = ["-test.timeout=295s"], + args = ["-test.timeout=55s"], embed = [":multiqueue"], deps = [ "//pkg/testutils", diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 588c4556de46..8848612fc727 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -14,6 +14,7 @@ import ( "container/heap" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -145,10 +146,20 @@ func (m *MultiQueue) tryRunNextLocked() { // Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. -func (m *MultiQueue) Add(queueType int, priority float64) *Task { +func (m *MultiQueue) Add(queueType int, priority float64, maxQueueLength int64) (*Task, error) { m.mu.Lock() defer m.mu.Unlock() + // If there are remainingRuns we can run immediately, otherwise compute the + // queue length one we are added. If the queue is too long, return an error + // immediately so the caller doesn't have to wait. + if m.remainingRuns == 0 && maxQueueLength >= 0 { + currentLen := int64(m.queueLenLocked()) + if currentLen > maxQueueLength { + return nil, errors.Newf("queue is too long %d > %d", currentLen, maxQueueLength) + } + } + // The mutex starts locked, unlock it when we are ready to run. pos, ok := m.mapping[queueType] if !ok { @@ -169,7 +180,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() - return &newTask + return &newTask, nil } // Cancel will cancel a Task that may not have started yet. This is useful if it @@ -221,11 +232,32 @@ func (m *MultiQueue) releaseLocked(permit *Permit) { m.tryRunNextLocked() } -// Len returns the number of additional tasks that can be added without -// queueing. This will return 0 if there is anything queued. This method should -// only be used for testing. -func (m *MultiQueue) Len() int { +// AvailableLen returns the number of additional tasks that can be added without +// queueing. This will return 0 if there is anything queued. +func (m *MultiQueue) AvailableLen() int { m.mu.Lock() defer m.mu.Unlock() return m.remainingRuns } + +// QueueLen returns the length of the queue if one more task is added. If this +// returns 0 then a task can be added and run without queueing. +// NB: The value returned is not a guarantee that queueing will not occur when +// the request is submitted. multiple calls to QueueLen could race. +func (m *MultiQueue) QueueLen() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.queueLenLocked() +} + +func (m *MultiQueue) queueLenLocked() int { + if m.remainingRuns > 0 { + return 0 + } + // Start counting from 1 since we will be the first in the queue if it gets added. 
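+	// For example: with two tasks currently running (so remainingRuns == 0) and
+	// one task already waiting, this returns 2, since a newly added task would be
+	// the second entry in the queue.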
+ count := 1 + for i := 0; i < len(m.outstanding); i++ { + count += len(m.outstanding[i]) + } + return count - m.remainingRuns +} diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index 8e693a1e210f..4c50f4ac2753 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -37,10 +37,10 @@ func TestMultiQueueAddTwiceSameQueue(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - chan1 := queue.Add(7, 1.0) - chan2 := queue.Add(7, 2.0) + chan1, _ := queue.Add(7, 1.0, -1) + chan2, _ := queue.Add(7, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -56,13 +56,13 @@ func TestMultiQueueTwoQueues(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a1 := queue.Add(5, 4.0) - a2 := queue.Add(5, 5.0) + a1, _ := queue.Add(5, 4.0, -1) + a2, _ := queue.Add(5, 5.0, -1) - b1 := queue.Add(6, 1.0) - b2 := queue.Add(6, 2.0) + b1, _ := queue.Add(6, 1.0, -1) + b2, _ := queue.Add(6, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -79,15 +79,15 @@ func TestMultiQueueComplex(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -99,15 +99,15 @@ func TestMultiQueueRemove(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) fmt.Println("Beginning cancel") @@ -125,7 +125,7 @@ func TestMultiQueueCancelOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) queue.Cancel(task) } @@ -139,12 +139,12 @@ func TestMultiQueueCancelInProgress(t *testing.T) { const b = 2 const c = 3 - a3 := queue.Add(a, 5.0) - a2 := queue.Add(a, 4.0) - b1 := queue.Add(b, 1.1) - b2 := queue.Add(b, 2.1) - c3 := queue.Add(c, 2.2) - b3 := queue.Add(b, 6.1) + a3, _ := queue.Add(a, 5.0, -1) + a2, _ := queue.Add(a, 4.0, -1) + b1, _ := queue.Add(b, 1.1, -1) + b2, _ := queue.Add(b, 2.1, -1) + c3, _ := queue.Add(c, 2.2, -1) + b3, _ := queue.Add(b, 6.1, -1) queue.Cancel(b2) queue.Cancel(b1) @@ -237,7 +237,7 @@ func TestMultiQueueStress(t *testing.T) { for i := 0; i < numThreads; i++ { go func(name int) { for j := 0; j < numRequests; j++ { - curTask := queue.Add(name, float64(j)) + curTask, _ := queue.Add(name, float64(j), -1) if alsoCancel && j%99 
== 0 { queue.Cancel(curTask) } else { @@ -271,7 +271,7 @@ func TestMultiQueueReleaseTwice(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Release(p) require.Panics(t, func() { queue.Release(p) }) @@ -283,7 +283,7 @@ func TestMultiQueueReleaseAfterCancel(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Cancel(task) queue.Release(p) @@ -294,31 +294,84 @@ func TestMultiQueueLen(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(2) - require.Equal(t, 2, queue.Len()) - - task1 := queue.Add(1, 1) - require.Equal(t, 1, queue.Len()) - task2 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) - task3 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + task1, _ := queue.Add(1, 1, -1) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + task2, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + task3, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) queue.Cancel(task1) // Finish task 1, but immediately start task3. - require.Equal(t, 0, queue.Len()) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) p := <-task2.GetWaitChan() queue.Release(p) - require.Equal(t, 1, queue.Len()) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) queue.Cancel(task3) - require.Equal(t, 2, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) +} + +func TestMultiQueueFull(t *testing.T) { + queue := NewMultiQueue(2) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + // Task 1 starts immediately since there is no queue. + task1, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // Task 2 also starts immediately as the queue supports 2 concurrent. + task2, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 3 would be queued so should not be added. + task3, err := queue.Add(1, 1, 0) + require.Error(t, err) + require.Nil(t, task3) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 4 uses a longer max queue length so should be added. + task4, err := queue.Add(1, 1, 1) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) + + queue.Cancel(task1) + queue.Cancel(task2) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // After these tasks are done, make sure we can add another one. + task5, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + + // Cancel all the remaining tasks. + queue.Cancel(task4) + queue.Cancel(task5) } // verifyOrder makes sure that the chans are called in the specified order. 
func verifyOrder(t *testing.T, queue *MultiQueue, tasks ...*Task) { // each time, verify that the only available channel is the "next" one in order for i, task := range tasks { + if task == nil { + require.Fail(t, "Task is nil", "%d", task) + } + var found *Permit for j, t2 := range tasks[i+1:] { select { diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 0ba36ab6c487..f8dcdeac6549 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -142,7 +142,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) + err := repl.sendSnapshotUsingDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 283643e551c1..818ab128d369 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -98,13 +98,6 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } -// DelegateSnapshotResponseStream is the subset of the -// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses. -type DelegateSnapshotResponseStream interface { - Send(request *kvserverpb.DelegateSnapshotResponse) error - Recv() (*kvserverpb.DelegateSnapshotRequest, error) -} - // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -133,8 +126,7 @@ type RaftMessageHandler interface { HandleDelegatedSnapshot( ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, - ) error + ) (*kvserverpb.DelegateSnapshotResponse, error) } // RaftTransport handles the rpc messages for raft. @@ -320,21 +312,12 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer // DelegateRaftSnapshot handles incoming delegated snapshot requests and passes // the request to pass off to the sender store. Errors during the snapshots // process are sent back as a response. -func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error { - ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context()) - defer cancel() - req, err := stream.Recv() - if err != nil { - return err - } - // Check to ensure the header is valid. +func (t *RaftTransport) DelegateRaftSnapshot( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { if req == nil { err := errors.New("client error: no message in first delegated snapshot request") - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - }, - ) + return &kvserverpb.DelegateSnapshotResponse{SnapResponse: snapRespErr(err)}, nil } // Get the handler of the sender store. 
handler, ok := t.getHandler(req.DelegatedSender.StoreID) @@ -346,11 +329,12 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh req.CoordinatorReplica.StoreID, req.DelegatedSender.StoreID, ) - return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + err := errors.New("unable to accept Raft message: no handler registered for the sender store") + return &kvserverpb.DelegateSnapshotResponse{SnapResponse: snapRespErr(err)}, nil } // Pass off the snapshot request to the sender store. - return handler.HandleDelegatedSnapshot(ctx, req, stream) + return handler.HandleDelegatedSnapshot(ctx, req) } // RaftSnapshot handles incoming streaming snapshot requests. @@ -640,8 +624,8 @@ func (t *RaftTransport) SendSnapshot( return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } -// DelegateSnapshot creates a rpc stream between the leaseholder and the -// new designated sender for delegated snapshot requests. +// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store +// and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, ) error { @@ -653,14 +637,25 @@ func (t *RaftTransport) DelegateSnapshot( client := NewMultiRaftClient(conn) // Creates a rpc stream between the leaseholder and sender. - stream, err := client.DelegateRaftSnapshot(ctx) + resp, err := client.DelegateRaftSnapshot(ctx, req) if err != nil { return errors.Mark(err, errMarkSnapshotError) } - defer func() { - if err := stream.CloseSend(); err != nil { - log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + + // Import the remotely collected spans, if any. + if len(resp.CollectedSpans) != 0 { + span := tracing.SpanFromContext(ctx) + if span == nil { + log.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up") + } else { + span.ImportRemoteRecording(resp.CollectedSpans) } - }() - return delegateSnapshot(ctx, stream, req) + } + // On a delegated request, we only see the fully applied result. + if resp.SnapResponse.Status != kvserverpb.SnapshotResponse_APPLIED { + log.VEventf(ctx, 2, "%s", resp.SnapResponse) + return errors.Mark(errors.Errorf("%s: error: %s", req.DelegatedSender, resp.SnapResponse), errMarkSnapshotError) + } + log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", req.DelegatedSender) + return nil } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 7cbb12d95177..638a87d9e0dd 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -97,11 +97,9 @@ func (s channelServer) HandleSnapshot( } func (s channelServer) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { - panic("unimplemented") + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { + panic("unexpected HandleDelegatedSnapshot") } // raftTransportTestContext contains objects needed to test RaftTransport. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 7ef521c1993f..79331ecf23ce 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1824,7 +1824,7 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. 
Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot( + if err := r.sendSnapshotUsingDelegate( ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, ); err != nil { return nil, err @@ -2517,34 +2517,145 @@ func recordRangeEventsInLog( return nil } -// getSenderReplica returns a replica descriptor for a follower replica to act as -// the sender for snapshots. -// TODO(amy): select a follower based on locality matching. -func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { - log.Fatal(ctx, "follower snapshots not implemented") - return r.GetReplicaDescriptor() +// getSenderReplicas returns an ordered list of replica descriptor for a +// follower replica to act as the sender for delegated snapshots. The replicas +// should be tried in order, and typically the coordinator is the last entry on +// the list. +func (r *Replica) getSenderReplicas( + ctx context.Context, recipient roachpb.ReplicaDescriptor, +) ([]roachpb.ReplicaDescriptor, error) { + + coordinator, err := r.GetReplicaDescriptor() + if err != nil { + // If there is no local replica descriptor, return an empty list. + return nil, err + } + + // Unless all nodes are on V23.1, don't delegate. This prevents sending to a + // node that doesn't understand the request. + if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Check follower snapshots, if zero just self-delegate. + numFollowers := int(NumDelegateLimit.Get(&r.ClusterSettings().SV)) + if numFollowers == 0 { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get range descriptor and store pool. + storePool := r.store.cfg.StorePool + rangeDesc := r.Desc() + + if fn := r.store.cfg.TestingKnobs.SelectDelegateSnapshotSender; fn != nil { + sender := fn(rangeDesc) + // If a TestingKnob is specified use it whatever it is. + if sender != nil { + return sender, nil + } + } + + // Include voter and non-voter replicas on healthy stores as candidates. + nonRecipientReplicas := rangeDesc.Replicas().Filter( + func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.ReplicaID != recipient.ReplicaID && storePool.IsStoreHealthy(rDesc.StoreID) + }, + ) + candidates := nonRecipientReplicas.VoterAndNonVoterDescriptors() + if len(candidates) == 0 { + // Not clear when the coordinator would be considered dead, but if it does + // happen, just return the coordinator. + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get the localities of the candidate replicas, including the original sender. + localities := storePool.GetLocalitiesPerReplica(candidates...) + recipientLocality := storePool.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID] + + // Construct a map from replica to its diversity score compared to the + // recipient. Also track the best score we see. + replicaDistance := make(map[roachpb.ReplicaID]float64, len(localities)) + closestStore := roachpb.MaxDiversityScore + for desc, locality := range localities { + score := recipientLocality.DiversityScore(locality) + if score < closestStore { + closestStore = score + } + replicaDistance[desc] = score + } + + // Find all replicas that tie as the most optimal sender other than the + // coordinator. The coordinator will always be added to the end of the list + // regardless of score. 
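+	// For example, if exactly one candidate shares the recipient's region while
+	// the others are in different regions, that candidate has the lowest
+	// diversity score relative to the recipient and is the one selected here.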
+ var tiedReplicas []roachpb.ReplicaID + for desc, score := range replicaDistance { + if score == closestStore { + // If the coordinator is tied for closest, always use it. + // TODO(baptist): Consider using other replicas at the same distance once + // this is integrated with admission control. + if desc == coordinator.ReplicaID { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + tiedReplicas = append(tiedReplicas, desc) + } + } + + // Use a psuedo random source that is consistent across runs of this method + // for the same coordinator. Shuffle the replicas to prevent always choosing + // them in the same order. + pRand := rand.New(rand.NewSource(int64(coordinator.ReplicaID))) + pRand.Shuffle(len(tiedReplicas), func(i, j int) { tiedReplicas[i], tiedReplicas[j] = tiedReplicas[j], tiedReplicas[i] }) + + // Only keep the top numFollowers replicas. + if len(tiedReplicas) > numFollowers { + tiedReplicas = tiedReplicas[0:numFollowers] + } + + // Convert to replica descriptors before returning. The list of tiedReplicas + // is typically only one element. + replicaList := make([]roachpb.ReplicaDescriptor, len(tiedReplicas)+1) + for n, replicaId := range tiedReplicas { + found := false + for _, desc := range rangeDesc.Replicas().Descriptors() { + if replicaId == desc.ReplicaID { + replicaList[n] = desc + found = true + break + } + } + if !found { + return nil, errors.Errorf("unable to find replica for replicaId %d", replicaId) + } + } + // Set the last replica to be the coordinator. + replicaList[len(replicaList)-1] = coordinator + return replicaList, nil } -// TODO(amy): update description when patch for follower snapshots are completed. -// sendSnapshot sends a snapshot of the replica state to the specified replica. -// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful -// about adding additional calls as generating a snapshot is moderately -// expensive. +// sendSnapshotUsingDelegate sends a snapshot of the replica state to the specified +// replica through a delegate. Currently, only invoked from replicateQueue and +// raftSnapshotQueue. Be careful about adding additional calls as generating a +// snapshot is moderately expensive. // // A snapshot is a bulk transfer of all data in a range. It consists of a // consistent view of all the state needed to run some replica of a range as of -// some applied index (not as of some mvcc-time). Snapshots are used by Raft -// when a follower is far enough behind the leader that it can no longer be -// caught up using incremental diffs (because the leader has already garbage -// collected the diffs, in this case because it truncated the Raft log past -// where the follower is). +// some applied index (not as of some mvcc-time). There are two primary cases +// when a Snapshot is used. +// +// The first case is use by Raft when a voter or non-voter follower is far +// enough behind the leader that it can no longer be caught up using incremental +// diffs. This occurs because the leader has already garbage collected diffs +// past where the follower is. The quota pool is responsible for keeping a +// leader from getting too far ahead of any of the followers, so normally +// followers don't need a snapshot, however there are a number of cases where +// this can happen (restarts, paused followers, ...). 
// -// We also proactively send a snapshot when adding a new replica to bootstrap it +// The second case is adding a new replica to a replica set, to bootstrap it // (this is called a "learner" snapshot and is a special case of a Raft // snapshot, we just speed the process along). It's called a learner snapshot -// because it's sent to what Raft terms a learner replica. As of 19.2, when we -// add a new replica, it's first added as a learner using a Raft ConfChange, -// which means it accepts Raft traffic but doesn't vote or affect quorum. Then +// because it's sent to what Raft terms a learner replica. When we +// add a new replica, it's first added as a learner using a Raft ConfChange. +// A learner accepts Raft traffic but doesn't vote or affect quorum. Then // we immediately send it a snapshot to catch it up. After the snapshot // successfully applies, we turn it into a normal voting replica using another // ConfChange. It then uses the normal mechanisms to catch up with whatever got @@ -2552,13 +2663,20 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // the voting replica directly, this avoids a period of fragility when the // replica would be a full member, but very far behind. // -// Snapshots are expensive and mostly unexpected (except learner snapshots -// during rebalancing). The quota pool is responsible for keeping a leader from -// getting too far ahead of any of the followers, so ideally they'd never be far -// enough behind to need a snapshot. +// The snapshot process itself is broken into 4 parts: delegating the request, +// generating the snapshot, transmitting it, and applying it. // -// The snapshot process itself is broken into 3 parts: generating the snapshot, -// transmitting it, and applying it. +// Delegating the request: Since a snapshot is expensive to transfer from a +// network, CPU and IO perspective, the coordinator attempts to delegate the +// request to a healthy delegate who is both closer to the final destination. +// This is done by sending a DelegateSnapshotRequest to a replica. The replica +// can either reject the delegation request or process it. It will reject if it +// is too far behind, unhealthy, or has too long of a queue of snapshots to +// send. If the delegate accepts the delegation request, then the remaining +// three steps occur on that delegate. If the delegate does not decide to +// process the request, it sends an error back to the coordinator and the +// coordinator either chooses a different delegate or itself as the "delegate of +// last resort". // // Generating the snapshot: The data contained in a snapshot is a full copy of // the replicated data plus everything the replica needs to be a healthy member @@ -2566,7 +2684,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // instead of keeping it all in memory at once. The `(Replica).GetSnapshot` // method does the necessary locking and gathers the various Raft state needed // to run a replica. It also creates an iterator for the range's data as it -// looked under those locks (this is powered by a RocksDB snapshot, which is a +// looked under those locks (this is powered by a Pebble snapshot, which is a // different thing but a similar idea). Notably, GetSnapshot does not do the // data iteration. 
// @@ -2589,7 +2707,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // returns true, this is communicated back to the sender, which then proceeds to // call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier // to send the data in chunks, each chunk a streaming grpc message. The sender -// then sends a final message with an indicaton that it's done and blocks again, +// then sends a final message with an indication that it's done and blocks again, // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // @@ -2601,7 +2719,6 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // RocksDB. Each of the SSTs also has a range deletion tombstone to delete the // existing data in the range. // - // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to // `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks @@ -2637,7 +2754,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // callers of `shouldAcceptSnapshotData` return an error so that we no longer // have to worry about racing with a second snapshot. See the comment on // ReplicaPlaceholder for details. -func (r *Replica) sendSnapshot( +func (r *Replica) sendSnapshotUsingDelegate( ctx context.Context, recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, @@ -2671,12 +2788,16 @@ func (r *Replica) sendSnapshot( ) } - // Check follower snapshots cluster setting. - if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { - sender, err = r.getSenderReplica(ctx) - if err != nil { - return err - } + log.VEventf( + ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, + ) + + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + retErr = &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return } // Don't send a queue name or priority if the receiver may not understand @@ -2689,51 +2810,182 @@ func (r *Replica) sendSnapshot( senderQueuePriority = 0 } - log.VEventf( - ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, - ) - status := r.RaftStatus() + // Create new delegate snapshot request without specifying the delegate sender. + delegateRequest := &kvserverpb.DelegateSnapshotRequest{ + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, + FirstIndex: r.GetFirstIndex(), + DescriptorGeneration: r.Desc().Generation, + QueueOnDelegateLen: MaxQueueOnDelegateLimit.Get(&r.ClusterSettings().SV), + } + + // Get the list of senders in order. + senders, err := r.getSenderReplicas(ctx, recipient) + if err != nil { + return err + } + + if len(senders) == 0 { + return errors.Errorf("no sender found to send a snapshot from for %v", r) + } + + for n, sender := range senders { + delegateRequest.DelegatedSender = sender + log.VEventf( + ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n, recipient, sender, + ) + + // On the last attempt, always queue on the delegate to time out naturally. 
+ if n == len(senders)-1 { + delegateRequest.QueueOnDelegateLen = -1 + } + + retErr = contextutil.RunWithTimeout( + ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + // Sending snapshot + return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + }, + ) + // Return once we have the first success. + if retErr == nil { + return + } else { + log.Warningf(ctx, "attempt %d: delegate snapshot %+v request failed %v", n, delegateRequest, err) + } + } + return +} + +// validateSnapshotDelegationRequest will validate that this replica can send +// the snapshot that the coordinator requested. The main reasons a request can't +// be delegated are if the Generation or Term of the replica is not equal to the +// Generation or Term of the coordinator's request or the applied index on this +// replica is behind the truncated index of the coordinator. Note that the request +// is validated twice, once before "queueing" and once after. This reduces the +// chance of false positives (snapshots which are sent but can't be used), +// however it is difficult to completely eliminate them. Between the time of sending the +// original request and the delegate processing it, the leaseholder could decide +// to truncate its index, change the leaseholder or split or merge the range. +func (r *Replica) validateSnapshotDelegationRequest( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) error { + desc := r.Desc() + // If the generation has changed, this snapshot will be useless, so don't + // attempt to send it. + if desc.Generation != req.DescriptorGeneration { + log.VEventf(ctx, 2, + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + return errors.Errorf( + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + } + + // Check that the snapshot we generated has a descriptor that includes the + // recipient. If it doesn't, the recipient will reject it, so it's better to + // not send it in the first place. It's possible to hit this case if we're not + // the leaseholder, and we haven't yet applied the configuration change that's + // adding the recipient to the range, or we are the leaseholder but have + // removed the recipient between starting to send the snapshot and this point. + if _, ok := desc.GetReplicaDescriptorByID(req.RecipientReplica.ReplicaID); !ok { + // Recipient replica not found in the current range descriptor. + // The sender replica's descriptor may be lagging behind the coordinator's. + log.VEventf(ctx, 2, + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.DescriptorGeneration, r.Desc(), + ) + return errors.Errorf( + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.RecipientReplica, r.Desc(), + ) + } + + // Check the raft applied state index and term to determine if this replica + // is not too far behind the leaseholder. If the delegate is too far behind + // that is also needs a snapshot, then any snapshot it sends will be useless. + r.mu.RLock() + replIdx := r.mu.state.RaftAppliedIndex + 1 + + status := r.raftStatusRLocked() if status == nil { // This code path is sometimes hit during scatter for replicas that // haven't woken up yet. 
- return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return errors.Errorf("raft status not initialized") } + replTerm := status.Term + r.mu.RUnlock() - // Create new delegate snapshot request with only required metadata. - delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - SenderQueueName: senderQueueName, - SenderQueuePriority: senderQueuePriority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, - } - err = contextutil.RunWithTimeout( - ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.DelegateSnapshot( - ctx, - delegateRequest, - ) - }, - ) - // Only mark explicitly as snapshot error (which is retriable) if we timed out. - // Otherwise, it's up to the remote to add this mark where appropriate. - if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { - err = errors.Mark(err, errMarkSnapshotError) + // Delegate has a different term than the coordinator. This typically means + // the lease has been transferred, and we should not process this request. + // There is a potential race where the leaseholder sends a delegate request to + // itself and then the term changes before this request is sent. In that case + // this code path will not be checked and the snapshot will still be sent. + if replTerm != req.Term { + log.Infof( + ctx, + "sender: %v is not fit to send snapshot for %v; sender term: %v coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) + return errors.Errorf( + "sender: %v is not fit to send snapshot for %v; sender term: %v, coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) } - return err + + // Sender replica's snapshot will be rejected if the sender replica's raft + // applied index is lower than or equal to the truncated state on the + // leaseholder, as this replica's snapshot will be wasted. Note that it is + // possible that we can enforce strictly lesser than if etcd does not require + // previous raft log entries for appending. + if replIdx <= req.FirstIndex { + log.Infof( + ctx, "sender: %v is not fit to send snapshot;"+" sender first index: %v, "+ + "coordinator first index: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ) + return errors.Mark(errors.Errorf( + "sender: %v is not fit to send snapshot due to needing snapshot", req.DelegatedSender, + ), errMarkSnapshotError) + } + return nil } -// followerSnapshotsEnabled is used to enable or disable follower snapshots. -var followerSnapshotsEnabled = func() *settings.BoolSetting { - s := settings.RegisterBoolSetting( +// NumDelegateLimit is used to control the number of delegate followers +// to use for snapshots. To disable follower snapshots, set this to 0. If +// enabled, the leaseholder / leader will attempt to find a closer delegate than +// itself to send the snapshot through. This can save on network bandwidth at a +// cost in some cases to snapshot send latency. 
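+//
+// As a sketch of how this is consumed (mirroring the tests later in this
+// change; the variable names here are illustrative only), delegation can be
+// switched off by overriding the setting:
+//
+//	st := cluster.MakeTestingClusterSettings()
+//	kvserver.NumDelegateLimit.Override(ctx, &st.SV, 0)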
+var NumDelegateLimit = func() *settings.IntSetting {
+	s := settings.RegisterIntSetting(
 		settings.SystemOnly,
-		"kv.snapshot_delegation.enabled",
-		"set to true to allow snapshots from follower replicas",
-		false,
+		"kv.snapshot_delegation.num_follower",
+		"the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder",
+		1,
+	)
+	s.SetVisibility(settings.Public)
+	return s
+}()
+
+// MaxQueueOnDelegateLimit is used to control how long the outgoing snapshot
+// queue can be before we reject delegation requests. Setting it to -1 allows
+// unlimited queued requests. The purpose of this setting is to prevent a long
+// snapshot queue from delaying a delegated snapshot from being sent. Once the
+// queue is longer than the configured value, additional delegation requests
+// are rejected with an error.
+var MaxQueueOnDelegateLimit = func() *settings.IntSetting {
+	s := settings.RegisterIntSetting(
+		settings.SystemOnly,
+		"kv.snapshot_delegation.num_requests",
+		"how many queued requests are allowed on a delegate before the request is rejected",
+		3,
 	)
 	s.SetVisibility(settings.Public)
 	return s
@@ -2752,11 +3004,8 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting(
 // snapshot from this replica. The entire process of generating and transmitting
 // the snapshot is handled, and errors are propagated back to the leaseholder.
 func (r *Replica) followerSendSnapshot(
-	ctx context.Context,
-	recipient roachpb.ReplicaDescriptor,
-	req *kvserverpb.DelegateSnapshotRequest,
-	stream DelegateSnapshotResponseStream,
-) (retErr error) {
+	ctx context.Context, recipient roachpb.ReplicaDescriptor, req *kvserverpb.DelegateSnapshotRequest,
+) error {
 	ctx = r.AnnotateCtx(ctx)
 	sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV)
 	if sendThreshold > 0 {
@@ -2776,55 +3025,49 @@ func (r *Replica) followerSendSnapshot(
 		}()
 	}
 
-	// TODO(amy): when delegating to different senders, check raft applied state
-	// to determine if this follower replica is fit to send.
-	// Acknowledge that the request has been accepted.
-	if err := stream.Send(
-		&kvserverpb.DelegateSnapshotResponse{
-			SnapResponse: &kvserverpb.SnapshotResponse{
-				Status: kvserverpb.SnapshotResponse_ACCEPTED,
-			},
-		},
-	); err != nil {
+	// Check the validity conditions twice, once before and once after we obtain
+	// the send semaphore. We check after to make sure the snapshot request still
+	// makes sense (e.g. the range hasn't split under us). The check is
+	// lightweight. Even if the second check succeeds, the snapshot we send might
+	// still not be usable due to the leaseholder making a change that we don't
+	// know about, but we try to minimize that possibility since snapshots are
+	// expensive to send.
+	err := r.validateSnapshotDelegationRequest(ctx, req)
+	if err != nil {
 		return err
 	}
 
-	// Throttle snapshot sending.
+	// Throttle snapshot sending. Obtain the send semaphore and determine the rate limit.
 	rangeSize := r.GetMVCCStats().Total()
 	cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize)
 	if err != nil {
-		return err
+		return errors.Wrap(err, "unable to reserve space for sending this snapshot")
 	}
 	defer cleanup()
 
+	// Check validity again; it is possible that the pending request should no
+	// longer be sent after we are done waiting.
+	err = r.validateSnapshotDelegationRequest(ctx, req)
+	if err != nil {
+		return err
+	}
+
+	// TODO: Review this to make sure that there isn't any inconsistency since we
+	// may be on a delegate.
The GetSnapshot call will add a LogTruncation + // constraint, but it isn't clear that this will help. snapType := req.Type snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) if err != nil { - err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) - return errors.Mark(err, errMarkSnapshotError) + return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") - // Check that the snapshot we generated has a descriptor that includes the - // recipient. If it doesn't, the recipient will reject it, so it's better to - // not send it in the first place. It's possible to hit this case if we're not - // the leaseholder and we haven't yet applied the configuration change that's - // adding the recipient to the range. - if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf( - errMarkSnapshotError, - "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, - ) - } - - // We avoid shipping over the past Raft log in the snapshot by changing - // the truncated state (we're allowed to -- it's an unreplicated key and not - // subject to mapping across replicas). The actual sending happens here: - _ = (*kvBatchSnapshotStrategy)(nil).Send - // and results in no log entries being sent at all. Note that - // Metadata.Index is really the applied index of the replica. + // We avoid shipping over the past Raft log in the snapshot by changing the + // truncated state (we're allowed to -- it's an unreplicated key and not + // subject to mapping across replicas). The actual sending happens in + // kvBatchSnapshotStrategy.Send and results in no log entries being sent at + // all. Note that Metadata.Index is really the applied index of the replica. snap.State.TruncatedState = &roachpb.RaftTruncatedState{ Index: snap.RaftSnap.Metadata.Index, Term: snap.RaftSnap.Metadata.Term, diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 635c01ae7b91..3a5aa145a98b 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" @@ -271,6 +273,13 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { ) defer tc.Stopper().Stop(ctx) + + // Disable delegating snapshots to different senders, which would otherwise + // fail this test as snapshots could queue on different stores. 
+	settings := cluster.MakeTestingClusterSettings()
+	sv := &settings.SV
+	kvserver.NumDelegateLimit.Override(ctx, sv, 0)
+
 	scratch := tc.ScratchRange(t)
 	replicationChange := make(chan error, 2)
 	g := ctxgroup.WithContext(ctx)
@@ -338,6 +347,197 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) {
 	require.NoError(t, g.Wait())
 }
 
+// TestDelegateSnapshot verifies that the correct delegate is chosen when
+// sending snapshots to stores.
+// TODO: It is currently skipped under stress because raft snapshots are
+// sometimes required and do not go through snapshot delegation, which makes
+// the results unpredictable.
+func TestDelegateSnapshot(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderStress(t, "Occasionally fails until 87553 is resolved")
+
+	ctx := context.Background()
+
+	// Synchronize on the moment before the snapshot gets sent to measure the
+	// state at that time.
+	// FIXME: This should only require size 1, sometimes extra snapshots are sent.
+	// Solve this before merging.
+	requestChannel := make(chan *kvserverpb.DelegateSnapshotRequest, 10)
+
+	setupFn := func(t *testing.T) (
+		*testcluster.TestCluster,
+		roachpb.Key,
+	) {
+		knobs, ltk := makeReplicationTestKnobs()
+		ltk.storeKnobs.DisableRaftSnapshotQueue = true
+
+		ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) {
+			requestChannel <- request
+		}
+
+		localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}}
+		localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}}
+
+		localityServerArgs := make(map[int]base.TestServerArgs)
+		localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
+		localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA}
+		localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB}
+		localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB}
+
+		tc := testcluster.StartTestCluster(
+			t, 4, base.TestClusterArgs{
+				ServerArgsPerNode: localityServerArgs,
+				ReplicationMode:   base.ReplicationManual,
+			},
+		)
+		scratchKey := tc.ScratchRange(t)
+		return tc, scratchKey
+	}
+
+	tc, scratchKey := setupFn(t)
+	defer tc.Stopper().Stop(ctx)
+
+	// Node 3 (loc B) can only get the data from node 1, as it's the only one that has it.
+	_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...)
+	request := <-requestChannel
+	require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
+	// Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent.
+	for len(requestChannel) > 0 {
+		<-requestChannel
+	}
+
+	// Node 4 (loc B) should get the snapshot from node 3, as it's in the same locality.
+	_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...)
+	request = <-requestChannel
+	require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3))
+	for len(requestChannel) > 0 {
+		<-requestChannel
+	}
+
+	// Node 2 (loc A) should get the snapshot from node 1 as it is the same locality.
+	_ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...)
+	request = <-requestChannel
+	require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1))
+}
+
+// TestDelegateSnapshotFails ensures that we fail fast when the sender or
+// receiver store crashes during delegated snapshot sending.
+func TestDelegateSnapshotFails(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var senders struct { + mu syncutil.Mutex + desc []roachpb.ReplicaDescriptor + } + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + senders.desc = nil + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.ThrottleEmptySnapshots = true + + ltk.storeKnobs.SelectDelegateSnapshotSender = + func(descriptor *roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor { + senders.mu.Lock() + defer senders.mu.Unlock() + return senders.desc + } + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }, + ) + + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + // Add a learner replica that will need a snapshot, kill the server + // the learner is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("receiver", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err, "Unable to lookup the range") + + _, err = setupPartitionedRange(tc, desc.RangeID, 0, 0, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Add a follower replica to act as the snapshot sender, and kill the server + // the sender is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("sender_no_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + // Always use node 3 (index 2) as the only delegate. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + log.Infof(ctx, "Err=%v", err) + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Identical setup as the previous test, but allow a fallback to the leaseholder. + t.Run("sender_with_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + leaseholderDesc, ok := desc.GetReplicaDescriptor(1) + require.True(t, ok) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. 
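+		// Because sendSnapshotUsingDelegate tries the configured senders in
+		// order, the expectation is that the partitioned delegate's attempt
+		// fails and the coordinator falls back to the leaseholder, so the
+		// AdminChangeReplicas call below should succeed.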
+ senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.desc = append(senders.desc, leaseholderDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + }) +} + func TestLearnerRaftConfState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -502,7 +702,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // the state at that time & gather metrics. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -1837,7 +2037,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // the state at that time. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index a876b768dbeb..b20f737ab3a6 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -27,16 +27,10 @@ service MultiRaft { // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target - // (so the client delegates the sending of the snapshot to the server). The - // server responds in two phases. + // (so the client delegates the sending of the snapshot to the server). // - // TODO(nvanbenschoten): This RPC is bi-directional streaming (as opposed to - // only server-streaming) because of future aspirations; at the moment the - // request is unary. In the future, we wanted to pause all log truncation, - // then handshake with the delegated sender, then weaken log truncation - // protection to just below the index that the sender was sending the - // snapshot at. - rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} + // This sends a DelegateRequest to the chosen sender. + rpc DelegateRaftSnapshot(cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} } service PerReplica { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..20a6b6ec4ad7 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,41 +160,44 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) { // HandleDelegatedSnapshot reads the incoming delegated snapshot message and // throttles sending snapshots before passing the request to the sender replica. 
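+// A coordinator reaches this handler through the now-unary DelegateRaftSnapshot
+// RPC, roughly as follows (a sketch only; the client stub name is illustrative,
+// not the exact transport plumbing):
+//
+//	resp, err := multiRaftClient.DelegateRaftSnapshot(ctx, delegateRequest)
+//	// On success resp.SnapResponse.Status is APPLIED; failures while sending
+//	// come back as an ERROR status with collected trace spans attached.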
func (s *Store) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { ctx = s.AnnotateCtx(ctx) if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { - fn() + fn(req) } + sp := tracing.SpanFromContext(ctx) + + // This can happen if the delegate doesn't know about the range yet. Return an + // error immediately. sender, err := s.GetReplica(req.RangeID) if err != nil { - return err + //nolint:returnerrcheck + return &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: snapRespErr(err), + CollectedSpans: sp.GetConfiguredRecording(), + }, nil } - sp := tracing.SpanFromContext(ctx) // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil { - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - CollectedSpans: sp.GetConfiguredRecording(), - }, - ) + if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + // If an error occurred during snapshot sending, send an error response. + //nolint:returnerrcheck + return &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: snapRespErr(err), + CollectedSpans: sp.GetConfiguredRecording(), + }, nil } - resp := &kvserverpb.DelegateSnapshotResponse{ + return &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, DeprecatedMessage: "Snapshot successfully applied by recipient", }, CollectedSpans: sp.GetConfiguredRecording(), - } - // Send a final response that snapshot sending is completed. - return stream.Send(resp) + }, nil } // HandleSnapshot reads an incoming streaming snapshot and applies it if diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e3e07b07297b..a254576ae7f9 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -91,13 +91,6 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } -// outgoingSnapshotStream is the minimal interface on a GRPC stream required -// to send a snapshot over the network. -type outgoingDelegatedStream interface { - Send(*kvserverpb.DelegateSnapshotRequest) error - Recv() (*kvserverpb.DelegateSnapshotResponse, error) -} - // snapshotRecordMetrics is a wrapper function that increments a set of metrics // related to the number of snapshot bytes sent/received. The definer of the // function specifies which metrics are incremented. @@ -674,7 +667,11 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { } } -// reserveReceiveSnapshot throttles incoming snapshots. +// reserveReceiveSnapshot reserves space for this snapshot which will attempt to +// prevent overload of system resources as this snapshot is being sent. +// Snapshots are often sent in bulk (due to operations like store decommission) +// so it is necessary to prevent snapshot transfers from overly impacting +// foreground traffic. 
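+//
+// Callers follow a reserve-then-cleanup pattern, roughly (a sketch of the
+// calling convention, not a verbatim excerpt):
+//
+//	cleanup, err := s.reserveReceiveSnapshot(ctx, header)
+//	if err != nil {
+//		return err
+//	}
+//	defer cleanup()
+//	// ... receive and apply the snapshot while holding the reservation ...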
func (s *Store) reserveReceiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { @@ -683,8 +680,9 @@ func (s *Store) reserveReceiveSnapshot( return s.throttleSnapshot(ctx, s.snapshotApplyQueue, int(header.SenderQueueName), header.SenderQueuePriority, + -1, header.RangeSize, - header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID, + header.RaftMessageRequest.RangeID, s.metrics.RangeSnapshotRecvQueueLength, s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, ) @@ -701,9 +699,11 @@ func (s *Store) reserveSendSnapshot( } return s.throttleSnapshot(ctx, s.snapshotSendQueue, - int(req.SenderQueueName), req.SenderQueuePriority, + int(req.SenderQueueName), + req.SenderQueuePriority, + req.QueueOnDelegateLen, rangeSize, - req.RangeID, req.DelegatedSender.ReplicaID, + req.RangeID, s.metrics.RangeSnapshotSendQueueLength, s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, ) @@ -717,11 +717,12 @@ func (s *Store) throttleSnapshot( snapshotQueue *multiqueue.MultiQueue, requestSource int, requestPriority float64, + maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - replicaID roachpb.ReplicaID, waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, -) (cleanup func(), err error) { +) (cleanup func(), funcErr error) { + tBegin := timeutil.Now() var permit *multiqueue.Permit // Empty snapshots are exempt from rate limits because they're so cheap to @@ -729,9 +730,14 @@ func (s *Store) throttleSnapshot( // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { - task := snapshotQueue.Add(requestSource, requestPriority) + task, err := snapshotQueue.Add(requestSource, requestPriority, maxQueueLength) + if err != nil { + return nil, err + } + // After this point, the task is on the queue, so any future errors need to + // be handled by cancelling the task to release the permit. defer func() { - if err != nil { + if funcErr != nil { snapshotQueue.Cancel(task) } }() @@ -787,10 +793,9 @@ func (s *Store) throttleSnapshot( if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild { log.Infof( ctx, - "waited for %.1fs to acquire snapshot reservation to r%d/%d", + "waited for %.1fs to acquire snapshot reservation to r%d", elapsed.Seconds(), rangeID, - replicaID, ) } @@ -1127,7 +1132,7 @@ func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error { return errors.Mark(err, errMarkSnapshotError) } -// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test. +// SnapshotStorePool narrows StorePool to make sendSnapshotUsingDelegate easier to test. type SnapshotStorePool interface { Throttle(reason storepool.ThrottleReason, why string, toStoreID roachpb.StoreID) } @@ -1500,7 +1505,7 @@ func sendSnapshot( recordBytesSent snapshotRecordMetrics, ) error { if recordBytesSent == nil { - // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` + // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. 
recordBytesSent = func(inc int64) {} @@ -1625,76 +1630,3 @@ func sendSnapshot( ) } } - -// delegateSnapshot sends an outgoing delegated snapshot request via a -// pre-opened GRPC stream. It sends the delegated snapshot request to the -// sender and waits for confirmation that the snapshot has been applied. -func delegateSnapshot( - ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest, -) error { - - delegatedSender := req.DelegatedSender - if err := stream.Send(req); err != nil { - return err - } - // Wait for a response from the sender. - resp, err := stream.Recv() - if err != nil { - return err - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( - maybeHandleDeprecatedSnapErr(resp.Error()), - "%s: sender couldn't accept %s", delegatedSender, req) - case kvserverpb.SnapshotResponse_ACCEPTED: - // The sender accepted the request, it will continue with sending. - log.VEventf( - ctx, 2, "sender %s accepted snapshot request %s", delegatedSender, - req, - ) - default: - err := errors.Errorf( - "%s: server sent an invalid status while negotiating %s: %s", - delegatedSender, req, resp.SnapResponse.Status, - ) - return err - } - - // Wait for response to see if the receiver successfully applied the snapshot. - resp, err = stream.Recv() - if err != nil { - return errors.Mark( - errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError, - ) - } - // Wait for EOF to ensure server side processing is complete. - if unexpectedResp, err := stream.Recv(); err != io.EOF { - if err != nil { - return errors.Mark(errors.Wrapf( - err, "%s: expected EOF, got resp=%v with error", - delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError) - } - return errors.Mark(errors.Newf( - "%s: expected EOF, got resp=%v", delegatedSender.StoreID, - unexpectedResp), errMarkSnapshotError) - } - sp := tracing.SpanFromContext(ctx) - if sp != nil { - sp.ImportRemoteRecording(resp.CollectedSpans) - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return maybeHandleDeprecatedSnapErr(resp.Error()) - case kvserverpb.SnapshotResponse_APPLIED: - // This is the response we're expecting. Snapshot successfully applied. - log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) - return nil - default: - return errors.Errorf( - "%s: server sent an invalid status during finalization: %s", - delegatedSender, resp.SnapResponse.Status, - ) - } - -} diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 31b034ace867..b10a0a2fd6af 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3011,7 +3011,7 @@ func (sp *fakeStorePool) Throttle( } // TestSendSnapshotThrottling tests the store pool throttling behavior of -// store.sendSnapshot, ensuring that it properly updates the StorePool on +// store.sendSnapshotUsingDelegate, ensuring that it properly updates the StorePool on // various exceptional conditions and new capacity estimates. func TestSendSnapshotThrottling(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3082,21 +3082,35 @@ func TestSendSnapshotConcurrency(t *testing.T) { s := tc.store // Checking this now makes sure that if the defaults change this test will also. 
-	require.Equal(t, 2, s.snapshotSendQueue.Len())
+	require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
 	cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 		SenderQueuePriority: 1,
 	}, 1)
-	require.Nil(t, err)
-	require.Equal(t, 1, s.snapshotSendQueue.Len())
+	require.NoError(t, err)
 	cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 		SenderQueuePriority: 1,
 	}, 1)
-	require.Nil(t, err)
-	require.Equal(t, 0, s.snapshotSendQueue.Len())
+	require.NoError(t, err)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
 	// At this point both the first two tasks will be holding reservations and
-	// waiting for cleanup, a third task will block.
+	// waiting for cleanup, a third task will block or fail. First send one with
+	// the queue length set to 0; this will fail since the first two tasks are
+	// still running.
+	_, err = s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+		SenderQueuePriority: 1,
+		QueueOnDelegateLen:  0,
+	}, 1)
+	require.Error(t, err)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
+	// Now add a task that will wait indefinitely for another task to finish.
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
@@ -3104,18 +3118,20 @@ func TestSendSnapshotConcurrency(t *testing.T) {
 		cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 			SenderQueuePriority: 1,
+			QueueOnDelegateLen:  -1,
 		}, 1)
 		after := timeutil.Now()
+		require.NoError(t, err)
+		require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
 		cleanup3()
 		wg.Done()
-		require.Nil(t, err)
-		require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
 	}()
-	// This task will not block for more than a few MS, but we want to wait for
-	// it to complete to make sure it frees the permit.
+	// Now add one that will queue up and wait for another task to finish. This
+	// task will not block for more than the 20ms deadline below, but we want to
+	// wait for it to complete to make sure it frees the permit.
 	go func() {
-		deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+		deadlineCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 		defer cancel()
 
 		// This will time out since the deadline is set artificially low. Make sure
@@ -3123,19 +3139,25 @@ func TestSendSnapshotConcurrency(t *testing.T) {
 		_, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{
 			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 			SenderQueuePriority: 1,
+			QueueOnDelegateLen:  -1,
 		}, 1)
+		require.Error(t, err)
 		wg.Done()
-		require.NotNil(t, err)
 	}()
 
 	// Wait a little time before calling signaling the first two as complete.
 	time.Sleep(100 * time.Millisecond)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	// The remaining tasks are queued at this point.
+	require.Equal(t, 2, s.snapshotSendQueue.QueueLen())
+
 	cleanup1()
 	cleanup2()
 
 	// Wait until all cleanup run before checking the number of permits.
 	wg.Wait()
-	require.Equal(t, 2, s.snapshotSendQueue.Len())
+	require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
 }
 
 func TestReserveSnapshotThrottling(t *testing.T) {
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 3835ee8a694f..f94753d30be2 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -284,7 +284,7 @@ type StoreTestingKnobs struct {
 	// SendSnapshot is run after receiving a DelegateRaftSnapshot request but
 	// before any throttling or sending logic.
-	SendSnapshot func()
+	SendSnapshot func(*kvserverpb.DelegateSnapshotRequest)
 	// ReceiveSnapshot is run after receiving a snapshot header but before
 	// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
 	// error is returned from the hook, it's sent as an ERROR SnapshotResponse.
@@ -431,6 +431,9 @@ type StoreTestingKnobs struct {
 	// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
 	// send snapshot semaphore.
 	AfterSendSnapshotThrottle func()
+	// SelectDelegateSnapshotSender returns an ordered list of replicas that will
+	// be used as delegates for sending a snapshot.
+	SelectDelegateSnapshotSender func(*roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor
 	// This method, if set, gets to see (and mutate, if desired) any local
 	// StoreDescriptor before it is being sent out on the Gossip network.