Merge #36564
36564: storage: use finer locks and async methods for queuing ranges r=bdarnell a=tbg

baseQueue.mu was held unnecessarily long, in particular across `shouldQueue`
invocations, which in turn can acquire various Replica locks, complicating
the lock ordering story.

Sidestep this issue by never holding baseQueue.mu across any operation that
can touch the Replica.
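
To make the new locking discipline concrete, here is a minimal, self-contained
sketch of that pattern, using simplified stand-in types (`queue`, `replica`)
rather than the real `baseQueue`/`Replica`: the queue mutex is held only long
enough to read the queue's own state and is released before anything that
might take replica locks runs.

```go
package main

import (
	"fmt"
	"sync"
)

// replica stands in for storage.Replica; its mutex stands in for
// Replica.mu / raftMu.
type replica struct {
	mu sync.Mutex
	id int
}

// shouldQueue stands in for the queue-specific checks, which typically
// acquire replica locks of their own.
func (r *replica) shouldQueue() (bool, float64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return true, float64(r.id)
}

// queue stands in for baseQueue.
type queue struct {
	mu struct {
		sync.Mutex
		stopped  bool
		disabled bool
	}
}

// maybeAdd holds the queue mutex only while reading queue state and
// releases it before calling into the replica.
func (q *queue) maybeAdd(r *replica) {
	q.mu.Lock()
	skip := q.mu.stopped || q.mu.disabled
	q.mu.Unlock() // released before any replica locks are acquired

	if skip {
		return
	}
	if should, prio := r.shouldQueue(); should {
		fmt.Printf("queueing r%d at priority %.1f\n", r.id, prio)
	}
}

func main() {
	q := &queue{}
	q.maybeAdd(&replica{id: 1})
}
```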

The remaining issue here is that callers of `Add` or `MaybeAdd` may already
be holding locks on the Replica object, which could also lead to deadlock.
This is because the queue calls into `repl.maybeInitializeRaftGroup()`
(which can end up acquiring the raftMu), and also because it calls the
various `shouldQueue` methods that perform the queue-specific checks; those
almost always acquire locks, too. This problem should be exclusive to
`raftMu`; I doubt we hold `Replica.mu` while calling `Add/MaybeAdd`, or at
least it should be easy to avoid that.
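
For illustration, here is a small self-contained example (hypothetical
`raftMu`/`queueMu` variables, not the real call graph) of why enqueuing
synchronously while holding a replica lock is risky: the two code paths
acquire the same pair of mutexes in opposite orders, which can deadlock if
they ever run concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

var (
	raftMu  sync.Mutex // stands in for Replica.raftMu
	queueMu sync.Mutex // stands in for baseQueue.mu
)

// enqueueWhileHoldingRaftMu models a raft processing goroutine that holds
// raftMu and then offers the replica to a queue (which locks queueMu).
func enqueueWhileHoldingRaftMu() {
	raftMu.Lock()
	defer raftMu.Unlock()
	queueMu.Lock() // acquisition order: raftMu, then queueMu
	defer queueMu.Unlock()
	fmt.Println("enqueued from raft goroutine")
}

// queueCallingIntoReplica models the queue holding its own mutex while
// calling into the replica, e.g. to initialize the raft group (which
// needs raftMu).
func queueCallingIntoReplica() {
	queueMu.Lock()
	defer queueMu.Unlock()
	raftMu.Lock() // acquisition order: queueMu, then raftMu
	defer raftMu.Unlock()
	fmt.Println("queue called into replica")
}

func main() {
	// Run sequentially here so the example terminates; if these two paths
	// ran concurrently, the inverted acquisition order could deadlock.
	enqueueWhileHoldingRaftMu()
	queueCallingIntoReplica()
}
```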

Working around this is left to the second commit, which avoids having to
worry about lock ordering when a replica is added to a queue while mutexes
are held. Primarily, this affects calls that add a replica to a queue from
within the raft processing goroutines, where the replica's raftMu is held.
Adding a replica to a queue can in turn require acquisition of the raftMu,
leading to potential deadlock.

This hasn't been observed in practice, but let's be prudent and avoid the
problem. Queues act asynchronously by their very nature, so we lose very
little by not offering replicas to a queue synchronously, except during
unit tests, where we'll simply have to work around it.
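
Below is a rough sketch of the asynchronous hand-off, assuming a plain
buffered-channel semaphore and bare goroutines instead of the stopper's
`RunLimitedAsyncTask`: the caller never blocks, and when the semaphore is
full the request is simply dropped, which is acceptable because the replica
will be offered to the queue again later (for example by the periodic
scanner).

```go
package main

import (
	"fmt"
	"sync"
)

// asyncQueue is a toy stand-in for baseQueue's async-add machinery.
type asyncQueue struct {
	addSem chan struct{} // bounds the number of in-flight async adds
	wg     sync.WaitGroup
}

func newAsyncQueue(semSize int) *asyncQueue {
	return &asyncQueue{addSem: make(chan struct{}, semSize)}
}

// AddAsync hands the work off to a goroutine without blocking the caller.
// If the semaphore is full, the request is dropped (the real code logs a
// rate-limited message instead).
func (q *asyncQueue) AddAsync(rangeID int) {
	select {
	case q.addSem <- struct{}{}: // acquire a slot without waiting
	default:
		fmt.Printf("r%d: add dropped (semaphore full)\n", rangeID)
		return
	}
	q.wg.Add(1)
	go func() {
		defer q.wg.Done()
		defer func() { <-q.addSem }() // release the slot
		fmt.Printf("r%d: added asynchronously\n", rangeID)
	}()
}

func main() {
	q := newAsyncQueue(2)
	for i := 1; i <= 4; i++ {
		q.AddAsync(i)
	}
	q.wg.Wait()
}
```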

Touches #36413.

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
craig[bot] and tbg committed Apr 8, 2019
2 parents 4eacf37 + 1cd61f9 commit f8be509
Showing 17 changed files with 298 additions and 244 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/merge_queue.go
@@ -310,7 +310,7 @@ func (mq *mergeQueue) process(
// On seeing a ConditionFailedError, don't return an error and enqueue
// this replica again in case it still needs to be merged.
log.Infof(ctx, "merge saw concurrent descriptor modification; maybe retrying")
mq.MaybeAdd(lhsRepl, mq.store.Clock().Now())
mq.MaybeAddAsync(ctx, lhsRepl, mq.store.Clock().Now())
default:
// While range merges are unstable, be extra cautious and mark every error
// as purgatory-worthy.
115 changes: 77 additions & 38 deletions pkg/storage/queue.go
@@ -201,7 +201,9 @@ type queueConfig struct {
maxSize int
// maxConcurrency is the maximum number of replicas that can be processed
// concurrently. If not set, defaults to 1.
maxConcurrency int
maxConcurrency int
addSemSize int
maybeAddSemSize int
// needsLease controls whether this queue requires the range lease to
// operate on a replica. If so, one will be acquired if necessary.
needsLease bool
@@ -265,10 +267,13 @@ type baseQueue struct {
store *Store
gossip *gossip.Gossip
queueConfig
incoming chan struct{} // Channel signaled when a new replica is added to the queue.
processSem chan struct{}
processDur int64 // accessed atomically
mu struct {
incoming chan struct{} // Channel signaled when a new replica is added to the queue.
processSem chan struct{}
addSem chan struct{} // for .AddAsync
maybeAddSem chan struct{} // for .MaybeAddAsync
addLogN log.EveryN // avoid log spam when addSem, maybeAddSem are maxed out
processDur int64 // accessed atomically
mu struct {
syncutil.Mutex // Protects all variables in the mu struct
replicas map[roachpb.RangeID]*replicaItem // Map from RangeID to replicaItem
priorityQ priorityQueue // The priority queue
@@ -295,6 +300,12 @@ func newBaseQueue(
if cfg.maxConcurrency == 0 {
cfg.maxConcurrency = 1
}
if cfg.addSemSize == 0 {
cfg.addSemSize = 100
}
if cfg.maybeAddSemSize == 0 {
cfg.maybeAddSemSize = 500
}

ambient := store.cfg.AmbientCtx
ambient.AddLogTag(name, nil)
@@ -313,6 +324,9 @@ func newBaseQueue(
queueConfig: cfg,
incoming: make(chan struct{}, 1),
processSem: make(chan struct{}, cfg.maxConcurrency),
addSem: make(chan struct{}, cfg.addSemSize),
maybeAddSem: make(chan struct{}, cfg.maybeAddSemSize),
addLogN: log.Every(5 * time.Second),
}
bq.mu.replicas = map[roachpb.RangeID]*replicaItem{}

@@ -386,34 +400,43 @@ func (bq *baseQueue) Start(stopper *stop.Stopper) {
bq.processLoop(stopper)
}

// Add adds the specified replica to the queue, regardless of the
// AddAsync asynchronously adds the specified replica to the queue, regardless of the
// return value of bq.shouldQueue. The replica is added with specified
// priority. If the queue is too full, the replica may not be added,
// as the replica with the lowest priority will be dropped.
func (bq *baseQueue) Add(repl *Replica, priority float64) (bool, error) {
bq.mu.Lock()
defer bq.mu.Unlock()
ctx := repl.AnnotateCtx(bq.AnnotateCtx(context.TODO()))
return bq.addInternalLocked(ctx, repl.Desc(), true, priority)
func (bq *baseQueue) AddAsync(ctx context.Context, repl *Replica, priority float64) {
opName := "add-" + bq.name
if err := bq.store.stopper.RunLimitedAsyncTask(ctx, opName,
bq.addSem, false, /* wait */
func(ctx context.Context) {
_, _ = bq.addInternal(ctx, repl.Desc(), true, priority)
}); err != nil && bq.addLogN.ShouldLog() {

log.Infof(ctx, "rate limited in %s: %s", opName, err)
}
}

// MaybeAdd adds the specified replica if bq.shouldQueue specifies it
// should be queued. Replicas are added to the queue using the priority
// returned by bq.shouldQueue. If the queue is too full, the replica may
// not be added, as the replica with the lowest priority will be
// dropped.
func (bq *baseQueue) MaybeAdd(repl *Replica, now hlc.Timestamp) {
ctx := repl.AnnotateCtx(bq.AnnotateCtx(context.TODO()))

bq.mu.Lock()
bq.maybeAddLocked(ctx, repl, now)
bq.mu.Unlock()
// MaybeAddAsync asynchronously adds the specified replica if bq.shouldQueue
// specifies it should be queued. Replicas are added to the queue using the
// priority returned by bq.shouldQueue. If the queue is too full, the replica
// may not be added, as the replica with the lowest priority will be dropped.
func (bq *baseQueue) MaybeAddAsync(ctx context.Context, repl *Replica, now hlc.Timestamp) {
opName := "maybeadd-" + bq.name
if err := bq.store.stopper.RunLimitedAsyncTask(
ctx, opName,
bq.maybeAddSem, false, /* wait */
func(ctx context.Context) {
bq.maybeAdd(ctx, repl, now)
}); err != nil && bq.addLogN.ShouldLog() {

log.Infof(ctx, "rate limited in %s: %s", opName, err)
}
}

func (bq *baseQueue) maybeAddLocked(ctx context.Context, repl *Replica, now hlc.Timestamp) {
func (bq *baseQueue) maybeAdd(ctx context.Context, repl *Replica, now hlc.Timestamp) {
// Load the system config if it's needed.
var cfg *config.SystemConfig
if bq.needsSystemConfig {
@@ -426,7 +449,11 @@ func (bq *baseQueue) maybeAddLocked(ctx context.Context, repl *Replica, now hlc.
}
}

if bq.mu.stopped || bq.mu.disabled {
bq.mu.Lock()
stopped := bq.mu.stopped || bq.mu.disabled
bq.mu.Unlock()

if stopped {
return
}

@@ -460,7 +487,7 @@ func (bq *baseQueue) maybeAddLocked(ctx context.Context, repl *Replica, now hlc.
}

should, priority := bq.impl.shouldQueue(ctx, now, repl, cfg)
if _, err := bq.addInternalLocked(ctx, repl.Desc(), should, priority); !isExpectedQueueError(err) {
if _, err := bq.addInternal(ctx, repl.Desc(), should, priority); !isExpectedQueueError(err) {
log.Errorf(ctx, "unable to add: %s", err)
}
}
@@ -479,12 +506,23 @@ func (bq *baseQueue) requiresSplit(cfg *config.SystemConfig, repl *Replica) bool
return cfg.NeedsSplit(desc.StartKey, desc.EndKey)
}

// addInternalLocked adds the replica the queue with specified priority. If
// addInternal adds the replica to the queue with specified priority. If
// the replica is already queued at a lower priority, updates the existing
// priority. The queue lock is acquired internally and must not be held by the caller.
func (bq *baseQueue) addInternalLocked(
func (bq *baseQueue) addInternal(
ctx context.Context, desc *roachpb.RangeDescriptor, should bool, priority float64,
) (bool, error) {
// NB: this is intentionally outside of bq.mu to avoid having to consider
// lock ordering constraints.
if !desc.IsInitialized() {
// We checked this above in MaybeAdd(), but we need to check it
// again for Add().
return false, errors.New("replica not initialized")
}

bq.mu.Lock()
defer bq.mu.Unlock()

if bq.mu.stopped {
return false, errQueueStopped
}
@@ -496,12 +534,6 @@ func (bq *baseQueue) addInternalLocked(
return false, errQueueDisabled
}

if !desc.IsInitialized() {
// We checked this above in MaybeAdd(), but we need to check it
// again for Add().
return false, errors.New("replica not initialized")
}

// If the replica is currently in purgatory, don't re-add it.
if _, ok := bq.mu.purgatory[desc.RangeID]; ok {
return false, nil
@@ -793,15 +825,18 @@ func (bq *baseQueue) finishProcessingReplica(
ctx context.Context, stopper *stop.Stopper, repl *Replica, err error,
) {
bq.mu.Lock()
defer bq.mu.Unlock()

// Remove item from replica set completely. We may add it
// back in down below.
item := bq.mu.replicas[repl.RangeID]
callbacks := item.callbacks
requeue := item.requeue
item.callbacks = nil
bq.removeFromReplicaSetLocked(repl.RangeID)
item = nil // prevent accidental use below
bq.mu.Unlock()

// Call any registered callbacks.
for _, cb := range item.callbacks {
for _, cb := range callbacks {
cb(err)
}

@@ -820,7 +855,9 @@ func (bq *baseQueue) finishProcessingReplica(
// scheduled to be requeued, we ignore this if we add the replica to
// purgatory.
if purgErr, ok := isPurgatoryError(err); ok {
bq.mu.Lock()
bq.addToPurgatoryLocked(ctx, stopper, repl, purgErr)
bq.mu.Unlock()
return
}

@@ -831,8 +868,8 @@ func (bq *baseQueue) finishProcessingReplica(
}

// Maybe add replica back into queue, if requested.
if item.requeue {
bq.maybeAddLocked(ctx, repl, bq.store.Clock().Now())
if requeue {
bq.maybeAdd(ctx, repl, bq.store.Clock().Now())
}
}

@@ -841,6 +878,8 @@ func (bq *baseQueue) addToPurgatoryLocked(
func (bq *baseQueue) addToPurgatoryLocked(
ctx context.Context, stopper *stop.Stopper, repl *Replica, purgErr purgatoryError,
) {
bq.mu.AssertHeld()

// Check whether the queue supports purgatory errors. If not then something
// went wrong because a purgatory error should not have ended up here.
if bq.impl.purgatoryChan() == nil {
141 changes: 141 additions & 0 deletions pkg/storage/queue_helpers_testutil.go
@@ -0,0 +1,141 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package storage

import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
)

// Code in this file is for testing usage only. It is exported only because it
// is called from outside of the package.

func (bq *baseQueue) testingAdd(
ctx context.Context, repl *Replica, priority float64,
) (bool, error) {
return bq.addInternal(ctx, repl.Desc(), true, priority)
}

func forceScanAndProcess(s *Store, q *baseQueue) error {
// Check that the system config is available. It is needed by many queues. If
// it's not available, some queues silently fail to process any replicas,
// which is undesirable for this method.
if cfg := s.Gossip().GetSystemConfig(); cfg == nil {
return errors.Errorf("system config not available in gossip")
}

newStoreReplicaVisitor(s).Visit(func(repl *Replica) bool {
q.maybeAdd(context.Background(), repl, s.cfg.Clock.Now())
return true
})

q.DrainQueue(s.stopper)
return nil
}

func mustForceScanAndProcess(ctx context.Context, s *Store, q *baseQueue) {
if err := forceScanAndProcess(s, q); err != nil {
log.Fatal(ctx, err)
}
}

// ForceReplicationScanAndProcess iterates over all ranges and
// enqueues any that need to be replicated.
func (s *Store) ForceReplicationScanAndProcess() error {
return forceScanAndProcess(s, s.replicateQueue.baseQueue)
}

// MustForceReplicaGCScanAndProcess iterates over all ranges and enqueues any that
// may need to be GC'd.
func (s *Store) MustForceReplicaGCScanAndProcess() {
mustForceScanAndProcess(context.TODO(), s, s.replicaGCQueue.baseQueue)
}

// MustForceMergeScanAndProcess iterates over all ranges and enqueues any that
// may need to be merged.
func (s *Store) MustForceMergeScanAndProcess() {
mustForceScanAndProcess(context.TODO(), s, s.mergeQueue.baseQueue)
}

// ForceSplitScanAndProcess iterates over all ranges and enqueues any that
// may need to be split.
func (s *Store) ForceSplitScanAndProcess() error {
return forceScanAndProcess(s, s.splitQueue.baseQueue)
}

// MustForceRaftLogScanAndProcess iterates over all ranges and enqueues any that
// need their raft logs truncated and then process each of them.
func (s *Store) MustForceRaftLogScanAndProcess() {
mustForceScanAndProcess(context.TODO(), s, s.raftLogQueue.baseQueue)
}

// ForceTimeSeriesMaintenanceQueueProcess iterates over all ranges, enqueuing
// any that need time series maintenance, then processes the time series
// maintenance queue.
func (s *Store) ForceTimeSeriesMaintenanceQueueProcess() error {
return forceScanAndProcess(s, s.tsMaintenanceQueue.baseQueue)
}

// ForceRaftSnapshotQueueProcess iterates over all ranges, enqueuing
// any that need raft snapshots, then processes the raft snapshot
// queue.
func (s *Store) ForceRaftSnapshotQueueProcess() error {
return forceScanAndProcess(s, s.raftSnapshotQueue.baseQueue)
}

// ForceConsistencyQueueProcess runs all the ranges through the consistency
// queue.
func (s *Store) ForceConsistencyQueueProcess() error {
return forceScanAndProcess(s, s.consistencyQueue.baseQueue)
}

// The methods below can be used to control a store's queues. Stopping a queue
// is only meant to happen in tests.

func (s *Store) setGCQueueActive(active bool) {
s.gcQueue.SetDisabled(!active)
}
func (s *Store) setMergeQueueActive(active bool) {
s.mergeQueue.SetDisabled(!active)
}
func (s *Store) setRaftLogQueueActive(active bool) {
s.raftLogQueue.SetDisabled(!active)
}
func (s *Store) setReplicaGCQueueActive(active bool) {
s.replicaGCQueue.SetDisabled(!active)
}

// SetReplicateQueueActive controls the replication queue. Only
// intended for tests.
func (s *Store) SetReplicateQueueActive(active bool) {
s.replicateQueue.SetDisabled(!active)
}
func (s *Store) setSplitQueueActive(active bool) {
s.splitQueue.SetDisabled(!active)
}
func (s *Store) setTimeSeriesMaintenanceQueueActive(active bool) {
s.tsMaintenanceQueue.SetDisabled(!active)
}
func (s *Store) setRaftSnapshotQueueActive(active bool) {
s.raftSnapshotQueue.SetDisabled(!active)
}
func (s *Store) setConsistencyQueueActive(active bool) {
s.consistencyQueue.SetDisabled(!active)
}
func (s *Store) setScannerActive(active bool) {
s.scanner.SetDisabled(!active)
}