From 9ce46f304a9cd0dac143f6fd21ec7d8235ae72d4 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 21 Nov 2019 19:16:35 -0500 Subject: [PATCH 1/4] storage: make queue timeout a function rather than a constant The background storage queues each carry a timeout. This timeout seems like a good idea to unstick a potentially stuck queue. I'm not going to speculate as to why these queues might find themselves getting stuck but, let's just say, maybe it happens and maybe having a timeout is a good idea. Unfortunately sometimes these timeouts can come to bite us, especially when things are unexpectedly slow or data sizes are unexpectedly large. Failure to make progress before a timeout expires in queue processing is a common cause of long-term outages. In those cases it's good to have an escape hatch. Another concern on the horizon is the desire to have larger range sizes. Today our default queue processing timeout is 1m. The raft snapshot queue does not override this. The default snapshot rate is 8 MB/s. ``` (512MB / 8MB/s) = 64s > 1m ``` This unfortunate fact means that ranges larger than 512 MB can never successfully receive a snapshot from the raft snapshot queue. The next commit will utilize this fact by adding a cluster setting to control the timeout of the raft snapshot queue. This commit changes the constant per-queue timeout to a function which can consult both cluster settings and the Replica which is about to be processed. Release note: None. --- pkg/storage/queue.go | 15 +++++++++------ pkg/storage/queue_concurrency_test.go | 6 +++++- pkg/storage/queue_test.go | 6 +++--- pkg/storage/store_rebalancer.go | 4 ++-- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index afcf7e24efec..978a10e02c39 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -44,6 +44,10 @@ const ( defaultQueueMaxSize = 10000 ) +func defaultProcessTimeoutFunc(replicaInQueue) time.Duration { + return defaultProcessTimeout +} + // a purgatoryError indicates a replica processing failure which indicates // the replica can be placed into purgatory for faster retries when the // failure condition changes. @@ -253,8 +257,8 @@ type queueConfig struct { // processDestroyedReplicas controls whether or not we want to process replicas // that have been destroyed but not GCed. processDestroyedReplicas bool - // processTimeout is the timeout for processing a replica. - processTimeout time.Duration + // processTimeout returns the timeout for processing a replica. + processTimeoutFunc func(replicaInQueue) time.Duration // successes is a counter of replicas processed successfully. successes *metric.Counter // failures is a counter of replicas which failed processing. @@ -374,8 +378,8 @@ func newBaseQueue( name string, impl queueImpl, store *Store, gossip *gossip.Gossip, cfg queueConfig, ) *baseQueue { // Use the default process timeout if none specified. 
- if cfg.processTimeout == 0 { - cfg.processTimeout = defaultProcessTimeout + if cfg.processTimeoutFunc == nil { + cfg.processTimeoutFunc = defaultProcessTimeoutFunc } if cfg.maxConcurrency == 0 { cfg.maxConcurrency = 1 @@ -850,9 +854,8 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name) defer span.Finish() - return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()), - bq.processTimeout, func(ctx context.Context) error { + bq.processTimeoutFunc(repl), func(ctx context.Context) error { log.VEventf(ctx, 1, "processing replica") if !repl.IsInitialized() { diff --git a/pkg/storage/queue_concurrency_test.go b/pkg/storage/queue_concurrency_test.go index 0e82cfb9382b..a58fa7e2c10b 100644 --- a/pkg/storage/queue_concurrency_test.go +++ b/pkg/storage/queue_concurrency_test.go @@ -30,6 +30,10 @@ import ( "golang.org/x/sync/errgroup" ) +func constantTimeoutFunc(d time.Duration) func(replicaInQueue) time.Duration { + return func(replicaInQueue) time.Duration { return d } +} + // TestBaseQueueConcurrent verifies that under concurrent adds/removes of ranges // to the queue including purgatory errors and regular errors, the queue // invariants are upheld. The test operates on fake ranges and a mock queue @@ -49,7 +53,7 @@ func TestBaseQueueConcurrent(t *testing.T) { maxSize: num / 2, maxConcurrency: 4, acceptsUnsplitRanges: true, - processTimeout: time.Millisecond, + processTimeoutFunc: constantTimeoutFunc(time.Millisecond), // We don't care about these, but we don't want to crash. successes: metric.NewCounter(metric.Metadata{Name: "processed"}), failures: metric.NewCounter(metric.Metadata{Name: "failures"}), diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index c1f8f20fe7f2..6842a8776e3b 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -864,7 +864,7 @@ func TestBaseQueueProcessTimeout(t *testing.T) { bq := makeTestBaseQueue("test", ptQueue, tc.store, tc.gossip, queueConfig{ maxSize: 1, - processTimeout: time.Millisecond, + processTimeoutFunc: constantTimeoutFunc(time.Millisecond), acceptsUnsplitRanges: true, }) bq.Start(stopper) @@ -920,7 +920,7 @@ func TestBaseQueueTimeMetric(t *testing.T) { bq := makeTestBaseQueue("test", ptQueue, tc.store, tc.gossip, queueConfig{ maxSize: 1, - processTimeout: time.Millisecond, + processTimeoutFunc: constantTimeoutFunc(time.Millisecond), acceptsUnsplitRanges: true, }) bq.Start(stopper) @@ -930,7 +930,7 @@ func TestBaseQueueTimeMetric(t *testing.T) { if v := bq.successes.Count(); v != 1 { return errors.Errorf("expected 1 processed replicas; got %d", v) } - if min, v := bq.queueConfig.processTimeout, bq.processingNanos.Count(); v < min.Nanoseconds() { + if min, v := bq.queueConfig.processTimeoutFunc(nil), bq.processingNanos.Count(); v < min.Nanoseconds() { return errors.Errorf("expected >= %s in processing time; got %s", min, time.Duration(v)) } return nil diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 3f7cbfe90277..23da6dba5f7e 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -244,7 +244,7 @@ func (sr *StoreRebalancer) rebalanceStore( log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) - if err := contextutil.RunWithTimeout(ctx, "transfer lease", sr.rq.processTimeout, func(ctx context.Context) error { + if err := 
contextutil.RunWithTimeout(ctx, "transfer lease", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error { return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps) }); err != nil { log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err) @@ -303,7 +303,7 @@ func (sr *StoreRebalancer) rebalanceStore( descBeforeRebalance := replWithStats.repl.Desc() log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load", replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas(), targets) - if err := contextutil.RunWithTimeout(ctx, "relocate range", sr.rq.processTimeout, func(ctx context.Context) error { + if err := contextutil.RunWithTimeout(ctx, "relocate range", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error { return sr.rq.store.AdminRelocateRange(ctx, *descBeforeRebalance, targets) }); err != nil { log.Errorf(ctx, "unable to relocate range to %v: %+v", targets, err) From a056d48fab7a6eebf9d357c9f6b79a1a0e82bf3c Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 21 Nov 2019 19:33:09 -0500 Subject: [PATCH 2/4] storage: derive timeout for raftSnapshotQueue process from settings and size This commit adds a hidden setting to control the minimum timeout of raftSnapshotQueue processing. It is an escape hatch to deal with snapshots for large ranges. At the default send rate of 8MB/s a range must stay smaller than 500MB to be successfully sent before the default 1m timeout. When this has been hit traditionally it is has been mitigated by increasing the send rate. This may not always be desirable. In addition to the minimum timeout the change also now computes the timeout on a per Replica basis based on the current snapshot rate limit and the size of the snapshot being sent. This should prevent large ranges with slow send rates from timing out. Maybe there should be a release note but because the setting is hidden I opted not to add it. Release note: None. --- pkg/storage/raft_snapshot_queue.go | 52 +++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 4 deletions(-) diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index 0f9637032ac2..2275dc058117 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -32,6 +33,24 @@ const ( raftSnapshotPriority float64 = 0 ) +// raftSnapshotQueueMinimumTimeout is the minimum duration after which the +// processing of the raft snapshot queue will time out. See the timeoutFunc +// in newRaftSnapshotQueue. +var raftSnapshotQueueMinimumTimeout = settings.RegisterDurationSetting( + // NB: this setting has a relatively awkward name because the linter does not + // permit `minimum_timeout` but rather asks for it to be `.minimum.timeout`. 
+ "kv.raft_snapshot_queue.process.timeout_minimum", + "minimum duration after which the processing of the raft snapshot queue will "+ + "time out; it is an escape hatch to raise the minimum timeout for sending "+ + "a raft snapshot which is sent much more slowly than the allowable rate "+ + "as specified by kv.snapshot_recovery.max_rate", + defaultProcessTimeout, +) + +func init() { + raftSnapshotQueueMinimumTimeout.SetConfidential() +} + // raftSnapshotQueue manages a queue of replicas which may need to catch a // replica up with a snapshot to their range. type raftSnapshotQueue struct { @@ -51,10 +70,35 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue { needsLease: false, needsSystemConfig: false, acceptsUnsplitRanges: true, - successes: store.metrics.RaftSnapshotQueueSuccesses, - failures: store.metrics.RaftSnapshotQueueFailures, - pending: store.metrics.RaftSnapshotQueuePending, - processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, + // Create a timeout which is a function of the size of the range and the + // maximum allowed rate of data transfer that adheres to a minimum timeout + // specified in a cluster setting. + processTimeoutFunc: func(r replicaInQueue) (d time.Duration) { + minimumTimeout := raftSnapshotQueueMinimumTimeout.Get(&store.ClusterSettings().SV) + // NB: In production code this will type assertion will always succeed. + // Some tests set up a fake implementation of replicaInQueue in which + // case we fall back to the configured minimum timeout. + repl, ok := r.(*Replica) + if !ok { + return minimumTimeout + } + stats := repl.GetMVCCStats() + totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes + snapshotRecoveryRate := recoverySnapshotRate.Get(&store.ClusterSettings().SV) + estimatedDuration := time.Duration(totalBytes / snapshotRecoveryRate) + // Set a timeout to 1/10th of the allowed throughput. + const permittedSlowdown = 10 + timeout := estimatedDuration * permittedSlowdown + + if timeout < minimumTimeout { + timeout = minimumTimeout + } + return timeout + }, + successes: store.metrics.RaftSnapshotQueueSuccesses, + failures: store.metrics.RaftSnapshotQueueFailures, + pending: store.metrics.RaftSnapshotQueuePending, + processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, }, ) return rq From 8dcc13201443bd424583a1ac29898797b7cc5096 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 6 Feb 2020 18:21:46 -0500 Subject: [PATCH 3/4] storage: ensure that rate cluster settings are positive Before this commit we would permit rate cluster settings to be set to non-positive values. This would have caused crashes had anybody dared to try. Release note: None --- pkg/storage/store_snapshot.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 51d87583feda..114337fdc33f 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -871,12 +871,21 @@ type SnapshotStorePool interface { throttle(reason throttleReason, why string, toStoreID roachpb.StoreID) } +// validatePositive is a function to validate that a settings value is positive. +func validatePositive(v int64) error { + if v <= 0 { + return errors.Errorf("%d is not positive", v) + } + return nil +} + // rebalanceSnapshotRate is the rate at which preemptive snapshots can be sent. // This includes snapshots generated for upreplication or for rebalancing. 
-var rebalanceSnapshotRate = settings.RegisterByteSizeSetting( +var rebalanceSnapshotRate = settings.RegisterValidatedByteSizeSetting( "kv.snapshot_rebalance.max_rate", "the rate limit (bytes/sec) to use for rebalance and upreplication snapshots", envutil.EnvOrDefaultBytes("COCKROACH_PREEMPTIVE_SNAPSHOT_RATE", 8<<20), + validatePositive, ) // recoverySnapshotRate is the rate at which Raft-initiated spanshots can be @@ -885,10 +894,11 @@ var rebalanceSnapshotRate = settings.RegisterByteSizeSetting( // completely get rid of them. // TODO(tbg): The existence of this rate, separate from rebalanceSnapshotRate, // does not make a whole lot of sense. -var recoverySnapshotRate = settings.RegisterByteSizeSetting( +var recoverySnapshotRate = settings.RegisterValidatedByteSizeSetting( "kv.snapshot_recovery.max_rate", "the rate limit (bytes/sec) to use for recovery snapshots", envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20), + validatePositive, ) // snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing. From a526d55043e54ae02b2956b909021b05a8a385d9 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Thu, 6 Feb 2020 09:46:11 -0500 Subject: [PATCH 4/4] storage: make queue timeouts controllable, snapshot sending queues dynamic In #42686 we made the raft snapshot queue timeout dynamic and based on the size of the snapshot being sent. We also added an escape hatch to control the timeout of processing of that queue. This change generalizes that cluster setting to apply to all of the queues. It so happens that the replicate queue and the merge queue also sometimes need to send snapshots. This PR gives them similar treatment to the raft snapshot queue. The previous cluster setting was never released and is reserved so it does not need a release note. Release note (bug fix): Fixed a bug where large ranges with slow send rates would hit the timeout in several storage system queues by making the timeout dynamic based on the current rate limit and the size of the data being sent. This affects several storage system queues: the Raft snapshot queue, the replication queue, and the merge queue. --- pkg/storage/merge_queue.go | 16 ++++++- pkg/storage/queue.go | 61 ++++++++++++++++++++++++-- pkg/storage/queue_concurrency_test.go | 5 ++- pkg/storage/queue_test.go | 62 ++++++++++++++++++++++++++- pkg/storage/raft_snapshot_queue.go | 53 +++-------------------- pkg/storage/replicate_queue.go | 15 ++++--- pkg/storage/store_rebalancer.go | 6 ++- 7 files changed, 154 insertions(+), 64 deletions(-) diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go index 454589acbcc4..8a096bd8bbb1 100644 --- a/pkg/storage/merge_queue.go +++ b/pkg/storage/merge_queue.go @@ -101,8 +101,20 @@ func newMergeQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *mergeQue mq.baseQueue = newBaseQueue( "merge", mq, store, gossip, queueConfig{ - maxSize: defaultQueueMaxSize, - maxConcurrency: mergeQueueConcurrency, + maxSize: defaultQueueMaxSize, + maxConcurrency: mergeQueueConcurrency, + // TODO(ajwerner): Sometimes the merge queue needs to send multiple + // snapshots, but the timeout function here is configured based on the + // duration required to send a single snapshot. That being said, this + // timeout provides leeway for snapshots to be 10x slower than the + // specified rate and still respects the queue processing minimum timeout. 
+ // While using the below function is certainly better than just using the + // default timeout, it would be better to have a function which takes into + // account how many snapshots processing will need to send. That might be + // hard to determine ahead of time. An alternative would be to calculate + // the timeout with a function that additionally considers the replication + // factor. + processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate), needsLease: true, needsSystemConfig: true, acceptsUnsplitRanges: false, diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 978a10e02c39..e7f2946fe67c 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -20,6 +20,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/causer" "github.com/cockroachdb/cockroach/pkg/util/contextutil" @@ -44,10 +47,56 @@ const ( defaultQueueMaxSize = 10000 ) -func defaultProcessTimeoutFunc(replicaInQueue) time.Duration { - return defaultProcessTimeout +// queueGuaranteedProcessingTimeBudget is the smallest amount of time before +// which the processing of a queue may time out. It is an escape hatch to raise +// the timeout for queues. +var queueGuaranteedProcessingTimeBudget = settings.RegisterDurationSetting( + "kv.queue.process.guaranteed_time_budget", + "the guaranteed duration before which the processing of a queue may "+ + "time out", + defaultProcessTimeout, +) + +func init() { + queueGuaranteedProcessingTimeBudget.SetConfidential() +} + +func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Duration { + return queueGuaranteedProcessingTimeBudget.Get(&cs.SV) +} + +// The queues which send snapshots while processing should have a timeout which +// is a function of the size of the range and the maximum allowed rate of data +// transfer that adheres to a minimum timeout specified in a cluster setting. +// +// The parameter controls which rate to use. +func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc { + return func(cs *cluster.Settings, r replicaInQueue) time.Duration { + minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV) + // NB: In production code this type assertion will always succeed. + // Some tests set up a fake implementation of replicaInQueue in which + // case we fall back to the configured minimum timeout. + repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats }) + if !ok { + return minimumTimeout + } + snapshotRate := rateSetting.Get(&cs.SV) + stats := repl.GetMVCCStats() + totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes + estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second + timeout := estimatedDuration * permittedSnapshotSlowdown + if timeout < minimumTimeout { + timeout = minimumTimeout + } + return timeout + } } +// permittedSnapshotSlowdown is the factor by which a snapshot is allowed to +// take longer than the duration estimated from the configured snapshot rate +// before its queue processing times out. 
+const permittedSnapshotSlowdown = 10 + // a purgatoryError indicates a replica processing failure which indicates // the replica can be placed into purgatory for faster retries when the // failure condition changes. @@ -222,6 +271,10 @@ type queueImpl interface { purgatoryChan() <-chan time.Time } +// queueProcessTimeoutFunc controls the timeout for queue processing for a +// replicaInQueue. +type queueProcessTimeoutFunc func(*cluster.Settings, replicaInQueue) time.Duration + type queueConfig struct { // maxSize is the maximum number of replicas to queue. maxSize int @@ -258,7 +311,7 @@ type queueConfig struct { // that have been destroyed but not GCed. processDestroyedReplicas bool // processTimeout returns the timeout for processing a replica. - processTimeoutFunc func(replicaInQueue) time.Duration + processTimeoutFunc queueProcessTimeoutFunc // successes is a counter of replicas processed successfully. successes *metric.Counter // failures is a counter of replicas which failed processing. @@ -855,7 +908,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name) defer span.Finish() return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()), - bq.processTimeoutFunc(repl), func(ctx context.Context) error { + bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error { log.VEventf(ctx, 1, "processing replica") if !repl.IsInitialized() { diff --git a/pkg/storage/queue_concurrency_test.go b/pkg/storage/queue_concurrency_test.go index a58fa7e2c10b..3977c6e74b06 100644 --- a/pkg/storage/queue_concurrency_test.go +++ b/pkg/storage/queue_concurrency_test.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -30,8 +31,8 @@ import ( "golang.org/x/sync/errgroup" ) -func constantTimeoutFunc(d time.Duration) func(replicaInQueue) time.Duration { - return func(replicaInQueue) time.Duration { return d } +func constantTimeoutFunc(d time.Duration) func(*cluster.Settings, replicaInQueue) time.Duration { + return func(*cluster.Settings, replicaInQueue) time.Duration { return d } } // TestBaseQueueConcurrent verifies that under concurrent adds/removes of ranges diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index 6842a8776e3b..e3010fc751d0 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -886,6 +887,65 @@ func TestBaseQueueProcessTimeout(t *testing.T) { }) } +type mvccStatsReplicaInQueue struct { + replicaInQueue + size int64 +} + +func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats { + return enginepb.MVCCStats{ValBytes: r.size} +} + +func TestQueueSnapshotTimeoutFunc(t *testing.T) { + defer leaktest.AfterTest(t)() + type testCase struct { + guaranteedProcessingTime time.Duration + snapshotRate int64 // bytes/s + replicaSize int64 
// bytes + expectedTimeout time.Duration + } + makeTest := func(tc testCase) (string, func(t *testing.T)) { + return fmt.Sprintf("%+v", tc), func(t *testing.T) { + st := cluster.MakeTestingClusterSettings() + queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime) + recoverySnapshotRate.Override(&st.SV, tc.snapshotRate) + tf := makeQueueSnapshotTimeoutFunc(recoverySnapshotRate) + repl := mvccStatsReplicaInQueue{ + size: tc.replicaSize, + } + require.Equal(t, tc.expectedTimeout, tf(st, repl)) + } + } + for _, tc := range []testCase{ + { + guaranteedProcessingTime: time.Minute, + snapshotRate: 1 << 30, + replicaSize: 1 << 20, + expectedTimeout: time.Minute, + }, + { + guaranteedProcessingTime: time.Minute, + snapshotRate: 1 << 20, + replicaSize: 100 << 20, + expectedTimeout: 100 * time.Second * permittedSnapshotSlowdown, + }, + { + guaranteedProcessingTime: time.Hour, + snapshotRate: 1 << 20, + replicaSize: 100 << 20, + expectedTimeout: time.Hour, + }, + { + guaranteedProcessingTime: time.Minute, + snapshotRate: 1 << 10, + replicaSize: 100 << 20, + expectedTimeout: 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown, + }, + } { + t.Run(makeTest(tc)) + } +} + // processTimeQueueImpl spends 5ms on each process request. type processTimeQueueImpl struct { testQueueImpl @@ -930,7 +990,7 @@ func TestBaseQueueTimeMetric(t *testing.T) { if v := bq.successes.Count(); v != 1 { return errors.Errorf("expected 1 processed replicas; got %d", v) } - if min, v := bq.queueConfig.processTimeoutFunc(nil), bq.processingNanos.Count(); v < min.Nanoseconds() { + if min, v := bq.queueConfig.processTimeoutFunc(nil, nil), bq.processingNanos.Count(); v < min.Nanoseconds() { return errors.Errorf("expected >= %s in processing time; got %s", min, time.Duration(v)) } return nil diff --git a/pkg/storage/raft_snapshot_queue.go b/pkg/storage/raft_snapshot_queue.go index 2275dc058117..7f2e43844317 100644 --- a/pkg/storage/raft_snapshot_queue.go +++ b/pkg/storage/raft_snapshot_queue.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -33,24 +32,6 @@ const ( raftSnapshotPriority float64 = 0 ) -// raftSnapshotQueueMinimumTimeout is the minimum duration after which the -// processing of the raft snapshot queue will time out. See the timeoutFunc -// in newRaftSnapshotQueue. -var raftSnapshotQueueMinimumTimeout = settings.RegisterDurationSetting( - // NB: this setting has a relatively awkward name because the linter does not - // permit `minimum_timeout` but rather asks for it to be `.minimum.timeout`. - "kv.raft_snapshot_queue.process.timeout_minimum", - "minimum duration after which the processing of the raft snapshot queue will "+ - "time out; it is an escape hatch to raise the minimum timeout for sending "+ - "a raft snapshot which is sent much more slowly than the allowable rate "+ - "as specified by kv.snapshot_recovery.max_rate", - defaultProcessTimeout, -) - -func init() { - raftSnapshotQueueMinimumTimeout.SetConfidential() -} - // raftSnapshotQueue manages a queue of replicas which may need to catch a // replica up with a snapshot to their range. 
type raftSnapshotQueue struct { @@ -70,35 +51,11 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue { needsLease: false, needsSystemConfig: false, acceptsUnsplitRanges: true, - // Create a timeout which is a function of the size of the range and the - // maximum allowed rate of data transfer that adheres to a minimum timeout - // specified in a cluster setting. - processTimeoutFunc: func(r replicaInQueue) (d time.Duration) { - minimumTimeout := raftSnapshotQueueMinimumTimeout.Get(&store.ClusterSettings().SV) - // NB: In production code this type assertion will always succeed. - // Some tests set up a fake implementation of replicaInQueue in which - // case we fall back to the configured minimum timeout. - repl, ok := r.(*Replica) - if !ok { - return minimumTimeout - } - stats := repl.GetMVCCStats() - totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes - snapshotRecoveryRate := recoverySnapshotRate.Get(&store.ClusterSettings().SV) - estimatedDuration := time.Duration(totalBytes/snapshotRecoveryRate) * time.Second - // Allow the snapshot to be sent at one tenth of the configured rate before timing out. - const permittedSlowdown = 10 - timeout := estimatedDuration * permittedSlowdown - - if timeout < minimumTimeout { - timeout = minimumTimeout - } - return timeout - }, - successes: store.metrics.RaftSnapshotQueueSuccesses, - failures: store.metrics.RaftSnapshotQueueFailures, - pending: store.metrics.RaftSnapshotQueuePending, - processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, + processTimeoutFunc: makeQueueSnapshotTimeoutFunc(recoverySnapshotRate), + successes: store.metrics.RaftSnapshotQueueSuccesses, + failures: store.metrics.RaftSnapshotQueueFailures, + pending: store.metrics.RaftSnapshotQueuePending, + processingNanos: store.metrics.RaftSnapshotQueueProcessingNanos, }, ) return rq diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index b09ec36d9aee..df5ff8f748dd 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -155,11 +155,16 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep needsLease: true, needsSystemConfig: true, acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit, - successes: store.metrics.ReplicateQueueSuccesses, - failures: store.metrics.ReplicateQueueFailures, - pending: store.metrics.ReplicateQueuePending, - processingNanos: store.metrics.ReplicateQueueProcessingNanos, - purgatory: store.metrics.ReplicateQueuePurgatory, + // The processing of the replicate queue often needs to send snapshots + // so we use makeQueueSnapshotTimeoutFunc. This function sets a + // timeout based on the range size and the sending rate in addition + // to consulting the setting which controls the minimum timeout. 
+ processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate), + successes: store.metrics.ReplicateQueueSuccesses, + failures: store.metrics.ReplicateQueueFailures, + pending: store.metrics.ReplicateQueuePending, + processingNanos: store.metrics.ReplicateQueueProcessingNanos, + purgatory: store.metrics.ReplicateQueuePurgatory, }, ) diff --git a/pkg/storage/store_rebalancer.go b/pkg/storage/store_rebalancer.go index 23da6dba5f7e..078939ae30e2 100644 --- a/pkg/storage/store_rebalancer.go +++ b/pkg/storage/store_rebalancer.go @@ -244,7 +244,8 @@ func (sr *StoreRebalancer) rebalanceStore( log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load", replWithStats.repl.RangeID, replWithStats.qps, target.StoreID) - if err := contextutil.RunWithTimeout(ctx, "transfer lease", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error { + timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl) + if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error { return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps) }); err != nil { log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err) @@ -303,7 +304,8 @@ func (sr *StoreRebalancer) rebalanceStore( descBeforeRebalance := replWithStats.repl.Desc() log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load", replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas(), targets) - if err := contextutil.RunWithTimeout(ctx, "relocate range", sr.rq.processTimeoutFunc(replWithStats.repl), func(ctx context.Context) error { + timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl) + if err := contextutil.RunWithTimeout(ctx, "relocate range", timeout, func(ctx context.Context) error { return sr.rq.store.AdminRelocateRange(ctx, *descBeforeRebalance, targets) }); err != nil { log.Errorf(ctx, "unable to relocate range to %v: %+v", targets, err)
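For illustration, the arithmetic behind the dynamic timeout in this series can be sketched as a small standalone Go program. This is a simplified sketch of the logic installed as processTimeoutFunc, not the CockroachDB implementation; the snapshotTimeout helper and its signature are assumptions made for the example. It estimates the transfer time from the range size and the configured snapshot rate, permits the transfer to run ten times slower than that rate, and never drops below the guaranteed minimum (the escape hatch exposed by the hidden kv.queue.process.guaranteed_time_budget setting).

```go
package main

import (
	"fmt"
	"time"
)

// permittedSlowdown mirrors permittedSnapshotSlowdown in the patches: the
// timeout allows a snapshot to be sent at one tenth of the configured rate.
const permittedSlowdown = 10

// snapshotTimeout is a hypothetical helper that reproduces the timeout
// computation from the patches: estimate the send duration from the range
// size and the snapshot rate, scale it by the permitted slowdown, and clamp
// it below by the guaranteed minimum.
func snapshotTimeout(totalBytes, rateBytesPerSec int64, minimum time.Duration) time.Duration {
	estimated := time.Duration(totalBytes/rateBytesPerSec) * time.Second
	timeout := estimated * permittedSlowdown
	if timeout < minimum {
		timeout = minimum
	}
	return timeout
}

func main() {
	const mb = 1 << 20
	// The commit message's example: a 512MB range at the default 8MB/s rate
	// needs 64s, which already exceeds the old fixed 1m processing timeout.
	fmt.Println(snapshotTimeout(512*mb, 8*mb, time.Minute)) // 10m40s
	// A small range still falls back to the guaranteed minimum.
	fmt.Println(snapshotTimeout(32*mb, 8*mb, time.Minute)) // 1m0s
}
```

With these numbers, the 512MB range that could never be sent under the fixed 1m timeout gets 10m40s of budget, while small ranges keep the 1m guarantee.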