diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 8da607270ef1..2e4eb22d12a5 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -60,7 +60,8 @@
  kv.replica_circuit_breaker.slow_replication_threshold | duration | 1m0s | duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)
  kv.replica_stats.addsst_request_size_factor | integer | 50000 | the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1
  kv.replication_reports.interval | duration | 1m0s | the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)
- kv.snapshot_delegation.enabled | boolean | false | set to true to allow snapshots from follower replicas
+ kv.snapshot_delegation.num_follower | integer | 1 | the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder
+ kv.snapshot_delegation.num_requests | integer | 3 | how many queued requests are allowed on a delegate before the request is rejected
  kv.snapshot_rebalance.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for rebalance and upreplication snapshots
  kv.snapshot_recovery.max_rate | byte size | 32 MiB | the rate limit (bytes/sec) to use for recovery snapshots
kv.transaction.max_intents_bytes
integer4194304maximum number of bytes used to track locks in transactions diff --git a/pkg/cmd/roachtest/tests/decommissionbench.go b/pkg/cmd/roachtest/tests/decommissionbench.go index 35c0c030146a..0c7ca78704af 100644 --- a/pkg/cmd/roachtest/tests/decommissionbench.go +++ b/pkg/cmd/roachtest/tests/decommissionbench.go @@ -61,13 +61,15 @@ const ( type decommissionBenchSpec struct { nodes int - cpus int warehouses int - load bool + noLoad bool multistore bool snapshotRate int duration time.Duration + // Whether the test cluster nodes are in multiple regions. + multiregion bool + // When true, the test will attempt to stop the node prior to decommission. whileDown bool @@ -85,6 +87,10 @@ type decommissionBenchSpec struct { // An override for the default timeout, if needed. timeout time.Duration + // An override for the decommission node to make it choose a predictable node + // instead of a random node. + decommissionNode int + skip string } @@ -95,29 +101,21 @@ func registerDecommissionBench(r registry.Registry) { // Basic benchmark configurations, to be run nightly. { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, duration: 1 * time.Hour, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, whileDown: true, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. timeout: 4 * time.Hour, @@ -126,9 +124,7 @@ func registerDecommissionBench(r registry.Registry) { { // Add a new node during decommission (no drain). nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -138,9 +134,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, without adding a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -150,9 +144,7 @@ func registerDecommissionBench(r registry.Registry) { { // Drain before decommission, and add a new node. nodes: 8, - cpus: 16, warehouses: 3000, - load: true, whileUpreplicating: true, drainFirst: true, // This test can take nearly an hour to import and achieve balance, so @@ -162,25 +154,19 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, drainFirst: true, skip: manualBenchmarkingOnly, }, { nodes: 4, - cpus: 16, warehouses: 1000, - load: true, slowWrites: true, skip: manualBenchmarkingOnly, }, { nodes: 8, - cpus: 16, warehouses: 3000, - load: true, slowWrites: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -189,9 +175,7 @@ func registerDecommissionBench(r registry.Registry) { }, { nodes: 12, - cpus: 16, warehouses: 3000, - load: true, multistore: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. @@ -201,14 +185,30 @@ func registerDecommissionBench(r registry.Registry) { { // Test to compare 12 4-store nodes vs 48 single-store nodes nodes: 48, - cpus: 16, warehouses: 3000, - load: true, // This test can take nearly an hour to import and achieve balance, so // we extend the timeout to let it complete. 
timeout: 3 * time.Hour, skip: manualBenchmarkingOnly, }, + { + // Multiregion decommission, and add a new node in the same region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 2, + }, + { + // Multiregion decommission, and add a new node in a different region. + nodes: 6, + warehouses: 1000, + whileUpreplicating: true, + drainFirst: true, + multiregion: true, + decommissionNode: 3, + }, } { registerDecommissionBenchSpec(r, benchSpec) } @@ -223,7 +223,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe } extraNameParts := []string{""} addlNodeCount := 0 - specOptions := []spec.Option{spec.CPU(benchSpec.cpus)} + specOptions := []spec.Option{spec.CPU(16)} if benchSpec.snapshotRate != 0 { extraNameParts = append(extraNameParts, @@ -251,7 +251,7 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "while-upreplicating") } - if !benchSpec.load { + if benchSpec.noLoad { extraNameParts = append(extraNameParts, "no-load") } @@ -259,11 +259,23 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraNameParts = append(extraNameParts, "hi-read-amp") } + if benchSpec.decommissionNode != 0 { + extraNameParts = append(extraNameParts, + fmt.Sprintf("target=%d", benchSpec.decommissionNode)) + } + if benchSpec.duration > 0 { timeout = benchSpec.duration * 3 extraNameParts = append(extraNameParts, fmt.Sprintf("duration=%s", benchSpec.duration)) } + if benchSpec.multiregion { + geoZones := []string{regionUsEast, regionUsWest, regionUsCentral} + specOptions = append(specOptions, spec.Zones(strings.Join(geoZones, ","))) + specOptions = append(specOptions, spec.Geo()) + extraNameParts = append(extraNameParts, "multi-region") + } + // If run with ROACHTEST_DECOMMISSION_NOSKIP=1, roachtest will enable all specs. noSkipFlag := os.Getenv(envDecommissionNoSkipFlag) if noSkipFlag != "" { @@ -273,8 +285,8 @@ func registerDecommissionBenchSpec(r registry.Registry, benchSpec decommissionBe extraName := strings.Join(extraNameParts, "/") r.Add(registry.TestSpec{ - Name: fmt.Sprintf("decommissionBench/nodes=%d/cpu=%d/warehouses=%d%s", - benchSpec.nodes, benchSpec.cpus, benchSpec.warehouses, extraName), + Name: fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s", + benchSpec.nodes, benchSpec.warehouses, extraName), Owner: registry.OwnerKV, Cluster: r.MakeClusterSpec( benchSpec.nodes+addlNodeCount+1, @@ -471,7 +483,7 @@ func uploadPerfArtifacts( // Get the workload perf artifacts and move them to the pinned node, so that // they can be used to display the workload operation rates during decommission. - if benchSpec.load { + if !benchSpec.noLoad { workloadStatsSrc := filepath.Join(t.PerfArtifactsDir(), "stats.json") localWorkloadStatsPath := filepath.Join(t.ArtifactsDir(), "workload_stats.json") workloadStatsDest := filepath.Join(t.PerfArtifactsDir(), "workload_stats.json") @@ -595,7 +607,7 @@ func runDecommissionBench( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -642,7 +654,7 @@ func runDecommissionBench( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. 
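The naming logic in registerDecommissionBenchSpec above composes the test name from optional spec fields. A minimal standalone sketch of that pattern follows; benchSpec and testName here are illustrative stand-ins, not the roachtest types.

package main

import (
	"fmt"
	"strings"
)

// benchSpec mirrors the fields used to derive the test name in the diff above.
type benchSpec struct {
	nodes            int
	warehouses       int
	multiregion      bool
	decommissionNode int
}

// testName mimics how extra name parts are appended only for optional features.
func testName(s benchSpec) string {
	extra := []string{""}
	if s.decommissionNode != 0 {
		extra = append(extra, fmt.Sprintf("target=%d", s.decommissionNode))
	}
	if s.multiregion {
		extra = append(extra, "multi-region")
	}
	return fmt.Sprintf("decommissionBench/nodes=%d/warehouses=%d%s",
		s.nodes, s.warehouses, strings.Join(extra, "/"))
}

func main() {
	fmt.Println(testName(benchSpec{nodes: 6, warehouses: 1000, multiregion: true, decommissionNode: 2}))
	// Output: decommissionBench/nodes=6/warehouses=1000/target=2/multi-region
}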
- if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -666,7 +678,7 @@ func runDecommissionBench( m.ExpectDeath() defer m.ResetDeaths() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, false /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -729,7 +741,7 @@ func runDecommissionBenchLong( workloadCtx, workloadCancel := context.WithCancel(ctx) m := c.NewMonitor(workloadCtx, crdbNodes) - if benchSpec.load { + if !benchSpec.noLoad { m.Go( func(ctx context.Context) error { close(rampStarted) @@ -773,7 +785,7 @@ func runDecommissionBenchLong( // If we are running a workload, wait until it has started and completed its // ramp time before initiating a decommission. - if benchSpec.load { + if !benchSpec.noLoad { <-rampStarted t.Status("Waiting for workload to ramp up...") select { @@ -786,7 +798,7 @@ func runDecommissionBenchLong( for tBegin := timeutil.Now(); timeutil.Since(tBegin) <= benchSpec.duration; { m.ExpectDeath() - err := runSingleDecommission(ctx, h, pinnedNode, &targetNodeAtomic, benchSpec.snapshotRate, + err := runSingleDecommission(ctx, h, pinnedNode, benchSpec.decommissionNode, &targetNodeAtomic, benchSpec.snapshotRate, benchSpec.whileDown, benchSpec.drainFirst, true /* reuse */, benchSpec.whileUpreplicating, true /* estimateDuration */, benchSpec.slowWrites, tickByName, ) @@ -827,12 +839,15 @@ func runSingleDecommission( ctx context.Context, h *decommTestHelper, pinnedNode int, + target int, targetLogicalNodeAtomic *uint32, snapshotRateMb int, stopFirst, drainFirst, reuse, noBalanceWait, estimateDuration, slowWrites bool, tickByName func(name string), ) error { - target := h.getRandNodeOtherThan(pinnedNode) + if target == 0 { + target = h.getRandNodeOtherThan(pinnedNode) + } targetLogicalNodeID, err := h.getLogicalNodeID(ctx, target) if err != nil { return err diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 2f693cbb92f0..bc1459ee8b1b 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -825,6 +825,23 @@ func (sp *StorePool) IsLive(storeID roachpb.StoreID) (bool, error) { return status == storeStatusAvailable, nil } +// IsStoreHealthy returns whether we believe this store can serve requests +// reliably. A healthy store can be used for follower snapshot transmission or +// follower reads. A healthy store does not imply that replicas can be moved to +// this store. +func (sp *StorePool) IsStoreHealthy(storeID roachpb.StoreID) bool { + status, err := sp.storeStatus(storeID, sp.NodeLivenessFn) + if err != nil { + return false + } + switch status { + case storeStatusAvailable, storeStatusDecommissioning, storeStatusDraining: + return true + default: + return false + } +} + func (sp *StorePool) storeStatus( storeID roachpb.StoreID, nl NodeLivenessFunc, ) (storeStatus, error) { @@ -1235,6 +1252,24 @@ func (sp *StorePool) GossipNodeIDAddress(nodeID roachpb.NodeID) (*util.Unresolve return sp.gossip.GetNodeIDAddress(nodeID) } +// GetLocalitiesPerReplica computes the localities for the provided replicas. +// It returns a map from the ReplicaDescriptor to the Locality of the Node. 
+func (sp *StorePool) GetLocalitiesPerReplica( + replicas ...roachpb.ReplicaDescriptor, +) map[roachpb.ReplicaID]roachpb.Locality { + sp.localitiesMu.RLock() + defer sp.localitiesMu.RUnlock() + localities := make(map[roachpb.ReplicaID]roachpb.Locality) + for _, replica := range replicas { + if locality, ok := sp.localitiesMu.nodeLocalities[replica.NodeID]; ok { + localities[replica.ReplicaID] = locality.locality + } else { + localities[replica.ReplicaID] = roachpb.Locality{} + } + } + return localities +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index 84a1fff66b2d..4432598fe2a6 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -31,8 +31,9 @@ type unreliableRaftHandlerFuncs struct { dropReq func(*kvserverpb.RaftMessageRequest) bool dropHB func(*kvserverpb.RaftHeartbeat) bool dropResp func(*kvserverpb.RaftMessageResponse) bool - // snapErr defaults to returning nil. - snapErr func(*kvserverpb.SnapshotRequest_Header) error + // snapErr and delegateErr default to returning nil. + snapErr func(*kvserverpb.SnapshotRequest_Header) error + delegateErr func(request *kvserverpb.DelegateSnapshotRequest) error } func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { @@ -134,6 +135,17 @@ func (h *unreliableRaftHandler) HandleSnapshot( return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) } +func (h *unreliableRaftHandler) HandleDelegatedSnapshot( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { + if req.RangeID == h.rangeID && h.delegateErr != nil { + if err := h.delegateErr(req); err != nil { + return nil, err + } + } + return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req) +} + // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and // restarted while maintaining a partition using an unreliableRaftHandler. 
type testClusterStoreRaftMessageHandler struct { @@ -181,15 +193,13 @@ func (h *testClusterStoreRaftMessageHandler) HandleSnapshot( } func (h *testClusterStoreRaftMessageHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { store, err := h.getStore() if err != nil { - return err + return nil, err } - return store.HandleDelegatedSnapshot(ctx, req, stream) + return store.HandleDelegatedSnapshot(ctx, req) } // testClusterPartitionedRange is a convenient abstraction to create a range on a node @@ -198,7 +208,7 @@ type testClusterPartitionedRange struct { rangeID roachpb.RangeID mu struct { syncutil.RWMutex - partitionedNode int + partitionedNodeIdx int partitioned bool partitionedReplicas map[roachpb.ReplicaID]bool } @@ -232,7 +242,7 @@ func setupPartitionedRange( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { @@ -243,14 +253,14 @@ func setupPartitionedRange( storeIdx: i, }) } - return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNode, activated, handlers, funcs) + return setupPartitionedRangeWithHandlers(tc, rangeID, replicaID, partitionedNodeIdx, activated, handlers, funcs) } func setupPartitionedRangeWithHandlers( tc *testcluster.TestCluster, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, - partitionedNode int, + partitionedNodeIdx int, activated bool, handlers []kvserver.RaftMessageHandler, funcs unreliableRaftHandlerFuncs, @@ -260,9 +270,9 @@ func setupPartitionedRangeWithHandlers( handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated - pr.mu.partitionedNode = partitionedNode + pr.mu.partitionedNodeIdx = partitionedNodeIdx if replicaID == 0 { - ts := tc.Servers[partitionedNode] + ts := tc.Servers[partitionedNodeIdx] store, err := ts.Stores().GetStore(ts.GetFirstStoreID()) if err != nil { return nil, err @@ -294,8 +304,8 @@ func setupPartitionedRangeWithHandlers( pr.mu.RLock() defer pr.mu.RUnlock() return pr.mu.partitioned && - (s == pr.mu.partitionedNode || - req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNode)+1) + (s == pr.mu.partitionedNodeIdx || + req.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) } } if h.dropHB == nil { @@ -305,12 +315,21 @@ func setupPartitionedRangeWithHandlers( if !pr.mu.partitioned { return false } - if s == partitionedNode { + if s == partitionedNodeIdx { return true } return pr.mu.partitionedReplicas[hb.FromReplicaID] } } + if h.dropResp == nil { + h.dropResp = func(resp *kvserverpb.RaftMessageResponse) bool { + pr.mu.RLock() + defer pr.mu.RUnlock() + return pr.mu.partitioned && + (s == pr.mu.partitionedNodeIdx || + resp.FromReplica.StoreID == roachpb.StoreID(pr.mu.partitionedNodeIdx)+1) + } + } if h.snapErr == nil { h.snapErr = func(header *kvserverpb.SnapshotRequest_Header) error { pr.mu.RLock() @@ -324,6 +343,16 @@ func setupPartitionedRangeWithHandlers( return nil } } + if h.delegateErr == nil { + h.delegateErr = func(resp *kvserverpb.DelegateSnapshotRequest) error { + pr.mu.RLock() + defer pr.mu.RUnlock() + if pr.mu.partitionedReplicas[resp.DelegatedSender.ReplicaID] { + return errors.New("partitioned") + } + return nil + } + } pr.handlers = 
append(pr.handlers, h) tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) } diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 13c5e159c997..32a93f74fb46 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -3573,10 +3573,8 @@ func (errorChannelTestHandler) HandleSnapshot( } func (errorChannelTestHandler) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { panic("unimplemented") } diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 54f9c7b3a95e..cdd6cba1d2a0 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -203,7 +203,7 @@ func (s *Store) ManualRaftSnapshot(repl *Replica, target roachpb.ReplicaID) erro // ReservationCount counts the number of outstanding reservations that are not // running. func (s *Store) ReservationCount() int { - return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.Len() + return int(s.cfg.SnapshotApplyLimit) - s.snapshotApplyQueue.AvailableLen() } // RaftSchedulerPriorityID returns the Raft scheduler's prioritized range. diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 4ec5d0288aed..3bab2ab829db 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -14,7 +14,6 @@ option go_package = "kvserverpb"; import "errorspb/errors.proto"; import "roachpb/errors.proto"; -import "roachpb/internal_raft.proto"; import "roachpb/metadata.proto"; import "kv/kvserver/liveness/livenesspb/liveness.proto"; import "kv/kvserver/kvserverpb/state.proto"; @@ -221,7 +220,7 @@ message SnapshotRequest { Header header = 1; - // A RocksDB BatchRepr. Multiple kv_batches may be sent across multiple request messages. + // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"]; bool final = 4; @@ -272,23 +271,31 @@ message DelegateSnapshotRequest { roachpb.ReplicaDescriptor delegated_sender = 4 [(gogoproto.nullable) = false]; // The priority of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Priority priority = 5; - // The sending queue's name. - SnapshotRequest.QueueName sender_queue_name = 9; - - // The sending queue's priority. - double sender_queue_priority = 10; - // The type of the snapshot. + // TODO(abaptist): Remove this field for v23.1. SnapshotRequest.Type type = 6; // The Raft term of the coordinator (in most cases the leaseholder) replica. // The term is used during snapshot receiving to reject messages from an older term. uint64 term = 7; - // The truncated state of the Raft log on the coordinator replica. - roachpb.RaftTruncatedState truncated_state = 8; + // The first index of the Raft log on the coordinator replica. + uint64 first_index = 8; + + // The sending queue's name. + SnapshotRequest.QueueName sender_queue_name = 9; + + // The sending queue's priority. + double sender_queue_priority = 10; + + // The generation of the leaseholders descriptor. + uint64 descriptor_generation = 11 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.RangeGeneration"]; + + // Max queue length on the delegate before this request is rejected. 
+ int64 queue_on_delegate_len = 12; } message DelegateSnapshotResponse { diff --git a/pkg/kv/kvserver/multiqueue/BUILD.bazel b/pkg/kv/kvserver/multiqueue/BUILD.bazel index 5c7a1642b937..14dbae432c80 100644 --- a/pkg/kv/kvserver/multiqueue/BUILD.bazel +++ b/pkg/kv/kvserver/multiqueue/BUILD.bazel @@ -8,14 +8,16 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/util/syncutil", + "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], ) go_test( name = "multiqueue_test", + size = "small", srcs = ["multi_queue_test.go"], - args = ["-test.timeout=295s"], + args = ["-test.timeout=55s"], embed = [":multiqueue"], deps = [ "//pkg/testutils", diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 588c4556de46..8848612fc727 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -14,6 +14,7 @@ import ( "container/heap" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -145,10 +146,20 @@ func (m *MultiQueue) tryRunNextLocked() { // Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. -func (m *MultiQueue) Add(queueType int, priority float64) *Task { +func (m *MultiQueue) Add(queueType int, priority float64, maxQueueLength int64) (*Task, error) { m.mu.Lock() defer m.mu.Unlock() + // If there are remainingRuns we can run immediately, otherwise compute the + // queue length one we are added. If the queue is too long, return an error + // immediately so the caller doesn't have to wait. + if m.remainingRuns == 0 && maxQueueLength >= 0 { + currentLen := int64(m.queueLenLocked()) + if currentLen > maxQueueLength { + return nil, errors.Newf("queue is too long %d > %d", currentLen, maxQueueLength) + } + } + // The mutex starts locked, unlock it when we are ready to run. pos, ok := m.mapping[queueType] if !ok { @@ -169,7 +180,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() - return &newTask + return &newTask, nil } // Cancel will cancel a Task that may not have started yet. This is useful if it @@ -221,11 +232,32 @@ func (m *MultiQueue) releaseLocked(permit *Permit) { m.tryRunNextLocked() } -// Len returns the number of additional tasks that can be added without -// queueing. This will return 0 if there is anything queued. This method should -// only be used for testing. -func (m *MultiQueue) Len() int { +// AvailableLen returns the number of additional tasks that can be added without +// queueing. This will return 0 if there is anything queued. +func (m *MultiQueue) AvailableLen() int { m.mu.Lock() defer m.mu.Unlock() return m.remainingRuns } + +// QueueLen returns the length of the queue if one more task is added. If this +// returns 0 then a task can be added and run without queueing. +// NB: The value returned is not a guarantee that queueing will not occur when +// the request is submitted. multiple calls to QueueLen could race. +func (m *MultiQueue) QueueLen() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.queueLenLocked() +} + +func (m *MultiQueue) queueLenLocked() int { + if m.remainingRuns > 0 { + return 0 + } + // Start counting from 1 since we will be the first in the queue if it gets added. 
+ count := 1 + for i := 0; i < len(m.outstanding); i++ { + count += len(m.outstanding[i]) + } + return count - m.remainingRuns +} diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index 8e693a1e210f..4c50f4ac2753 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -37,10 +37,10 @@ func TestMultiQueueAddTwiceSameQueue(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - chan1 := queue.Add(7, 1.0) - chan2 := queue.Add(7, 2.0) + chan1, _ := queue.Add(7, 1.0, -1) + chan2, _ := queue.Add(7, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -56,13 +56,13 @@ func TestMultiQueueTwoQueues(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a1 := queue.Add(5, 4.0) - a2 := queue.Add(5, 5.0) + a1, _ := queue.Add(5, 4.0, -1) + a2, _ := queue.Add(5, 5.0, -1) - b1 := queue.Add(6, 1.0) - b2 := queue.Add(6, 2.0) + b1, _ := queue.Add(6, 1.0, -1) + b2, _ := queue.Add(6, 2.0, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -79,15 +79,15 @@ func TestMultiQueueComplex(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) permit := <-blocker.GetWaitChan() queue.Release(permit) @@ -99,15 +99,15 @@ func TestMultiQueueRemove(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - blocker := queue.Add(0, 0) + blocker, _ := queue.Add(0, 0, -1) - a2 := queue.Add(1, 4.0) - b1 := queue.Add(2, 1.1) - b2 := queue.Add(2, 2.1) - c2 := queue.Add(3, 1.2) - c3 := queue.Add(3, 2.2) - a3 := queue.Add(1, 5.0) - b3 := queue.Add(2, 6.1) + a2, _ := queue.Add(1, 4.0, -1) + b1, _ := queue.Add(2, 1.1, -1) + b2, _ := queue.Add(2, 2.1, -1) + c2, _ := queue.Add(3, 1.2, -1) + c3, _ := queue.Add(3, 2.2, -1) + a3, _ := queue.Add(1, 5.0, -1) + b3, _ := queue.Add(2, 6.1, -1) fmt.Println("Beginning cancel") @@ -125,7 +125,7 @@ func TestMultiQueueCancelOne(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) queue.Cancel(task) } @@ -139,12 +139,12 @@ func TestMultiQueueCancelInProgress(t *testing.T) { const b = 2 const c = 3 - a3 := queue.Add(a, 5.0) - a2 := queue.Add(a, 4.0) - b1 := queue.Add(b, 1.1) - b2 := queue.Add(b, 2.1) - c3 := queue.Add(c, 2.2) - b3 := queue.Add(b, 6.1) + a3, _ := queue.Add(a, 5.0, -1) + a2, _ := queue.Add(a, 4.0, -1) + b1, _ := queue.Add(b, 1.1, -1) + b2, _ := queue.Add(b, 2.1, -1) + c3, _ := queue.Add(c, 2.2, -1) + b3, _ := queue.Add(b, 6.1, -1) queue.Cancel(b2) queue.Cancel(b1) @@ -237,7 +237,7 @@ func TestMultiQueueStress(t *testing.T) { for i := 0; i < numThreads; i++ { go func(name int) { for j := 0; j < numRequests; j++ { - curTask := queue.Add(name, float64(j)) + curTask, _ := queue.Add(name, float64(j), -1) if alsoCancel && j%99 
== 0 { queue.Cancel(curTask) } else { @@ -271,7 +271,7 @@ func TestMultiQueueReleaseTwice(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Release(p) require.Panics(t, func() { queue.Release(p) }) @@ -283,7 +283,7 @@ func TestMultiQueueReleaseAfterCancel(t *testing.T) { queue := NewMultiQueue(1) - task := queue.Add(1, 1) + task, _ := queue.Add(1, 1, -1) p := <-task.GetWaitChan() queue.Cancel(task) queue.Release(p) @@ -294,31 +294,84 @@ func TestMultiQueueLen(t *testing.T) { defer log.Scope(t).Close(t) queue := NewMultiQueue(2) - require.Equal(t, 2, queue.Len()) - - task1 := queue.Add(1, 1) - require.Equal(t, 1, queue.Len()) - task2 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) - task3 := queue.Add(1, 1) - require.Equal(t, 0, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + task1, _ := queue.Add(1, 1, -1) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + task2, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + task3, _ := queue.Add(1, 1, -1) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) queue.Cancel(task1) // Finish task 1, but immediately start task3. - require.Equal(t, 0, queue.Len()) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) p := <-task2.GetWaitChan() queue.Release(p) - require.Equal(t, 1, queue.Len()) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) queue.Cancel(task3) - require.Equal(t, 2, queue.Len()) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) +} + +func TestMultiQueueFull(t *testing.T) { + queue := NewMultiQueue(2) + require.Equal(t, 2, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + + // Task 1 starts immediately since there is no queue. + task1, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // Task 2 also starts immediately as the queue supports 2 concurrent. + task2, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 3 would be queued so should not be added. + task3, err := queue.Add(1, 1, 0) + require.Error(t, err) + require.Nil(t, task3) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + // Task 4 uses a longer max queue length so should be added. + task4, err := queue.Add(1, 1, 1) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 2, queue.QueueLen()) + + queue.Cancel(task1) + queue.Cancel(task2) + require.Equal(t, 1, queue.AvailableLen()) + require.Equal(t, 0, queue.QueueLen()) + // After these tasks are done, make sure we can add another one. + task5, err := queue.Add(1, 1, 0) + require.NoError(t, err) + require.Equal(t, 0, queue.AvailableLen()) + require.Equal(t, 1, queue.QueueLen()) + + // Cancel all the remaining tasks. + queue.Cancel(task4) + queue.Cancel(task5) } // verifyOrder makes sure that the chans are called in the specified order. 
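The new maxQueueLength argument lets callers fail fast instead of waiting behind a long queue. A small hypothetical caller of the API as changed above (queue type and priority values are arbitrary):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
)

func main() {
	q := multiqueue.NewMultiQueue(1) // one task may run at a time

	// -1 means "queue no matter how long the line already is".
	running, err := q.Add(0 /* queueType */, 1.0 /* priority */, -1)
	if err != nil {
		panic(err)
	}
	permit := <-running.GetWaitChan() // runs immediately, nothing else is queued

	// With maxQueueLength=0 this Add refuses to queue at all: the only permit is
	// taken, so the call returns an error instead of a waitable Task.
	if _, err := q.Add(0, 1.0, 0); err != nil {
		fmt.Println("rejected:", err)
	}

	q.Release(permit)
}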
func verifyOrder(t *testing.T, queue *MultiQueue, tasks ...*Task) { // each time, verify that the only available channel is the "next" one in order for i, task := range tasks { + if task == nil { + require.Fail(t, "Task is nil", "%d", task) + } + var found *Permit for j, t2 := range tasks[i+1:] { select { diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 0ba36ab6c487..f8dcdeac6549 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -142,7 +142,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) + err := repl.sendSnapshotUsingDelegate(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 283643e551c1..818ab128d369 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -98,13 +98,6 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } -// DelegateSnapshotResponseStream is the subset of the -// MultiRaft_RaftSnapshotServer interface that is needed for sending delegated responses. -type DelegateSnapshotResponseStream interface { - Send(request *kvserverpb.DelegateSnapshotResponse) error - Recv() (*kvserverpb.DelegateSnapshotRequest, error) -} - // RaftMessageHandler is the interface that must be implemented by // arguments to RaftTransport.Listen. type RaftMessageHandler interface { @@ -133,8 +126,7 @@ type RaftMessageHandler interface { HandleDelegatedSnapshot( ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, - ) error + ) (*kvserverpb.DelegateSnapshotResponse, error) } // RaftTransport handles the rpc messages for raft. @@ -320,21 +312,12 @@ func (t *RaftTransport) RaftMessageBatch(stream MultiRaft_RaftMessageBatchServer // DelegateRaftSnapshot handles incoming delegated snapshot requests and passes // the request to pass off to the sender store. Errors during the snapshots // process are sent back as a response. -func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapshotServer) error { - ctx, cancel := t.stopper.WithCancelOnQuiesce(stream.Context()) - defer cancel() - req, err := stream.Recv() - if err != nil { - return err - } - // Check to ensure the header is valid. +func (t *RaftTransport) DelegateRaftSnapshot( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { if req == nil { err := errors.New("client error: no message in first delegated snapshot request") - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - }, - ) + return &kvserverpb.DelegateSnapshotResponse{SnapResponse: snapRespErr(err)}, nil } // Get the handler of the sender store. 
handler, ok := t.getHandler(req.DelegatedSender.StoreID) @@ -346,11 +329,12 @@ func (t *RaftTransport) DelegateRaftSnapshot(stream MultiRaft_DelegateRaftSnapsh req.CoordinatorReplica.StoreID, req.DelegatedSender.StoreID, ) - return roachpb.NewStoreNotFoundError(req.DelegatedSender.StoreID) + err := errors.New("unable to accept Raft message: no handler registered for the sender store") + return &kvserverpb.DelegateSnapshotResponse{SnapResponse: snapRespErr(err)}, nil } // Pass off the snapshot request to the sender store. - return handler.HandleDelegatedSnapshot(ctx, req, stream) + return handler.HandleDelegatedSnapshot(ctx, req) } // RaftSnapshot handles incoming streaming snapshot requests. @@ -640,8 +624,8 @@ func (t *RaftTransport) SendSnapshot( return sendSnapshot(ctx, t.st, t.tracer, stream, storePool, header, snap, newBatch, sent, recordBytesSent) } -// DelegateSnapshot creates a rpc stream between the leaseholder and the -// new designated sender for delegated snapshot requests. +// DelegateSnapshot sends a DelegateSnapshotRequest to a remote store +// and determines if it encountered any errors when sending the snapshot. func (t *RaftTransport) DelegateSnapshot( ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, ) error { @@ -653,14 +637,25 @@ func (t *RaftTransport) DelegateSnapshot( client := NewMultiRaftClient(conn) // Creates a rpc stream between the leaseholder and sender. - stream, err := client.DelegateRaftSnapshot(ctx) + resp, err := client.DelegateRaftSnapshot(ctx, req) if err != nil { return errors.Mark(err, errMarkSnapshotError) } - defer func() { - if err := stream.CloseSend(); err != nil { - log.Warningf(ctx, "failed to close snapshot stream: %+v", err) + + // Import the remotely collected spans, if any. + if len(resp.CollectedSpans) != 0 { + span := tracing.SpanFromContext(ctx) + if span == nil { + log.Warningf(ctx, "trying to ingest remote spans but there is no recording span set up") + } else { + span.ImportRemoteRecording(resp.CollectedSpans) } - }() - return delegateSnapshot(ctx, stream, req) + } + // On a delegated request, we only see the fully applied result. + if resp.SnapResponse.Status != kvserverpb.SnapshotResponse_APPLIED { + log.VEventf(ctx, 2, "%s", resp.SnapResponse) + return errors.Mark(errors.Errorf("%s: error: %s", req.DelegatedSender, resp.SnapResponse), errMarkSnapshotError) + } + log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", req.DelegatedSender) + return nil } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index 7cbb12d95177..638a87d9e0dd 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -97,11 +97,9 @@ func (s channelServer) HandleSnapshot( } func (s channelServer) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream kvserver.DelegateSnapshotResponseStream, -) error { - panic("unimplemented") + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { + panic("unexpected HandleDelegatedSnapshot") } // raftTransportTestContext contains objects needed to test RaftTransport. diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 7ef521c1993f..79331ecf23ce 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -1824,7 +1824,7 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. 
Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. - if err := r.sendSnapshot( + if err := r.sendSnapshotUsingDelegate( ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, ); err != nil { return nil, err @@ -2517,34 +2517,145 @@ func recordRangeEventsInLog( return nil } -// getSenderReplica returns a replica descriptor for a follower replica to act as -// the sender for snapshots. -// TODO(amy): select a follower based on locality matching. -func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescriptor, error) { - log.Fatal(ctx, "follower snapshots not implemented") - return r.GetReplicaDescriptor() +// getSenderReplicas returns an ordered list of replica descriptor for a +// follower replica to act as the sender for delegated snapshots. The replicas +// should be tried in order, and typically the coordinator is the last entry on +// the list. +func (r *Replica) getSenderReplicas( + ctx context.Context, recipient roachpb.ReplicaDescriptor, +) ([]roachpb.ReplicaDescriptor, error) { + + coordinator, err := r.GetReplicaDescriptor() + if err != nil { + // If there is no local replica descriptor, return an empty list. + return nil, err + } + + // Unless all nodes are on V23.1, don't delegate. This prevents sending to a + // node that doesn't understand the request. + if !r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V23_1) { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Check follower snapshots, if zero just self-delegate. + numFollowers := int(NumDelegateLimit.Get(&r.ClusterSettings().SV)) + if numFollowers == 0 { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get range descriptor and store pool. + storePool := r.store.cfg.StorePool + rangeDesc := r.Desc() + + if fn := r.store.cfg.TestingKnobs.SelectDelegateSnapshotSender; fn != nil { + sender := fn(rangeDesc) + // If a TestingKnob is specified use it whatever it is. + if sender != nil { + return sender, nil + } + } + + // Include voter and non-voter replicas on healthy stores as candidates. + nonRecipientReplicas := rangeDesc.Replicas().Filter( + func(rDesc roachpb.ReplicaDescriptor) bool { + return rDesc.ReplicaID != recipient.ReplicaID && storePool.IsStoreHealthy(rDesc.StoreID) + }, + ) + candidates := nonRecipientReplicas.VoterAndNonVoterDescriptors() + if len(candidates) == 0 { + // Not clear when the coordinator would be considered dead, but if it does + // happen, just return the coordinator. + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + + // Get the localities of the candidate replicas, including the original sender. + localities := storePool.GetLocalitiesPerReplica(candidates...) + recipientLocality := storePool.GetLocalitiesPerReplica(recipient)[recipient.ReplicaID] + + // Construct a map from replica to its diversity score compared to the + // recipient. Also track the best score we see. + replicaDistance := make(map[roachpb.ReplicaID]float64, len(localities)) + closestStore := roachpb.MaxDiversityScore + for desc, locality := range localities { + score := recipientLocality.DiversityScore(locality) + if score < closestStore { + closestStore = score + } + replicaDistance[desc] = score + } + + // Find all replicas that tie as the most optimal sender other than the + // coordinator. The coordinator will always be added to the end of the list + // regardless of score. 
+ var tiedReplicas []roachpb.ReplicaID + for desc, score := range replicaDistance { + if score == closestStore { + // If the coordinator is tied for closest, always use it. + // TODO(baptist): Consider using other replicas at the same distance once + // this is integrated with admission control. + if desc == coordinator.ReplicaID { + return []roachpb.ReplicaDescriptor{coordinator}, nil + } + tiedReplicas = append(tiedReplicas, desc) + } + } + + // Use a psuedo random source that is consistent across runs of this method + // for the same coordinator. Shuffle the replicas to prevent always choosing + // them in the same order. + pRand := rand.New(rand.NewSource(int64(coordinator.ReplicaID))) + pRand.Shuffle(len(tiedReplicas), func(i, j int) { tiedReplicas[i], tiedReplicas[j] = tiedReplicas[j], tiedReplicas[i] }) + + // Only keep the top numFollowers replicas. + if len(tiedReplicas) > numFollowers { + tiedReplicas = tiedReplicas[0:numFollowers] + } + + // Convert to replica descriptors before returning. The list of tiedReplicas + // is typically only one element. + replicaList := make([]roachpb.ReplicaDescriptor, len(tiedReplicas)+1) + for n, replicaId := range tiedReplicas { + found := false + for _, desc := range rangeDesc.Replicas().Descriptors() { + if replicaId == desc.ReplicaID { + replicaList[n] = desc + found = true + break + } + } + if !found { + return nil, errors.Errorf("unable to find replica for replicaId %d", replicaId) + } + } + // Set the last replica to be the coordinator. + replicaList[len(replicaList)-1] = coordinator + return replicaList, nil } -// TODO(amy): update description when patch for follower snapshots are completed. -// sendSnapshot sends a snapshot of the replica state to the specified replica. -// Currently only invoked from replicateQueue and raftSnapshotQueue. Be careful -// about adding additional calls as generating a snapshot is moderately -// expensive. +// sendSnapshotUsingDelegate sends a snapshot of the replica state to the specified +// replica through a delegate. Currently, only invoked from replicateQueue and +// raftSnapshotQueue. Be careful about adding additional calls as generating a +// snapshot is moderately expensive. // // A snapshot is a bulk transfer of all data in a range. It consists of a // consistent view of all the state needed to run some replica of a range as of -// some applied index (not as of some mvcc-time). Snapshots are used by Raft -// when a follower is far enough behind the leader that it can no longer be -// caught up using incremental diffs (because the leader has already garbage -// collected the diffs, in this case because it truncated the Raft log past -// where the follower is). +// some applied index (not as of some mvcc-time). There are two primary cases +// when a Snapshot is used. +// +// The first case is use by Raft when a voter or non-voter follower is far +// enough behind the leader that it can no longer be caught up using incremental +// diffs. This occurs because the leader has already garbage collected diffs +// past where the follower is. The quota pool is responsible for keeping a +// leader from getting too far ahead of any of the followers, so normally +// followers don't need a snapshot, however there are a number of cases where +// this can happen (restarts, paused followers, ...). 
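The delegate-ranking step in getSenderReplicas above reduces to: score every healthy candidate by locality diversity against the recipient, keep the ties for the best score, shuffle them deterministically, and truncate to the configured limit. A self-contained sketch of that idea, where locality and diversityScore are simplified stand-ins for roachpb.Locality and its DiversityScore:

package main

import (
	"fmt"
	"math/rand"
	"sort"
)

// locality is a simplified stand-in for roachpb.Locality (ordered tiers).
type locality []string // e.g. {"us-west", "zone-b"}

// diversityScore is a toy version of DiversityScore: 0 means identical
// localities, 1 means nothing in common.
func diversityScore(a, b locality) float64 {
	n := len(a)
	if len(b) < n {
		n = len(b)
	}
	for i := 0; i < n; i++ {
		if a[i] != b[i] {
			return 1 - float64(i)/float64(n)
		}
	}
	return 0
}

// pickDelegates returns up to numFollowers candidates that tie for the
// smallest diversity score relative to the recipient, shuffled with a seed
// derived from the coordinator so repeated calls give a stable order.
func pickDelegates(
	recipient locality, candidates map[int]locality, coordinatorID, numFollowers int,
) []int {
	ids := make([]int, 0, len(candidates))
	for id := range candidates {
		ids = append(ids, id)
	}
	sort.Ints(ids) // deterministic iteration order

	best := 2.0
	var tied []int
	for _, id := range ids {
		score := diversityScore(recipient, candidates[id])
		if score < best {
			best, tied = score, []int{id}
		} else if score == best {
			tied = append(tied, id)
		}
	}

	r := rand.New(rand.NewSource(int64(coordinatorID)))
	r.Shuffle(len(tied), func(i, j int) { tied[i], tied[j] = tied[j], tied[i] })
	if len(tied) > numFollowers {
		tied = tied[:numFollowers]
	}
	return tied
}

func main() {
	recipient := locality{"us-west", "zone-b"}
	candidates := map[int]locality{
		2: {"us-east", "zone-a"},
		3: {"us-west", "zone-a"},
		4: {"us-west", "zone-c"},
	}
	fmt.Println(pickDelegates(recipient, candidates, 1, 1)) // one of [3] or [4]
}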
// -// We also proactively send a snapshot when adding a new replica to bootstrap it +// The second case is adding a new replica to a replica set, to bootstrap it // (this is called a "learner" snapshot and is a special case of a Raft // snapshot, we just speed the process along). It's called a learner snapshot -// because it's sent to what Raft terms a learner replica. As of 19.2, when we -// add a new replica, it's first added as a learner using a Raft ConfChange, -// which means it accepts Raft traffic but doesn't vote or affect quorum. Then +// because it's sent to what Raft terms a learner replica. When we +// add a new replica, it's first added as a learner using a Raft ConfChange. +// A learner accepts Raft traffic but doesn't vote or affect quorum. Then // we immediately send it a snapshot to catch it up. After the snapshot // successfully applies, we turn it into a normal voting replica using another // ConfChange. It then uses the normal mechanisms to catch up with whatever got @@ -2552,13 +2663,20 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // the voting replica directly, this avoids a period of fragility when the // replica would be a full member, but very far behind. // -// Snapshots are expensive and mostly unexpected (except learner snapshots -// during rebalancing). The quota pool is responsible for keeping a leader from -// getting too far ahead of any of the followers, so ideally they'd never be far -// enough behind to need a snapshot. +// The snapshot process itself is broken into 4 parts: delegating the request, +// generating the snapshot, transmitting it, and applying it. // -// The snapshot process itself is broken into 3 parts: generating the snapshot, -// transmitting it, and applying it. +// Delegating the request: Since a snapshot is expensive to transfer from a +// network, CPU and IO perspective, the coordinator attempts to delegate the +// request to a healthy delegate who is both closer to the final destination. +// This is done by sending a DelegateSnapshotRequest to a replica. The replica +// can either reject the delegation request or process it. It will reject if it +// is too far behind, unhealthy, or has too long of a queue of snapshots to +// send. If the delegate accepts the delegation request, then the remaining +// three steps occur on that delegate. If the delegate does not decide to +// process the request, it sends an error back to the coordinator and the +// coordinator either chooses a different delegate or itself as the "delegate of +// last resort". // // Generating the snapshot: The data contained in a snapshot is a full copy of // the replicated data plus everything the replica needs to be a healthy member @@ -2566,7 +2684,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // instead of keeping it all in memory at once. The `(Replica).GetSnapshot` // method does the necessary locking and gathers the various Raft state needed // to run a replica. It also creates an iterator for the range's data as it -// looked under those locks (this is powered by a RocksDB snapshot, which is a +// looked under those locks (this is powered by a Pebble snapshot, which is a // different thing but a similar idea). Notably, GetSnapshot does not do the // data iteration. 
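The "delegate of last resort" behavior described above can be modeled in a few lines; every type here is invented for illustration, while the real loop lives in sendSnapshotUsingDelegate:

package main

import (
	"context"
	"errors"
	"fmt"
)

// delegate models anything that can be asked to send the snapshot on our behalf.
type delegate struct {
	name string
	send func(context.Context) error
}

// trySenders walks the ordered candidate list; by convention the coordinator
// itself is the final entry, so the last attempt is always self-delegation.
func trySenders(ctx context.Context, senders []delegate) error {
	var lastErr error
	for i, s := range senders {
		if err := s.send(ctx); err != nil {
			lastErr = fmt.Errorf("attempt %d via %s: %w", i, s.name, err)
			continue // fall through to the next sender
		}
		return nil // first success wins
	}
	return lastErr
}

func main() {
	err := trySenders(context.Background(), []delegate{
		{name: "follower-2", send: func(context.Context) error { return errors.New("queue full") }},
		{name: "coordinator", send: func(context.Context) error { return nil }},
	})
	fmt.Println(err) // <nil>: the coordinator sent the snapshot itself
}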
// @@ -2589,7 +2707,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // returns true, this is communicated back to the sender, which then proceeds to // call `kvBatchSnapshotStrategy.Send`. This uses the iterator captured earlier // to send the data in chunks, each chunk a streaming grpc message. The sender -// then sends a final message with an indicaton that it's done and blocks again, +// then sends a final message with an indication that it's done and blocks again, // waiting for a second and final response from the recipient which indicates if // the snapshot was a success. // @@ -2601,7 +2719,6 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // RocksDB. Each of the SSTs also has a range deletion tombstone to delete the // existing data in the range. // - // Applying the snapshot: After the recipient has received the message // indicating it has all the data, it hands it all to // `(Store).processRaftSnapshotRequest` to be applied. First, this re-checks @@ -2637,7 +2754,7 @@ func (r *Replica) getSenderReplica(ctx context.Context) (roachpb.ReplicaDescript // callers of `shouldAcceptSnapshotData` return an error so that we no longer // have to worry about racing with a second snapshot. See the comment on // ReplicaPlaceholder for details. -func (r *Replica) sendSnapshot( +func (r *Replica) sendSnapshotUsingDelegate( ctx context.Context, recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, @@ -2671,12 +2788,16 @@ func (r *Replica) sendSnapshot( ) } - // Check follower snapshots cluster setting. - if followerSnapshotsEnabled.Get(&r.ClusterSettings().SV) { - sender, err = r.getSenderReplica(ctx) - if err != nil { - return err - } + log.VEventf( + ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, + ) + + status := r.RaftStatus() + if status == nil { + // This code path is sometimes hit during scatter for replicas that + // haven't woken up yet. + retErr = &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return } // Don't send a queue name or priority if the receiver may not understand @@ -2689,51 +2810,182 @@ func (r *Replica) sendSnapshot( senderQueuePriority = 0 } - log.VEventf( - ctx, 2, "delegating snapshot transmission for %v to %v", recipient, sender, - ) - status := r.RaftStatus() + // Create new delegate snapshot request without specifying the delegate sender. + delegateRequest := &kvserverpb.DelegateSnapshotRequest{ + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, + FirstIndex: r.GetFirstIndex(), + DescriptorGeneration: r.Desc().Generation, + QueueOnDelegateLen: MaxQueueOnDelegateLimit.Get(&r.ClusterSettings().SV), + } + + // Get the list of senders in order. + senders, err := r.getSenderReplicas(ctx, recipient) + if err != nil { + return err + } + + if len(senders) == 0 { + return errors.Errorf("no sender found to send a snapshot from for %v", r) + } + + for n, sender := range senders { + delegateRequest.DelegatedSender = sender + log.VEventf( + ctx, 2, "delegating snapshot transmission attempt %v for %v to %v", n, recipient, sender, + ) + + // On the last attempt, always queue on the delegate to time out naturally. 
+ if n == len(senders)-1 { + delegateRequest.QueueOnDelegateLen = -1 + } + + retErr = contextutil.RunWithTimeout( + ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { + // Sending snapshot + return r.store.cfg.Transport.DelegateSnapshot(ctx, delegateRequest) + }, + ) + // Return once we have the first success. + if retErr == nil { + return + } else { + log.Warningf(ctx, "attempt %d: delegate snapshot %+v request failed %v", n, delegateRequest, err) + } + } + return +} + +// validateSnapshotDelegationRequest will validate that this replica can send +// the snapshot that the coordinator requested. The main reasons a request can't +// be delegated are if the Generation or Term of the replica is not equal to the +// Generation or Term of the coordinator's request or the applied index on this +// replica is behind the truncated index of the coordinator. Note that the request +// is validated twice, once before "queueing" and once after. This reduces the +// chance of false positives (snapshots which are sent but can't be used), +// however it is difficult to completely eliminate them. Between the time of sending the +// original request and the delegate processing it, the leaseholder could decide +// to truncate its index, change the leaseholder or split or merge the range. +func (r *Replica) validateSnapshotDelegationRequest( + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) error { + desc := r.Desc() + // If the generation has changed, this snapshot will be useless, so don't + // attempt to send it. + if desc.Generation != req.DescriptorGeneration { + log.VEventf(ctx, 2, + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + return errors.Errorf( + "%s: generation has changed since snapshot was generated %s != %s", + r, req.DescriptorGeneration, desc.Generation, + ) + } + + // Check that the snapshot we generated has a descriptor that includes the + // recipient. If it doesn't, the recipient will reject it, so it's better to + // not send it in the first place. It's possible to hit this case if we're not + // the leaseholder, and we haven't yet applied the configuration change that's + // adding the recipient to the range, or we are the leaseholder but have + // removed the recipient between starting to send the snapshot and this point. + if _, ok := desc.GetReplicaDescriptorByID(req.RecipientReplica.ReplicaID); !ok { + // Recipient replica not found in the current range descriptor. + // The sender replica's descriptor may be lagging behind the coordinator's. + log.VEventf(ctx, 2, + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.DescriptorGeneration, r.Desc(), + ) + return errors.Errorf( + "%s: couldn't find receiver replica %s in sender descriptor %s", + r, req.RecipientReplica, r.Desc(), + ) + } + + // Check the raft applied state index and term to determine if this replica + // is not too far behind the leaseholder. If the delegate is too far behind + // that is also needs a snapshot, then any snapshot it sends will be useless. + r.mu.RLock() + replIdx := r.mu.state.RaftAppliedIndex + 1 + + status := r.raftStatusRLocked() if status == nil { // This code path is sometimes hit during scatter for replicas that // haven't woken up yet. 
- return &benignError{errors.Wrap(errMarkSnapshotError, "raft status not initialized")} + return errors.Errorf("raft status not initialized") } + replTerm := status.Term + r.mu.RUnlock() - // Create new delegate snapshot request with only required metadata. - delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - SenderQueueName: senderQueueName, - SenderQueuePriority: senderQueuePriority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, - } - err = contextutil.RunWithTimeout( - ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { - return r.store.cfg.Transport.DelegateSnapshot( - ctx, - delegateRequest, - ) - }, - ) - // Only mark explicitly as snapshot error (which is retriable) if we timed out. - // Otherwise, it's up to the remote to add this mark where appropriate. - if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { - err = errors.Mark(err, errMarkSnapshotError) + // Delegate has a different term than the coordinator. This typically means + // the lease has been transferred, and we should not process this request. + // There is a potential race where the leaseholder sends a delegate request to + // itself and then the term changes before this request is sent. In that case + // this code path will not be checked and the snapshot will still be sent. + if replTerm != req.Term { + log.Infof( + ctx, + "sender: %v is not fit to send snapshot for %v; sender term: %v coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) + return errors.Errorf( + "sender: %v is not fit to send snapshot for %v; sender term: %v, coordinator term: %v", + req.DelegatedSender, req.CoordinatorReplica, replTerm, req.Term, + ) } - return err + + // Sender replica's snapshot will be rejected if the sender replica's raft + // applied index is lower than or equal to the truncated state on the + // leaseholder, as this replica's snapshot will be wasted. Note that it is + // possible that we can enforce strictly lesser than if etcd does not require + // previous raft log entries for appending. + if replIdx <= req.FirstIndex { + log.Infof( + ctx, "sender: %v is not fit to send snapshot;"+" sender first index: %v, "+ + "coordinator first index: %v", req.DelegatedSender, replIdx, req.FirstIndex, + ) + return errors.Mark(errors.Errorf( + "sender: %v is not fit to send snapshot due to needing snapshot", req.DelegatedSender, + ), errMarkSnapshotError) + } + return nil } -// followerSnapshotsEnabled is used to enable or disable follower snapshots. -var followerSnapshotsEnabled = func() *settings.BoolSetting { - s := settings.RegisterBoolSetting( +// NumDelegateLimit is used to control the number of delegate followers +// to use for snapshots. To disable follower snapshots, set this to 0. If +// enabled, the leaseholder / leader will attempt to find a closer delegate than +// itself to send the snapshot through. This can save on network bandwidth at a +// cost in some cases to snapshot send latency. 
+var NumDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( settings.SystemOnly, - "kv.snapshot_delegation.enabled", - "set to true to allow snapshots from follower replicas", - false, + "kv.snapshot_delegation.num_follower", + "the number of delegates to try when sending snapshots, before falling back to sending from the leaseholder", + 1, + ) + s.SetVisibility(settings.Public) + return s +}() + +// MaxQueueOnDelegateLimit is used to control how long the outgoing snapshot +// queue can be before we reject delegation requests. Setting to -1 allows +// unlimited requests. The purpose of this setting is to prevent a long snapshot +// queue from delaying a delegated snapshot from being sent. Once the queue +// length is longer than the configured value, an additional delegation requests +// will be rejected with an error. +var MaxQueueOnDelegateLimit = func() *settings.IntSetting { + s := settings.RegisterIntSetting( + settings.SystemOnly, + "kv.snapshot_delegation.num_requests", + "how many queued requests are allowed on a delegate before the request is rejected", + 3, ) s.SetVisibility(settings.Public) return s @@ -2752,11 +3004,8 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting( // snapshot from this replica. The entire process of generating and transmitting // the snapshot is handled, and errors are propagated back to the leaseholder. func (r *Replica) followerSendSnapshot( - ctx context.Context, - recipient roachpb.ReplicaDescriptor, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) (retErr error) { + ctx context.Context, recipient roachpb.ReplicaDescriptor, req *kvserverpb.DelegateSnapshotRequest, +) error { ctx = r.AnnotateCtx(ctx) sendThreshold := traceSnapshotThreshold.Get(&r.ClusterSettings().SV) if sendThreshold > 0 { @@ -2776,55 +3025,49 @@ func (r *Replica) followerSendSnapshot( }() } - // TODO(amy): when delegating to different senders, check raft applied state - // to determine if this follower replica is fit to send. - // Acknowledge that the request has been accepted. - if err := stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: &kvserverpb.SnapshotResponse{ - Status: kvserverpb.SnapshotResponse_ACCEPTED, - }, - }, - ); err != nil { + // Check the validity conditions twice, once before and once after we obtain + // the send semaphore. We check after to make sure the snapshot request still + // makes sense (e.g the range hasn't split under us). The check is + // lightweight. Even if the second check succeeds, the snapshot we send might + // still not be usable due to the leaseholder making a change that we don't + // know about, but we try to minimize that possibility since snapshots are + // expensive to send. + err := r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { return err } - // Throttle snapshot sending. + // Throttle snapshot sending. Obtain the send semaphore and determine the rate limit. rangeSize := r.GetMVCCStats().Total() cleanup, err := r.store.reserveSendSnapshot(ctx, req, rangeSize) if err != nil { - return err + return errors.Wrap(err, "Unable to reserve space for sending this snapshot") } defer cleanup() + // Check validity again, it is possible that the pending request should not be + // sent after we are doing waiting. + err = r.validateSnapshotDelegationRequest(ctx, req) + if err != nil { + return err + } + + // TODO: Review this to make sure that there isn't any inconsistency since we + // may be on a delegate. 
The GetSnapshot call will add a LogTruncation + // constraint, but it isn't clear that this will help. snapType := req.Type snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID) if err != nil { - err = errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) - return errors.Mark(err, errMarkSnapshotError) + return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType) } defer snap.Close() log.Event(ctx, "generated snapshot") - // Check that the snapshot we generated has a descriptor that includes the - // recipient. If it doesn't, the recipient will reject it, so it's better to - // not send it in the first place. It's possible to hit this case if we're not - // the leaseholder and we haven't yet applied the configuration change that's - // adding the recipient to the range. - if _, ok := snap.State.Desc.GetReplicaDescriptor(recipient.StoreID); !ok { - return errors.Wrapf( - errMarkSnapshotError, - "attempting to send snapshot that does not contain the recipient as a replica; "+ - "snapshot type: %s, recipient: s%d, desc: %s", snapType, recipient, snap.State.Desc, - ) - } - - // We avoid shipping over the past Raft log in the snapshot by changing - // the truncated state (we're allowed to -- it's an unreplicated key and not - // subject to mapping across replicas). The actual sending happens here: - _ = (*kvBatchSnapshotStrategy)(nil).Send - // and results in no log entries being sent at all. Note that - // Metadata.Index is really the applied index of the replica. + // We avoid shipping over the past Raft log in the snapshot by changing the + // truncated state (we're allowed to -- it's an unreplicated key and not + // subject to mapping across replicas). The actual sending happens in + // kvBatchSnapshotStrategy.Send and results in no log entries being sent at + // all. Note that Metadata.Index is really the applied index of the replica. snap.State.TruncatedState = &roachpb.RaftTruncatedState{ Index: snap.RaftSnap.Metadata.Index, Term: snap.RaftSnap.Metadata.Term, diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 635c01ae7b91..3a5aa145a98b 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -40,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" @@ -271,6 +273,13 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { ) defer tc.Stopper().Stop(ctx) + + // Disable delegating snapshots to different senders, which would otherwise + // fail this test as snapshots could queue on different stores. 
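
// Illustrative sketch (not part of this diff): tests that need deterministic
// snapshot routing can disable delegation entirely, or cap the delegate queue,
// through the two new cluster settings. The NumDelegateLimit override just
// below follows exactly this pattern; treating MaxQueueOnDelegateLimit the
// same way is an assumption of this sketch, not something the diff shows, and
// disableSnapshotDelegation is a hypothetical helper name.
func disableSnapshotDelegation(ctx context.Context, sv *settings.Values) {
	// With zero delegates the leaseholder always sends its own snapshots.
	kvserver.NumDelegateLimit.Override(ctx, sv, 0)
	// Reject delegated sends as soon as anything is queued on the delegate.
	kvserver.MaxQueueOnDelegateLimit.Override(ctx, sv, 0)
}
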
+ settings := cluster.MakeTestingClusterSettings() + sv := &settings.SV + kvserver.NumDelegateLimit.Override(ctx, sv, 0) + scratch := tc.ScratchRange(t) replicationChange := make(chan error, 2) g := ctxgroup.WithContext(ctx) @@ -338,6 +347,197 @@ func TestAddReplicaWithReceiverThrottling(t *testing.T) { require.NoError(t, g.Wait()) } +// TestDelegateSnapshot verifies that the correct delegate is chosen when +// sending snapshots to stores. +// TODO: It is currently disabled because with raft snapshots sometimes being +// required and not going through snapshot delegation, the results are +// unpredictable. +func TestDelegateSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderStress(t, "Occasionally fails until 87553 is resolved") + + ctx := context.Background() + + // Synchronize on the moment before the snapshot gets sent to measure the + // state at that time. + // FIXME: This should only require size 1, sometimes extra snapshots are sent. + // Solve this before merging. + requestChannel := make(chan *kvserverpb.DelegateSnapshotRequest, 10) + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.DisableRaftSnapshotQueue = true + + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) { + requestChannel <- request + } + + localityA := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "a"}}} + localityB := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "b"}}} + + localityServerArgs := make(map[int]base.TestServerArgs) + localityServerArgs[0] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[1] = base.TestServerArgs{Knobs: knobs, Locality: localityA} + localityServerArgs[2] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + localityServerArgs[3] = base.TestServerArgs{Knobs: knobs, Locality: localityB} + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgsPerNode: localityServerArgs, + ReplicationMode: base.ReplicationManual, + }, + ) + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Node 3 (loc B) can only get the data from node 1 as its the only one that has it. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2)...) + request := <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) + // Drain the channel. Unfortunately there are occasionally spurious raft snapshots sent. + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 4 (loc B) should get the snapshot from node 3 as its the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(3)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(3)) + for len(requestChannel) > 0 { + <-requestChannel + } + + // Node 2 (loc A) should get the snapshot from node 1 as it is the same locality. + _ = tc.AddVotersOrFatal(t, scratchKey, tc.Targets(1)...) + request = <-requestChannel + require.Equal(t, request.DelegatedSender.StoreID, roachpb.StoreID(1)) +} + +// TestDelegateSnapshotFails is a test that ensure we fail fast when the +// sender or receiver store crashes during delegated snapshot sending. 
+func TestDelegateSnapshotFails(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var senders struct { + mu syncutil.Mutex + desc []roachpb.ReplicaDescriptor + } + + setupFn := func(t *testing.T) ( + *testcluster.TestCluster, + roachpb.Key, + ) { + senders.desc = nil + knobs, ltk := makeReplicationTestKnobs() + ltk.storeKnobs.ThrottleEmptySnapshots = true + + ltk.storeKnobs.SelectDelegateSnapshotSender = + func(descriptor *roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor { + senders.mu.Lock() + defer senders.mu.Unlock() + return senders.desc + } + + tc := testcluster.StartTestCluster( + t, 4, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{Knobs: knobs}, + ReplicationMode: base.ReplicationManual, + }, + ) + + scratchKey := tc.ScratchRange(t) + return tc, scratchKey + } + + // Add a learner replica that will need a snapshot, kill the server + // the learner is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("receiver", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err, "Unable to lookup the range") + + _, err = setupPartitionedRange(tc, desc.RangeID, 0, 0, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Add a follower replica to act as the snapshot sender, and kill the server + // the sender is on. Assert that the failure is detected and change replicas + // fails fast. + t.Run("sender_no_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + // Always use node 3 (index 2) as the only delegate. + senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + log.Infof(ctx, "Err=%v", err) + require.True(t, testutils.IsError(err, "partitioned"), `expected partitioned error got: %+v`, err) + }) + + // Identical setup as the previous test, but allow a fallback to the leaseholder. + t.Run("sender_with_fallback", func(t *testing.T) { + tc, scratchKey := setupFn(t) + defer tc.Stopper().Stop(ctx) + + // Add a replica that will be the delegated sender, and another so we have + // quorum with this node down + desc := tc.AddVotersOrFatal(t, scratchKey, tc.Targets(2, 3)...) + + replicaDesc, ok := desc.GetReplicaDescriptor(3) + require.True(t, ok) + leaseholderDesc, ok := desc.GetReplicaDescriptor(1) + require.True(t, ok) + // First try to use node 3 (index 2) as the delegate, but fall back to the leaseholder on failure. 
+ senders.mu.Lock() + senders.desc = append(senders.desc, replicaDesc) + senders.desc = append(senders.desc, leaseholderDesc) + senders.mu.Unlock() + + // Now stop accepting traffic to node 3 (index 2). + _, err := setupPartitionedRange(tc, desc.RangeID, 0, 2, true, unreliableRaftHandlerFuncs{}) + require.NoError(t, err) + + _, err = tc.Servers[0].DB().AdminChangeReplicas( + ctx, scratchKey, desc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)), + ) + require.NoError(t, err) + }) +} + func TestLearnerRaftConfState(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -502,7 +702,7 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { // the state at that time & gather metrics. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: @@ -1837,7 +2037,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // the state at that time. blockUntilSnapshotSendCh := make(chan struct{}) blockSnapshotSendCh := make(chan struct{}) - ltk.storeKnobs.SendSnapshot = func() { + ltk.storeKnobs.SendSnapshot = func(request *kvserverpb.DelegateSnapshotRequest) { close(blockUntilSnapshotSendCh) select { case <-blockSnapshotSendCh: diff --git a/pkg/kv/kvserver/storage_services.proto b/pkg/kv/kvserver/storage_services.proto index a876b768dbeb..b20f737ab3a6 100644 --- a/pkg/kv/kvserver/storage_services.proto +++ b/pkg/kv/kvserver/storage_services.proto @@ -27,16 +27,10 @@ service MultiRaft { // ERROR, including any collected traces from processing. rpc RaftSnapshot (stream cockroach.kv.kvserver.kvserverpb.SnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.SnapshotResponse) {} // DelegateRaftSnapshot asks the server to send a range snapshot to a target - // (so the client delegates the sending of the snapshot to the server). The - // server responds in two phases. + // (so the client delegates the sending of the snapshot to the server). // - // TODO(nvanbenschoten): This RPC is bi-directional streaming (as opposed to - // only server-streaming) because of future aspirations; at the moment the - // request is unary. In the future, we wanted to pause all log truncation, - // then handshake with the delegated sender, then weaken log truncation - // protection to just below the index that the sender was sending the - // snapshot at. - rpc DelegateRaftSnapshot(stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (stream cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} + // This sends a DelegateRequest to the chosen sender. + rpc DelegateRaftSnapshot(cockroach.kv.kvserver.kvserverpb.DelegateSnapshotRequest) returns (cockroach.kv.kvserver.kvserverpb.DelegateSnapshotResponse) {} } service PerReplica { diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 1615a2136ab5..20a6b6ec4ad7 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -160,41 +160,44 @@ func (qs *raftReceiveQueues) Delete(rangeID roachpb.RangeID) { // HandleDelegatedSnapshot reads the incoming delegated snapshot message and // throttles sending snapshots before passing the request to the sender replica. 
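
// Illustrative sketch (not part of this diff): with DelegateRaftSnapshot now a
// unary RPC, the handler below reports failures inside the response rather than
// as a stream error, so a coordinator consuming the response would interpret it
// roughly as follows. checkDelegateSnapshotResponse is a hypothetical helper,
// and it assumes the response still exposes Error() and CollectedSpans the way
// the removed streaming path did.
func checkDelegateSnapshotResponse(
	ctx context.Context, resp *kvserverpb.DelegateSnapshotResponse,
) error {
	if sp := tracing.SpanFromContext(ctx); sp != nil {
		// Surface the delegate's trace in the coordinator's recording.
		sp.ImportRemoteRecording(resp.CollectedSpans)
	}
	switch resp.SnapResponse.Status {
	case kvserverpb.SnapshotResponse_APPLIED:
		// The recipient successfully applied the snapshot.
		return nil
	case kvserverpb.SnapshotResponse_ERROR:
		return maybeHandleDeprecatedSnapErr(resp.Error())
	default:
		return errors.Errorf("unexpected delegate snapshot status: %s", resp.SnapResponse.Status)
	}
}
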
func (s *Store) HandleDelegatedSnapshot( - ctx context.Context, - req *kvserverpb.DelegateSnapshotRequest, - stream DelegateSnapshotResponseStream, -) error { + ctx context.Context, req *kvserverpb.DelegateSnapshotRequest, +) (*kvserverpb.DelegateSnapshotResponse, error) { ctx = s.AnnotateCtx(ctx) if fn := s.cfg.TestingKnobs.SendSnapshot; fn != nil { - fn() + fn(req) } + sp := tracing.SpanFromContext(ctx) + + // This can happen if the delegate doesn't know about the range yet. Return an + // error immediately. sender, err := s.GetReplica(req.RangeID) if err != nil { - return err + //nolint:returnerrcheck + return &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: snapRespErr(err), + CollectedSpans: sp.GetConfiguredRecording(), + }, nil } - sp := tracing.SpanFromContext(ctx) // Pass the request to the sender replica. - if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req, stream); err != nil { - return stream.Send( - &kvserverpb.DelegateSnapshotResponse{ - SnapResponse: snapRespErr(err), - CollectedSpans: sp.GetConfiguredRecording(), - }, - ) + if err := sender.followerSendSnapshot(ctx, req.RecipientReplica, req); err != nil { + // If an error occurred during snapshot sending, send an error response. + //nolint:returnerrcheck + return &kvserverpb.DelegateSnapshotResponse{ + SnapResponse: snapRespErr(err), + CollectedSpans: sp.GetConfiguredRecording(), + }, nil } - resp := &kvserverpb.DelegateSnapshotResponse{ + return &kvserverpb.DelegateSnapshotResponse{ SnapResponse: &kvserverpb.SnapshotResponse{ Status: kvserverpb.SnapshotResponse_APPLIED, DeprecatedMessage: "Snapshot successfully applied by recipient", }, CollectedSpans: sp.GetConfiguredRecording(), - } - // Send a final response that snapshot sending is completed. - return stream.Send(resp) + }, nil } // HandleSnapshot reads an incoming streaming snapshot and applies it if diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e3e07b07297b..a254576ae7f9 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -91,13 +91,6 @@ type outgoingSnapshotStream interface { Recv() (*kvserverpb.SnapshotResponse, error) } -// outgoingSnapshotStream is the minimal interface on a GRPC stream required -// to send a snapshot over the network. -type outgoingDelegatedStream interface { - Send(*kvserverpb.DelegateSnapshotRequest) error - Recv() (*kvserverpb.DelegateSnapshotResponse, error) -} - // snapshotRecordMetrics is a wrapper function that increments a set of metrics // related to the number of snapshot bytes sent/received. The definer of the // function specifies which metrics are incremented. @@ -674,7 +667,11 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) { } } -// reserveReceiveSnapshot throttles incoming snapshots. +// reserveReceiveSnapshot reserves space for this snapshot which will attempt to +// prevent overload of system resources as this snapshot is being sent. +// Snapshots are often sent in bulk (due to operations like store decommission) +// so it is necessary to prevent snapshot transfers from overly impacting +// foreground traffic. 
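
// Illustrative sketch (not part of this diff): reserveSendSnapshot below
// forwards req.QueueOnDelegateLen to throttleSnapshot as maxQueueLength, while
// the receive path passes -1. The admission rule is, roughly, the following;
// this is a simplification under stated assumptions (the real logic lives in
// the multiqueue package), and wouldAcceptDelegatedSend is a hypothetical name.
func wouldAcceptDelegatedSend(queuedRequests int, maxQueueLength int64) bool {
	if maxQueueLength < 0 {
		// -1 means queue behind in-flight snapshots with no length limit.
		return true
	}
	// Otherwise reject once the queue is longer than the configured limit.
	return int64(queuedRequests) <= maxQueueLength
}
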
func (s *Store) reserveReceiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, ) (_cleanup func(), _err error) { @@ -683,8 +680,9 @@ func (s *Store) reserveReceiveSnapshot( return s.throttleSnapshot(ctx, s.snapshotApplyQueue, int(header.SenderQueueName), header.SenderQueuePriority, + -1, header.RangeSize, - header.RaftMessageRequest.RangeID, header.RaftMessageRequest.ToReplica.ReplicaID, + header.RaftMessageRequest.RangeID, s.metrics.RangeSnapshotRecvQueueLength, s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, ) @@ -701,9 +699,11 @@ func (s *Store) reserveSendSnapshot( } return s.throttleSnapshot(ctx, s.snapshotSendQueue, - int(req.SenderQueueName), req.SenderQueuePriority, + int(req.SenderQueueName), + req.SenderQueuePriority, + req.QueueOnDelegateLen, rangeSize, - req.RangeID, req.DelegatedSender.ReplicaID, + req.RangeID, s.metrics.RangeSnapshotSendQueueLength, s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, ) @@ -717,11 +717,12 @@ func (s *Store) throttleSnapshot( snapshotQueue *multiqueue.MultiQueue, requestSource int, requestPriority float64, + maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - replicaID roachpb.ReplicaID, waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, -) (cleanup func(), err error) { +) (cleanup func(), funcErr error) { + tBegin := timeutil.Now() var permit *multiqueue.Permit // Empty snapshots are exempt from rate limits because they're so cheap to @@ -729,9 +730,14 @@ func (s *Store) throttleSnapshot( // RESTORE or manual SPLIT AT, since it prevents these empty snapshots from // getting stuck behind large snapshots managed by the replicate queue. if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { - task := snapshotQueue.Add(requestSource, requestPriority) + task, err := snapshotQueue.Add(requestSource, requestPriority, maxQueueLength) + if err != nil { + return nil, err + } + // After this point, the task is on the queue, so any future errors need to + // be handled by cancelling the task to release the permit. defer func() { - if err != nil { + if funcErr != nil { snapshotQueue.Cancel(task) } }() @@ -787,10 +793,9 @@ func (s *Store) throttleSnapshot( if elapsed > snapshotReservationWaitWarnThreshold && !buildutil.CrdbTestBuild { log.Infof( ctx, - "waited for %.1fs to acquire snapshot reservation to r%d/%d", + "waited for %.1fs to acquire snapshot reservation to r%d", elapsed.Seconds(), rangeID, - replicaID, ) } @@ -1127,7 +1132,7 @@ func maybeHandleDeprecatedSnapErr(deprecated bool, err error) error { return errors.Mark(err, errMarkSnapshotError) } -// SnapshotStorePool narrows StorePool to make sendSnapshot easier to test. +// SnapshotStorePool narrows StorePool to make sendSnapshotUsingDelegate easier to test. type SnapshotStorePool interface { Throttle(reason storepool.ThrottleReason, why string, toStoreID roachpb.StoreID) } @@ -1500,7 +1505,7 @@ func sendSnapshot( recordBytesSent snapshotRecordMetrics, ) error { if recordBytesSent == nil { - // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshot` + // NB: Some tests and an offline tool (ResetQuorum) call into `sendSnapshotUsingDelegate` // with a nil metrics tracking function. We pass in a fake metrics tracking function here that isn't // hooked up to anything. 
recordBytesSent = func(inc int64) {} @@ -1625,76 +1630,3 @@ func sendSnapshot( ) } } - -// delegateSnapshot sends an outgoing delegated snapshot request via a -// pre-opened GRPC stream. It sends the delegated snapshot request to the -// sender and waits for confirmation that the snapshot has been applied. -func delegateSnapshot( - ctx context.Context, stream outgoingDelegatedStream, req *kvserverpb.DelegateSnapshotRequest, -) error { - - delegatedSender := req.DelegatedSender - if err := stream.Send(req); err != nil { - return err - } - // Wait for a response from the sender. - resp, err := stream.Recv() - if err != nil { - return err - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return errors.Wrapf( - maybeHandleDeprecatedSnapErr(resp.Error()), - "%s: sender couldn't accept %s", delegatedSender, req) - case kvserverpb.SnapshotResponse_ACCEPTED: - // The sender accepted the request, it will continue with sending. - log.VEventf( - ctx, 2, "sender %s accepted snapshot request %s", delegatedSender, - req, - ) - default: - err := errors.Errorf( - "%s: server sent an invalid status while negotiating %s: %s", - delegatedSender, req, resp.SnapResponse.Status, - ) - return err - } - - // Wait for response to see if the receiver successfully applied the snapshot. - resp, err = stream.Recv() - if err != nil { - return errors.Mark( - errors.Wrapf(err, "%s: remote failed to send snapshot", delegatedSender), errMarkSnapshotError, - ) - } - // Wait for EOF to ensure server side processing is complete. - if unexpectedResp, err := stream.Recv(); err != io.EOF { - if err != nil { - return errors.Mark(errors.Wrapf( - err, "%s: expected EOF, got resp=%v with error", - delegatedSender.StoreID, unexpectedResp), errMarkSnapshotError) - } - return errors.Mark(errors.Newf( - "%s: expected EOF, got resp=%v", delegatedSender.StoreID, - unexpectedResp), errMarkSnapshotError) - } - sp := tracing.SpanFromContext(ctx) - if sp != nil { - sp.ImportRemoteRecording(resp.CollectedSpans) - } - switch resp.SnapResponse.Status { - case kvserverpb.SnapshotResponse_ERROR: - return maybeHandleDeprecatedSnapErr(resp.Error()) - case kvserverpb.SnapshotResponse_APPLIED: - // This is the response we're expecting. Snapshot successfully applied. - log.VEventf(ctx, 2, "%s: delegated snapshot was successfully applied", delegatedSender) - return nil - default: - return errors.Errorf( - "%s: server sent an invalid status during finalization: %s", - delegatedSender, resp.SnapResponse.Status, - ) - } - -} diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 31b034ace867..b10a0a2fd6af 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3011,7 +3011,7 @@ func (sp *fakeStorePool) Throttle( } // TestSendSnapshotThrottling tests the store pool throttling behavior of -// store.sendSnapshot, ensuring that it properly updates the StorePool on +// store.sendSnapshotUsingDelegate, ensuring that it properly updates the StorePool on // various exceptional conditions and new capacity estimates. func TestSendSnapshotThrottling(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3082,21 +3082,35 @@ func TestSendSnapshotConcurrency(t *testing.T) { s := tc.store // Checking this now makes sure that if the defaults change this test will also. 
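
// Illustrative sketch (not part of this diff): the assertions in the test
// below use the two new accessors in place of Len. Assuming AvailableLen
// reports free send permits and QueueLen reports requests waiting behind them
// (as those assertions suggest), a quick saturation check over the send queue
// could look like this; sendQueueSaturated is a hypothetical helper name.
func sendQueueSaturated(s *Store) bool {
	return s.snapshotSendQueue.AvailableLen() == 0 && s.snapshotSendQueue.QueueLen() > 0
}
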
-	require.Equal(t, 2, s.snapshotSendQueue.Len())
+	require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
 	cleanup1, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 		SenderQueuePriority: 1,
 	}, 1)
-	require.Nil(t, err)
-	require.Equal(t, 1, s.snapshotSendQueue.Len())
+	require.NoError(t, err)
 	cleanup2, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 		SenderQueuePriority: 1,
 	}, 1)
-	require.Nil(t, err)
-	require.Equal(t, 0, s.snapshotSendQueue.Len())
+	require.NoError(t, err)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
 	// At this point both the first two tasks will be holding reservations and
-	// waiting for cleanup, a third task will block.
+	// waiting for cleanup; a third task will either block or fail. First send
+	// one with the queue length set to 0 - this will fail since the first tasks
+	// are still running.
+	_, err = s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
+		SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
+		SenderQueuePriority: 1,
+		QueueOnDelegateLen:  0,
+	}, 1)
+	require.Error(t, err)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 1, s.snapshotSendQueue.QueueLen())
+
+	// Now add a task that will wait indefinitely for another task to finish.
 	var wg sync.WaitGroup
 	wg.Add(2)
 	go func() {
@@ -3104,18 +3118,20 @@ func TestSendSnapshotConcurrency(t *testing.T) {
 		cleanup3, err := s.reserveSendSnapshot(ctx, &kvserverpb.DelegateSnapshotRequest{
 			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 			SenderQueuePriority: 1,
+			QueueOnDelegateLen:  -1,
 		}, 1)
 		after := timeutil.Now()
+		require.NoError(t, err)
+		require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
 		cleanup3()
 		wg.Done()
-		require.Nil(t, err)
-		require.GreaterOrEqual(t, after.Sub(before), 10*time.Millisecond)
 	}()
-	// This task will not block for more than a few MS, but we want to wait for
-	// it to complete to make sure it frees the permit.
+	// Now add one that will queue up and wait for another task to finish. This
+	// task will not block for more than its 20ms deadline, but we want to wait
+	// for it to complete to make sure it frees the permit.
 	go func() {
-		deadlineCtx, cancel := context.WithTimeout(ctx, 1*time.Millisecond)
+		deadlineCtx, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 		defer cancel()
 		// This will time out since the deadline is set artificially low. Make sure
@@ -3123,19 +3139,25 @@ func TestSendSnapshotConcurrency(t *testing.T) {
 		_, err := s.reserveSendSnapshot(deadlineCtx, &kvserverpb.DelegateSnapshotRequest{
 			SenderQueueName:     kvserverpb.SnapshotRequest_REPLICATE_QUEUE,
 			SenderQueuePriority: 1,
+			QueueOnDelegateLen:  -1,
 		}, 1)
+		require.Error(t, err)
 		wg.Done()
-		require.NotNil(t, err)
 	}()
 	// Wait a little time before calling signaling the first two as complete.
 	time.Sleep(100 * time.Millisecond)
+	require.Equal(t, 0, s.snapshotSendQueue.AvailableLen())
+	// The remaining tasks are still queued at this point.
+	require.Equal(t, 2, s.snapshotSendQueue.QueueLen())
+
 	cleanup1()
 	cleanup2()
 	// Wait until all cleanup run before checking the number of permits.
 	wg.Wait()
-	require.Equal(t, 2, s.snapshotSendQueue.Len())
+	require.Equal(t, 2, s.snapshotSendQueue.AvailableLen())
+	require.Equal(t, 0, s.snapshotSendQueue.QueueLen())
 }
 
 func TestReserveSnapshotThrottling(t *testing.T) {
diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go
index 3835ee8a694f..f94753d30be2 100644
--- a/pkg/kv/kvserver/testing_knobs.go
+++ b/pkg/kv/kvserver/testing_knobs.go
@@ -284,7 +284,7 @@ type StoreTestingKnobs struct {
 	// SendSnapshot is run after receiving a DelegateRaftSnapshot request but
 	// before any throttling or sending logic.
-	SendSnapshot func()
+	SendSnapshot func(*kvserverpb.DelegateSnapshotRequest)
 	// ReceiveSnapshot is run after receiving a snapshot header but before
 	// acquiring snapshot quota or doing shouldAcceptSnapshotData checks. If an
 	// error is returned from the hook, it's sent as an ERROR SnapshotResponse.
@@ -431,6 +431,9 @@ type StoreTestingKnobs struct {
 	// AfterSendSnapshotThrottle intercepts replicas after receiving a spot in the
 	// send snapshot semaphore.
 	AfterSendSnapshotThrottle func()
+	// SelectDelegateSnapshotSender returns an ordered list of replicas that will
+	// be used as delegates for sending a snapshot.
+	SelectDelegateSnapshotSender func(*roachpb.RangeDescriptor) []roachpb.ReplicaDescriptor
 	// This method, if set, gets to see (and mutate, if desired) any local
 	// StoreDescriptor before it is being sent out on the Gossip network.
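
// Illustrative sketch (not part of this diff): the SelectDelegateSnapshotSender
// knob added above lets a test pin the delegate ordering, as
// TestDelegateSnapshotFails does with a mutex-protected slice. Here storeKnobs
// stands in for the kvserver.StoreTestingKnobs a test passes to its server
// args; the store IDs are only examples. Returning the follower first and the
// leaseholder last exercises the fallback path.
storeKnobs.SelectDelegateSnapshotSender = func(
	desc *roachpb.RangeDescriptor,
) []roachpb.ReplicaDescriptor {
	follower, _ := desc.GetReplicaDescriptor(3)    // preferred delegate (store 3)
	leaseholder, _ := desc.GetReplicaDescriptor(1) // final fallback (store 1)
	return []roachpb.ReplicaDescriptor{follower, leaseholder}
}
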