Merge pull request cockroachdb#44952 from ajwerner/backport19.2-42686-44809

release-19.2: storage: make queue timeouts controllable, snapshot sending queues dynamic
ajwerner committed Mar 4, 2020
2 parents 65de26e + a526d55 commit fa0f6c4
Showing 8 changed files with 172 additions and 21 deletions.
16 changes: 14 additions & 2 deletions pkg/storage/merge_queue.go
@@ -101,8 +101,20 @@ func newMergeQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *mergeQue
mq.baseQueue = newBaseQueue(
"merge", mq, store, gossip,
queueConfig{
maxSize: defaultQueueMaxSize,
maxConcurrency: mergeQueueConcurrency,
maxSize: defaultQueueMaxSize,
maxConcurrency: mergeQueueConcurrency,
// TODO(ajwerner): Sometimes the merge queue needs to send multiple
// snapshots, but the timeout function here is configured based on the
// duration required to send a single snapshot. That being said, this
// timeout provides leeway for snapshots to be 10x slower than the
// specified rate and still respects the queue processing minimum timeout.
// While using the below function is certainly better than just using the
// default timeout, it would be better to have a function which takes into
// account how many snapshots processing will need to send. That might be
// hard to determine ahead of time. An alternative would be to calculate
// the timeout with a function that additionally considers the replication
// factor.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: false,
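A rough sketch of the direction the TODO above suggests: scale the single-snapshot timeout by the replication factor, so that processing which has to send several snapshots still fits inside the budget. The helper below is hypothetical and not part of this commit; it only composes makeQueueSnapshotTimeoutFunc from queue.go with an assumed, caller-supplied factor.

// Hypothetical sketch (not in this commit): scale the single-snapshot
// timeout by an assumed replication factor supplied by the caller.
func makeMultiSnapshotTimeoutFunc(
	rateSetting *settings.ByteSizeSetting, replicationFactor int64,
) queueProcessTimeoutFunc {
	single := makeQueueSnapshotTimeoutFunc(rateSetting)
	return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
		return single(cs, r) * time.Duration(replicationFactor)
	}
}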
68 changes: 62 additions & 6 deletions pkg/storage/queue.go
@@ -20,6 +20,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/causer"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
@@ -44,6 +47,56 @@ const (
defaultQueueMaxSize = 10000
)

// queueGuaranteedProcessingTimeBudget is the smallest amount of time before
// which the processing of a queue may time out. It is an escape hatch to raise
// the timeout for queues.
var queueGuaranteedProcessingTimeBudget = settings.RegisterDurationSetting(
"kv.queue.process.guaranteed_time_budget",
"the guaranteed duration before which the processing of a queue may "+
"time out",
defaultProcessTimeout,
)

func init() {
queueGuaranteedProcessingTimeBudget.SetConfidential()
}

func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Duration {
return queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
}

// Queues that send snapshots while processing should have a timeout that is a
// function of the size of the range and the maximum allowed rate of data
// transfer, subject to the minimum timeout specified in a cluster setting.
//
// The parameter controls which rate to use.
func makeQueueSnapshotTimeoutFunc(rateSetting *settings.ByteSizeSetting) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
// NB: In production code this type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
repl, ok := r.(interface{ GetMVCCStats() enginepb.MVCCStats })
if !ok {
return minimumTimeout
}
snapshotRate := rateSetting.Get(&cs.SV)
stats := repl.GetMVCCStats()
totalBytes := stats.KeyBytes + stats.ValBytes + stats.IntentBytes + stats.SysBytes
estimatedDuration := time.Duration(totalBytes/snapshotRate) * time.Second
timeout := estimatedDuration * permittedSnapshotSlowdown
if timeout < minimumTimeout {
timeout = minimumTimeout
}
return timeout
}
}

// permittedSnapshotSlowdown is the factor applied to the estimated snapshot
// duration (derived from the configured snapshot rate) when computing the
// snapshot's timeout.
const permittedSnapshotSlowdown = 10

// a purgatoryError indicates a replica processing failure which indicates
// the replica can be placed into purgatory for faster retries when the
// failure condition changes.
@@ -218,6 +271,10 @@ type queueImpl interface {
purgatoryChan() <-chan time.Time
}

// queueProcessTimeoutFunc controls the timeout for queue processing for a
// replicaInQueue.
type queueProcessTimeoutFunc func(*cluster.Settings, replicaInQueue) time.Duration

type queueConfig struct {
// maxSize is the maximum number of replicas to queue.
maxSize int
@@ -253,8 +310,8 @@ type queueConfig struct {
// processDestroyedReplicas controls whether or not we want to process replicas
// that have been destroyed but not GCed.
processDestroyedReplicas bool
// processTimeout is the timeout for processing a replica.
processTimeout time.Duration
// processTimeoutFunc returns the timeout for processing a replica.
processTimeoutFunc queueProcessTimeoutFunc
// successes is a counter of replicas processed successfully.
successes *metric.Counter
// failures is a counter of replicas which failed processing.
@@ -374,8 +431,8 @@ func newBaseQueue(
name string, impl queueImpl, store *Store, gossip *gossip.Gossip, cfg queueConfig,
) *baseQueue {
// Use the default process timeout if none specified.
if cfg.processTimeout == 0 {
cfg.processTimeout = defaultProcessTimeout
if cfg.processTimeoutFunc == nil {
cfg.processTimeoutFunc = defaultProcessTimeoutFunc
}
if cfg.maxConcurrency == 0 {
cfg.maxConcurrency = 1
@@ -850,9 +907,8 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er

ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
defer span.Finish()

return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
bq.processTimeout, func(ctx context.Context) error {
bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
log.VEventf(ctx, 1, "processing replica")

if !repl.IsInitialized() {
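To make the new timeout rule concrete: the timeout is the estimated transfer time of the range at the configured snapshot rate, multiplied by permittedSnapshotSlowdown, but never less than the guaranteed budget. For example, assuming a 1-minute budget and the 8 MiB/s default shown in store_snapshot.go below, a 512 MiB range has an estimated transfer time of 512/8 = 64 s and therefore gets a 640 s timeout. The standalone sketch that follows reproduces just that arithmetic (example inputs only, no cockroach packages involved):

package main

import (
	"fmt"
	"time"
)

// snapshotTimeout mirrors the rule in makeQueueSnapshotTimeoutFunc:
// max(minimum, permittedSnapshotSlowdown * totalBytes/rate).
func snapshotTimeout(minimum time.Duration, rateBytesPerSec, totalBytes int64) time.Duration {
	const permittedSnapshotSlowdown = 10
	estimated := time.Duration(totalBytes/rateBytesPerSec) * time.Second
	if timeout := estimated * permittedSnapshotSlowdown; timeout > minimum {
		return timeout
	}
	return minimum
}

func main() {
	fmt.Println(snapshotTimeout(time.Minute, 8<<20, 512<<20)) // 10m40s
}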
7 changes: 6 additions & 1 deletion pkg/storage/queue_concurrency_test.go
@@ -20,6 +20,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -30,6 +31,10 @@ import (
"golang.org/x/sync/errgroup"
)

func constantTimeoutFunc(d time.Duration) func(*cluster.Settings, replicaInQueue) time.Duration {
return func(*cluster.Settings, replicaInQueue) time.Duration { return d }
}

// TestBaseQueueConcurrent verifies that under concurrent adds/removes of ranges
// to the queue including purgatory errors and regular errors, the queue
// invariants are upheld. The test operates on fake ranges and a mock queue
@@ -49,7 +54,7 @@ func TestBaseQueueConcurrent(t *testing.T) {
maxSize: num / 2,
maxConcurrency: 4,
acceptsUnsplitRanges: true,
processTimeout: time.Millisecond,
processTimeoutFunc: constantTimeoutFunc(time.Millisecond),
// We don't care about these, but we don't want to crash.
successes: metric.NewCounter(metric.Metadata{Name: "processed"}),
failures: metric.NewCounter(metric.Metadata{Name: "failures"}),
66 changes: 63 additions & 3 deletions pkg/storage/queue_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -864,7 +865,7 @@ func TestBaseQueueProcessTimeout(t *testing.T) {
bq := makeTestBaseQueue("test", ptQueue, tc.store, tc.gossip,
queueConfig{
maxSize: 1,
processTimeout: time.Millisecond,
processTimeoutFunc: constantTimeoutFunc(time.Millisecond),
acceptsUnsplitRanges: true,
})
bq.Start(stopper)
@@ -886,6 +887,65 @@ func TestBaseQueueProcessTimeout(t *testing.T) {
})
}

type mvccStatsReplicaInQueue struct {
replicaInQueue
size int64
}

func (r mvccStatsReplicaInQueue) GetMVCCStats() enginepb.MVCCStats {
return enginepb.MVCCStats{ValBytes: r.size}
}

func TestQueueSnapshotTimeoutFunc(t *testing.T) {
defer leaktest.AfterTest(t)()
type testCase struct {
guaranteedProcessingTime time.Duration
snapshotRate int64 // bytes/s
replicaSize int64 // bytes
expectedTimeout time.Duration
}
makeTest := func(tc testCase) (string, func(t *testing.T)) {
return fmt.Sprintf("%+v", tc), func(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
queueGuaranteedProcessingTimeBudget.Override(&st.SV, tc.guaranteedProcessingTime)
recoverySnapshotRate.Override(&st.SV, tc.snapshotRate)
tf := makeQueueSnapshotTimeoutFunc(recoverySnapshotRate)
repl := mvccStatsReplicaInQueue{
size: tc.replicaSize,
}
require.Equal(t, tc.expectedTimeout, tf(st, repl))
}
}
for _, tc := range []testCase{
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 30,
replicaSize: 1 << 20,
expectedTimeout: time.Minute,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: 100 * time.Second * permittedSnapshotSlowdown,
},
{
guaranteedProcessingTime: time.Hour,
snapshotRate: 1 << 20,
replicaSize: 100 << 20,
expectedTimeout: time.Hour,
},
{
guaranteedProcessingTime: time.Minute,
snapshotRate: 1 << 10,
replicaSize: 100 << 20,
expectedTimeout: 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown,
},
} {
t.Run(makeTest(tc))
}
}

// processTimeQueueImpl spends 5ms on each process request.
type processTimeQueueImpl struct {
testQueueImpl
@@ -920,7 +980,7 @@ func TestBaseQueueTimeMetric(t *testing.T) {
bq := makeTestBaseQueue("test", ptQueue, tc.store, tc.gossip,
queueConfig{
maxSize: 1,
processTimeout: time.Millisecond,
processTimeoutFunc: constantTimeoutFunc(time.Millisecond),
acceptsUnsplitRanges: true,
})
bq.Start(stopper)
Expand All @@ -930,7 +990,7 @@ func TestBaseQueueTimeMetric(t *testing.T) {
if v := bq.successes.Count(); v != 1 {
return errors.Errorf("expected 1 processed replicas; got %d", v)
}
if min, v := bq.queueConfig.processTimeout, bq.processingNanos.Count(); v < min.Nanoseconds() {
if min, v := bq.queueConfig.processTimeoutFunc(nil, nil), bq.processingNanos.Count(); v < min.Nanoseconds() {
return errors.Errorf("expected >= %s in processing time; got %s", min, time.Duration(v))
}
return nil
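For reference, the arithmetic behind the expectations in TestQueueSnapshotTimeoutFunc: in the second case, 100 MiB at 1 MiB/s is an estimated 100 s transfer, and 100 s * permittedSnapshotSlowdown = 1000 s, which exceeds the 1-minute budget, hence 100 * time.Second * permittedSnapshotSlowdown. In the third case the same 1000 s falls below the 1-hour budget, so the budget wins. In the fourth, 100 MiB at 1 KiB/s is an estimated 100 * 1024 s, which scaled by the slowdown factor gives 100 * (1 << 10) * time.Second * permittedSnapshotSlowdown.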
1 change: 1 addition & 0 deletions pkg/storage/raft_snapshot_queue.go
@@ -51,6 +51,7 @@ func newRaftSnapshotQueue(store *Store, g *gossip.Gossip) *raftSnapshotQueue {
needsLease: false,
needsSystemConfig: false,
acceptsUnsplitRanges: true,
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(recoverySnapshotRate),
successes: store.metrics.RaftSnapshotQueueSuccesses,
failures: store.metrics.RaftSnapshotQueueFailures,
pending: store.metrics.RaftSnapshotQueuePending,
15 changes: 10 additions & 5 deletions pkg/storage/replicate_queue.go
@@ -155,11 +155,16 @@ func newReplicateQueue(store *Store, g *gossip.Gossip, allocator Allocator) *rep
needsLease: true,
needsSystemConfig: true,
acceptsUnsplitRanges: store.TestingKnobs().ReplicateQueueAcceptsUnsplit,
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
purgatory: store.metrics.ReplicateQueuePurgatory,
// The processing of the replicate queue often needs to send snapshots,
// so we use makeQueueSnapshotTimeoutFunc. This function sets a
// timeout based on the range size and the sending rate in addition
// to consulting the setting which controls the minimum timeout.
processTimeoutFunc: makeQueueSnapshotTimeoutFunc(rebalanceSnapshotRate),
successes: store.metrics.ReplicateQueueSuccesses,
failures: store.metrics.ReplicateQueueFailures,
pending: store.metrics.ReplicateQueuePending,
processingNanos: store.metrics.ReplicateQueueProcessingNanos,
purgatory: store.metrics.ReplicateQueuePurgatory,
},
)

6 changes: 4 additions & 2 deletions pkg/storage/store_rebalancer.go
@@ -244,7 +244,8 @@ func (sr *StoreRebalancer) rebalanceStore(

log.VEventf(ctx, 1, "transferring r%d (%.2f qps) to s%d to better balance load",
replWithStats.repl.RangeID, replWithStats.qps, target.StoreID)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", sr.rq.processTimeout, func(ctx context.Context) error {
timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "transfer lease", timeout, func(ctx context.Context) error {
return sr.rq.transferLease(ctx, replWithStats.repl, target, replWithStats.qps)
}); err != nil {
log.Errorf(ctx, "unable to transfer lease to s%d: %+v", target.StoreID, err)
@@ -303,7 +304,8 @@ func (sr *StoreRebalancer) rebalanceStore(
descBeforeRebalance := replWithStats.repl.Desc()
log.VEventf(ctx, 1, "rebalancing r%d (%.2f qps) from %v to %v to better balance load",
replWithStats.repl.RangeID, replWithStats.qps, descBeforeRebalance.Replicas(), targets)
if err := contextutil.RunWithTimeout(ctx, "relocate range", sr.rq.processTimeout, func(ctx context.Context) error {
timeout := sr.rq.processTimeoutFunc(sr.st, replWithStats.repl)
if err := contextutil.RunWithTimeout(ctx, "relocate range", timeout, func(ctx context.Context) error {
return sr.rq.store.AdminRelocateRange(ctx, *descBeforeRebalance, targets)
}); err != nil {
log.Errorf(ctx, "unable to relocate range to %v: %+v", targets, err)
14 changes: 12 additions & 2 deletions pkg/storage/store_snapshot.go
@@ -871,12 +871,21 @@ type SnapshotStorePool interface {
throttle(reason throttleReason, why string, toStoreID roachpb.StoreID)
}

// validatePositive validates that a setting value is positive.
func validatePositive(v int64) error {
if v <= 0 {
return errors.Errorf("%d is not positive", v)
}
return nil
}

// rebalanceSnapshotRate is the rate at which preemptive snapshots can be sent.
// This includes snapshots generated for upreplication or for rebalancing.
var rebalanceSnapshotRate = settings.RegisterByteSizeSetting(
var rebalanceSnapshotRate = settings.RegisterValidatedByteSizeSetting(
"kv.snapshot_rebalance.max_rate",
"the rate limit (bytes/sec) to use for rebalance and upreplication snapshots",
envutil.EnvOrDefaultBytes("COCKROACH_PREEMPTIVE_SNAPSHOT_RATE", 8<<20),
validatePositive,
)

// recoverySnapshotRate is the rate at which Raft-initiated snapshots can be
@@ -885,10 +894,11 @@ var rebalanceSnapshotRate = settings.RegisterByteSizeSetting(
// completely get rid of them.
// TODO(tbg): The existence of this rate, separate from rebalanceSnapshotRate,
// does not make a whole lot of sense.
var recoverySnapshotRate = settings.RegisterByteSizeSetting(
var recoverySnapshotRate = settings.RegisterValidatedByteSizeSetting(
"kv.snapshot_recovery.max_rate",
"the rate limit (bytes/sec) to use for recovery snapshots",
envutil.EnvOrDefaultBytes("COCKROACH_RAFT_SNAPSHOT_RATE", 8<<20),
validatePositive,
)

// snapshotSSTWriteSyncRate is the size of chunks to write before fsync-ing.
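With RegisterValidatedByteSizeSetting, a non-positive snapshot rate should now be rejected when the setting is changed rather than silently accepted. A minimal standalone check of the validator's behavior (stdlib fmt substituted here for the errors package used in the diff):

package main

import "fmt"

// validatePositive is a standalone copy of the helper added in store_snapshot.go.
func validatePositive(v int64) error {
	if v <= 0 {
		return fmt.Errorf("%d is not positive", v)
	}
	return nil
}

func main() {
	fmt.Println(validatePositive(8 << 20)) // <nil>
	fmt.Println(validatePositive(0))       // 0 is not positive
	fmt.Println(validatePositive(-1))      // -1 is not positive
}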
