kv: set destroy status before destroying data on subsumed replicas #55477

Merged
4 changes: 0 additions & 4 deletions pkg/kv/kvserver/replica_application_result.go
@@ -338,10 +338,6 @@ func (r *Replica) handleChangeReplicasResult(
if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
// In order to detect the GC queue racing with other causes of replica removal
// the store will no-op when removing a replica which is already marked as removed
// unless we set ignoreDestroyStatus to true.
ignoreDestroyStatus: true,
}); err != nil {
log.Fatalf(ctx, "failed to remove replica: %v", err)
}
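With the ignoreDestroyStatus flag gone, passing DestroyData: false is itself the signal that the caller already set the destroy status while staging the apply batch, so handleChangeReplicasResult no longer needs a separate escape hatch. A condensed sketch of that caller-side contract, using simplified stand-in types rather than the real kvserver API:

package main

import "fmt"

// Simplified stand-ins for the kvserver types; every name here is illustrative.
type destroyStatus struct{ removed bool }

func (d *destroyStatus) Removed() bool { return d.removed }

type Replica struct{ destroy destroyStatus }

type RemoveOptions struct{ DestroyData bool }

// A handleChangeReplicasResult-style caller: the apply batch already destroyed
// the data and set the destroy status, so it passes DestroyData: false instead
// of asking the store to ignore the status.
func finishRemoval(r *Replica, remove func(*Replica, RemoveOptions) error) error {
	r.destroy.removed = true // set while the apply batch was being staged
	return remove(r, RemoveOptions{DestroyData: false})
}

func main() {
	err := finishRemoval(&Replica{}, func(r *Replica, opts RemoveOptions) error {
		if !opts.DestroyData && !r.destroy.Removed() {
			return fmt.Errorf("caller must set the destroy status first")
		}
		return nil
	})
	fmt.Println(err) // <nil>
}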
17 changes: 13 additions & 4 deletions pkg/kv/kvserver/replica_application_state_machine.go
@@ -652,6 +652,15 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
// its unlock method in cmd.splitMergeUnlock.
rhsRepl.raftMu.AssertHeld()

// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
// commits by handleMergeResult() to finish the removal.
rhsRepl.mu.Lock()
rhsRepl.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(rhsRepl.RangeID, rhsRepl.store.StoreID()),
destroyReasonRemoved)
rhsRepl.mu.Unlock()

// Use math.MaxInt32 (mergedTombstoneReplicaID) as the nextReplicaID as an
// extra safeguard against creating new replicas of the RHS. This isn't
// required for correctness, since the merge protocol should guarantee that
@@ -726,11 +735,11 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
!b.r.store.TestingKnobs().DisableEagerReplicaRemoval {

// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch commits
// by Replica.handleChangeReplicasTrigger() to finish the removal.
// accepted. This destroy status will be detected after the batch
// commits by handleChangeReplicasResult() to finish the removal.
//
// NB: we must be holding the raftMu here because we're in the
// midst of application.
// NB: we must be holding the raftMu here because we're in the midst of
// application.
b.r.mu.Lock()
b.r.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()),
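Setting the destroy status while the batch is still being staged is what keeps new commands away from the right-hand side between the data removal in this batch and the in-memory teardown in handleMergeResult(). A rough, self-contained sketch of that gate, assuming a simplified destroyStatus rather than the real one:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// A rough gate, not the real kvserver check: once the status carries an error,
// any command arriving afterwards is rejected before it can touch the data
// that the staged batch is about to delete.
type destroyStatus struct {
	mu     sync.Mutex
	reason error // nil while the replica is still serving
}

func (d *destroyStatus) Set(reason error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.reason = reason
}

func (d *destroyStatus) Err() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.reason
}

func checkCanServe(d *destroyStatus) error {
	return d.Err() // a RangeNotFoundError-style error bounces the command
}

func main() {
	var d destroyStatus
	fmt.Println(checkCanServe(&d)) // <nil>: still serving
	d.Set(errors.New("range not found"))
	fmt.Println(checkCanServe(&d)) // range not found: new command rejected
}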
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/replica_destroy.go
@@ -80,7 +80,17 @@ func (r *Replica) preDestroyRaftMuLocked(
clearRangeIDLocalOnly bool,
mustUseClearRange bool,
) error {
desc := r.Desc()
r.mu.RLock()
desc := r.descRLocked()
removed := r.mu.destroyStatus.Removed()
r.mu.RUnlock()

// The replica must be marked as destroyed before its data is removed. If
// not, we risk new commands being accepted and observing the missing data.
if !removed {
log.Fatalf(ctx, "replica not marked as destroyed before call to preDestroyRaftMuLocked: %v", r)
}

err := clearRangeData(desc, reader, writer, clearRangeIDLocalOnly, mustUseClearRange)
if err != nil {
return err
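The new assertion turns the required ordering into an invariant: a replica's destroy status must be set before any of its data is cleared, since a command admitted in between could otherwise observe a partially deleted range. A minimal sketch of that ordering with invented names (the real code fatals rather than returning an error):

package main

import (
	"errors"
	"fmt"
)

// Minimal sketch of the invariant: clearing data is only legal once the
// replica is marked as removed. All names are invented.
type replica struct{ removed bool }

func preDestroy(r *replica, clearData func()) error {
	if !r.removed {
		return errors.New("replica not marked as destroyed before clearing data")
	}
	clearData()
	return nil
}

func main() {
	r := &replica{}
	fmt.Println(preDestroy(r, func() {})) // error: status not set yet
	r.removed = true
	fmt.Println(preDestroy(r, func() {})) // <nil>
}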
11 changes: 0 additions & 11 deletions pkg/kv/kvserver/replica_raft.go
@@ -1567,17 +1567,6 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
subsumedRepls = append(subsumedRepls, sRepl)
endKey = sRepl.Desc().EndKey
}
// TODO(benesch): we may be unnecessarily forcing another Raft snapshot here
// by subsuming too much. Consider the case where [a, b) and [c, e) first
// merged into [a, e), then split into [a, d) and [d, e), and we're applying a
// snapshot that spans this merge and split. The bounds of this snapshot will
// be [a, d), so we'll subsume [c, e). But we're still a member of [d, e)!
// We'll currently be forced to get a Raft snapshot to catch up. Ideally, we'd
// subsume only half of [c, e) and synthesize a new RHS [d, e), effectively
// applying both the split and merge during snapshot application. This isn't a
// huge deal, though: we're probably behind enough that the RHS would need to
// get caught up with a Raft snapshot anyway, even if we synthesized it
// properly.
return subsumedRepls, func() {
for _, sr := range subsumedRepls {
sr.raftMu.Unlock()
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
@@ -1032,6 +1032,15 @@ func (r *Replica) clearSubsumedReplicaDiskData(
keyRanges := getKeyRanges(desc)
totalKeyRanges := append([]rditer.KeyRange(nil), keyRanges[:]...)
for _, sr := range subsumedRepls {
// We mark the replica as destroyed so that new commands are not
// accepted. This destroy status will be detected after the batch
// commits by clearSubsumedReplicaInMemoryData() to finish the removal.
sr.mu.Lock()
sr.mu.destroyStatus.Set(
roachpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()),
destroyReasonRemoved)
sr.mu.Unlock()

// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
@@ -1146,7 +1155,8 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
// replicas themselves is protected by their raftMus, which are held from
// start to finish.
if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, sr, subsumedNextReplicaID, RemoveOptions{
DestroyData: false, // data is already destroyed
// The data was already destroyed by clearSubsumedReplicaDiskData.
DestroyData: false,
}); err != nil {
return err
}
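During snapshot application the same ordering now applies to each subsumed replica: it is marked destroyed before its on-disk data is cleared via ingested SSTs, and only later removed in memory with DestroyData: false, which requires the status to already be set. A schematic of that two-phase flow, with invented helper names standing in for clearSubsumedReplicaDiskData and clearSubsumedReplicaInMemoryData:

package main

import (
	"errors"
	"fmt"
)

// Schematic only; the helpers below are simplified stand-ins.
type replica struct {
	id      int
	removed bool
}

func clearSubsumedDiskData(subsumed []*replica) {
	for _, sr := range subsumed {
		sr.removed = true // mark destroyed before touching any data
		// ... build and ingest a clear-range SST for sr's keyspace ...
	}
}

func clearSubsumedInMemoryData(subsumed []*replica) error {
	for _, sr := range subsumed {
		if !sr.removed { // the DestroyData: false path insists on the status
			return errors.New("data already destroyed but status not set")
		}
		// ... drop sr from the store's replica map ...
	}
	return nil
}

func main() {
	subsumed := []*replica{{id: 1}, {id: 2}}
	clearSubsumedDiskData(subsumed)
	fmt.Println(clearSubsumedInMemoryData(subsumed)) // <nil>
}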
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/store_merge.go
@@ -51,7 +51,9 @@ func (s *Store) MergeRange(
// call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking
// on the right-hand replica's raftMu.
if err := s.removeInitializedReplicaRaftMuLocked(ctx, rightRepl, rightDesc.NextReplicaID, RemoveOptions{
DestroyData: false, // the replica was destroyed when the merge commit applied
// The replica was destroyed by the tombstones added to the batch in
// runPreApplyTriggersAfterStagingWriteBatch.
DestroyData: false,
}); err != nil {
return errors.Errorf("cannot remove range: %s", err)
}
27 changes: 15 additions & 12 deletions pkg/kv/kvserver/store_remove_replica.go
@@ -21,13 +21,8 @@ import (

// RemoveOptions bundles boolean parameters for Store.RemoveReplica.
type RemoveOptions struct {
// If true, the replica's data is destroyed as part of the removal. If false,
// the caller must have already destroyed the data and marked the replica's
// destroyStatus as removed.
DestroyData bool

// ignoreDestroyStatus allows a caller to instruct the store to remove
// replicas which are already marked as destroyed. This is helpful in cases
// where the caller knows that it set the destroy status and cannot have raced
// with another goroutine. See Replica.handleChangeReplicasResult().
ignoreDestroyStatus bool
}

// RemoveReplica removes the replica from the store's replica map and from the
@@ -78,10 +73,19 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
{
rep.mu.Lock()

// Detect if we were already removed.
if !opts.ignoreDestroyStatus && rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
return nil // already removed, noop
if opts.DestroyData {
// Detect if we were already removed.
if rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
return nil // already removed, noop
}
} else {
// If the caller doesn't want to destroy the data because it already
// has done so, then it must have already also set the destroyStatus.
if !rep.mu.destroyStatus.Removed() {
rep.mu.Unlock()
log.Fatalf(ctx, "replica not marked as destroyed but data already destroyed: %v", rep)
}
}

desc = rep.mu.state.Desc
@@ -96,8 +100,7 @@
// uninitialized.
if !rep.isInitializedRLocked() {
rep.mu.Unlock()
log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v",
rep)
log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v", rep)
}

// Mark the replica as removed before deleting data.
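The removed ignoreDestroyStatus flag is replaced by making the meaning of an already-set destroy status depend on DestroyData: with DestroyData: true, an already-removed replica indicates a benign race (for example with the replica GC queue) and the call no-ops, while with DestroyData: false the status is mandatory and its absence is fatal. A condensed, runnable version of that branch with simplified types (an error stands in for the fatal):

package main

import (
	"errors"
	"fmt"
)

// Condensed version of the new branch; the types are simplified stand-ins.
type replica struct{ removed bool }

type RemoveOptions struct{ DestroyData bool }

func removeInitializedReplica(r *replica, opts RemoveOptions) error {
	if opts.DestroyData {
		if r.removed {
			return nil // raced with another removal path (e.g. the GC queue): no-op
		}
	} else if !r.removed {
		// The real code log.Fatals here; an error keeps the sketch runnable.
		return errors.New("replica not marked as destroyed but data already destroyed")
	}
	r.removed = true
	// ... destroy the data if requested, then drop the replica from the store map ...
	return nil
}

func main() {
	fmt.Println(removeInitializedReplica(&replica{removed: true}, RemoveOptions{DestroyData: true})) // <nil>: no-op
	fmt.Println(removeInitializedReplica(&replica{}, RemoveOptions{DestroyData: false}))             // error
}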