Skip to content

Commit

Permalink
kvserver: remove data race in onSpanConfigUpdate
Browse files Browse the repository at this point in the history
We weren't holding `Store.mu`.

This fixes boatloads of nightly issues that I am not going to list here,
but which should be linked against this PR when it merges.

Release note: None
  • Loading branch information
tbg committed Jan 18, 2022
1 parent e84001d commit 86399fa
Showing 1 changed file with 69 additions and 58 deletions.
127 changes: 69 additions & 58 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,69 +2309,80 @@ func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) {
return
}

now := s.cfg.Clock.NowAsClockTimestamp()
if err := s.mu.replicasByKey.VisitKeyRange(ctx, sp.Key, sp.EndKey, AscendingKeyOrder,
func(ctx context.Context, it replicaOrPlaceholder) error {
repl := it.repl
if repl == nil {
return nil // placeholder; ignore
}
// Collect the affected replicas separately, to avoid holding the Store
// mutex for longer than necessary.
var repls []*Replica
{
s.mu.RLock()
err := s.mu.replicasByKey.VisitKeyRange(
ctx, sp.Key, sp.EndKey, AscendingKeyOrder,
func(ctx context.Context, it replicaOrPlaceholder) error {
if it.repl != nil {
repls = append(repls, it.repl)
}
return nil
})
s.mu.RUnlock()
if err != nil {
// Errors here should not be possible, but if there is one, log loudly.
log.Errorf(ctx, "unexpected error visiting replicas: %v", err)
return
}
}

startKey := repl.Desc().StartKey
if !sp.ContainsKey(startKey) {
// It's possible that the update we're receiving here is the
// right-hand side of a span config getting split. Think of
// installing a zone config on some partition of an index where
// previously there was none on any of the partitions. The range
// spanning the entire index would have to split on the
// partition boundary, and before it does so, it's possible that
// it would receive a span config update for just the partition.
//
// To avoid clobbering the pre-split range's embedded span
// config with the partition's config, we'll ensure that the
// range's start key is part of the update. We don't have to
// enqueue the range in the split queue here, that takes place
// when processing the left-hand side span config update.

return nil // ignore
}
now := s.cfg.Clock.NowAsClockTimestamp()

// TODO(irfansharif): It's possible for a config to be applied over an
// entire range when it only pertains to the first half of the range.
// This will be corrected shortly -- we enqueue the range for a split
// below where we then apply the right config on each half. But still,
// it's surprising behavior and gets in the way of a desirable
// consistency guarantee: a key's config at any point in time is one
// that was explicitly declared over it, or the default config.
for _, repl := range repls {
startKey := repl.Desc().StartKey
if !sp.ContainsKey(startKey) {
// It's possible that the update we're receiving here is the
// right-hand side of a span config getting split. Think of
// installing a zone config on some partition of an index where
// previously there was none on any of the partitions. The range
// spanning the entire index would have to split on the
// partition boundary, and before it does so, it's possible that
// it would receive a span config update for just the partition.
//
// We can do better: we can skip applying the config entirely and
// enqueue the split, then rely on the split trigger to install
// the right configs on each half. The current structure is as it is
// to maintain parity with the system config span variant.
// To avoid clobbering the pre-split range's embedded span
// config with the partition's config, we'll ensure that the
// range's start key is part of the update. We don't have to
// enqueue the range in the split queue here, that takes place
// when processing the left-hand side span config update.

continue // ignore
}

// TODO(irfansharif): It's possible for a config to be applied over an
// entire range when it only pertains to the first half of the range.
// This will be corrected shortly -- we enqueue the range for a split
// below where we then apply the right config on each half. But still,
// it's surprising behavior and gets in the way of a desirable
// consistency guarantee: a key's config at any point in time is one
// that was explicitly declared over it, or the default config.
//
// We can do better: we can skip applying the config entirely and
// enqueue the split, then rely on the split trigger to install
// the right configs on each half. The current structure is as it is
// to maintain parity with the system config span variant.

replCtx := repl.AnnotateCtx(ctx)
conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, startKey)
if err != nil {
log.Errorf(ctx, "skipped applying update, unexpected error reading from subscriber: %v", err)
return err
}
repl.SetSpanConfig(conf)
replCtx := repl.AnnotateCtx(ctx)
conf, err := s.cfg.SpanConfigSubscriber.GetSpanConfigForKey(replCtx, startKey)
if err != nil {
log.Errorf(ctx, "skipped applying update, unexpected error reading from subscriber: %v", err)
return
}
repl.SetSpanConfig(conf)

// TODO(irfansharif): For symmetry with the system config span variant,
// we queue blindly; we could instead only queue it if we knew the
// range's keyspan had a split in there somewhere, or was now part of a
// larger range and eligible for a merge.
s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
return nil // more
},
); err != nil {
// Errors here should not be possible, but if there is one, log loudly.
log.Errorf(ctx, "unexpected error visiting replicas: %v", err)
// TODO(irfansharif): For symmetry with the system config span variant,
// we queue blindly; we could instead only queue it if we knew the
// range's keyspan had a split in there somewhere, or was now part of a
// larger range and eligible for a merge.
s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) {
h.MaybeAdd(ctx, repl, now)
})
}
}

Expand Down

0 comments on commit 86399fa

Please sign in to comment.