diff --git a/pkg/kv/kvserver/replica_application_result.go b/pkg/kv/kvserver/replica_application_result.go
index 0d7ad110f08c..98ce1bf6711e 100644
--- a/pkg/kv/kvserver/replica_application_result.go
+++ b/pkg/kv/kvserver/replica_application_result.go
@@ -338,10 +338,6 @@ func (r *Replica) handleChangeReplicasResult(
 	if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
 		// We destroyed the data when the batch committed so don't destroy it again.
 		DestroyData: false,
-		// In order to detect the GC queue racing with other causes of replica removal
-		// the store will no-op when removing a replica which is already marked as removed
-		// unless we set ignoreDestroyStatus to true.
-		ignoreDestroyStatus: true,
 	}); err != nil {
 		log.Fatalf(ctx, "failed to remove replica: %v", err)
 	}
diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go
index 3972a4426ca9..4c916afb4caf 100644
--- a/pkg/kv/kvserver/replica_application_state_machine.go
+++ b/pkg/kv/kvserver/replica_application_state_machine.go
@@ -652,6 +652,15 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
 		// its unlock method in cmd.splitMergeUnlock.
 		rhsRepl.raftMu.AssertHeld()
 
+		// We mark the replica as destroyed so that new commands are not
+		// accepted. This destroy status will be detected after the batch
+		// commits by handleMergeResult() to finish the removal.
+		rhsRepl.mu.Lock()
+		rhsRepl.mu.destroyStatus.Set(
+			roachpb.NewRangeNotFoundError(rhsRepl.RangeID, rhsRepl.store.StoreID()),
+			destroyReasonRemoved)
+		rhsRepl.mu.Unlock()
+
 		// Use math.MaxInt32 (mergedTombstoneReplicaID) as the nextReplicaID as an
 		// extra safeguard against creating new replicas of the RHS. This isn't
 		// required for correctness, since the merge protocol should guarantee that
@@ -726,11 +735,11 @@ func (b *replicaAppBatch) runPreApplyTriggersAfterStagingWriteBatch(
 		!b.r.store.TestingKnobs().DisableEagerReplicaRemoval {
 
 		// We mark the replica as destroyed so that new commands are not
-		// accepted. This destroy status will be detected after the batch commits
-		// by Replica.handleChangeReplicasTrigger() to finish the removal.
+		// accepted. This destroy status will be detected after the batch
+		// commits by handleChangeReplicasResult() to finish the removal.
 		//
-		// NB: we must be holding the raftMu here because we're in the
-		// midst of application.
+		// NB: we must be holding the raftMu here because we're in the midst of
+		// application.
 		b.r.mu.Lock()
 		b.r.mu.destroyStatus.Set(
 			roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()),
diff --git a/pkg/kv/kvserver/replica_destroy.go b/pkg/kv/kvserver/replica_destroy.go
index 0cc9f265f045..3754466b0ac7 100644
--- a/pkg/kv/kvserver/replica_destroy.go
+++ b/pkg/kv/kvserver/replica_destroy.go
@@ -80,7 +80,17 @@ func (r *Replica) preDestroyRaftMuLocked(
 	clearRangeIDLocalOnly bool,
 	mustUseClearRange bool,
 ) error {
-	desc := r.Desc()
+	r.mu.RLock()
+	desc := r.descRLocked()
+	removed := r.mu.destroyStatus.Removed()
+	r.mu.RUnlock()
+
+	// The replica must be marked as destroyed before its data is removed. If
+	// not, we risk new commands being accepted and observing the missing data.
+	if !removed {
+		log.Fatalf(ctx, "replica not marked as destroyed before call to preDestroyRaftMuLocked: %v", r)
+	}
+
 	err := clearRangeData(desc, reader, writer, clearRangeIDLocalOnly, mustUseClearRange)
 	if err != nil {
 		return err
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 696ecb003df9..b374840e3482 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -1567,17 +1567,6 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
 		subsumedRepls = append(subsumedRepls, sRepl)
 		endKey = sRepl.Desc().EndKey
 	}
-	// TODO(benesch): we may be unnecessarily forcing another Raft snapshot here
-	// by subsuming too much. Consider the case where [a, b) and [c, e) first
-	// merged into [a, e), then split into [a, d) and [d, e), and we're applying a
-	// snapshot that spans this merge and split. The bounds of this snapshot will
-	// be [a, d), so we'll subsume [c, e). But we're still a member of [d, e)!
-	// We'll currently be forced to get a Raft snapshot to catch up. Ideally, we'd
-	// subsume only half of [c, e) and synthesize a new RHS [d, e), effectively
-	// applying both the split and merge during snapshot application. This isn't a
-	// huge deal, though: we're probably behind enough that the RHS would need to
-	// get caught up with a Raft snapshot anyway, even if we synthesized it
-	// properly.
 	return subsumedRepls, func() {
 		for _, sr := range subsumedRepls {
 			sr.raftMu.Unlock()
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index 5873dba9f67a..be6745aa982f 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -1032,6 +1032,15 @@ func (r *Replica) clearSubsumedReplicaDiskData(
 	keyRanges := getKeyRanges(desc)
 	totalKeyRanges := append([]rditer.KeyRange(nil), keyRanges[:]...)
 	for _, sr := range subsumedRepls {
+		// We mark the replica as destroyed so that new commands are not
+		// accepted. This destroy status will be detected after the batch
+		// commits by clearSubsumedReplicaInMemoryData() to finish the removal.
+		sr.mu.Lock()
+		sr.mu.destroyStatus.Set(
+			roachpb.NewRangeNotFoundError(sr.RangeID, sr.store.StoreID()),
+			destroyReasonRemoved)
+		sr.mu.Unlock()
+
 		// We have to create an SST for the subsumed replica's range-id local keys.
 		subsumedReplSSTFile := &storage.MemFile{}
 		subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
@@ -1146,7 +1155,8 @@ func (r *Replica) clearSubsumedReplicaInMemoryData(
 	// replicas themselves is protected by their raftMus, which are held from
 	// start to finish.
 	if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, sr, subsumedNextReplicaID, RemoveOptions{
-		DestroyData: false, // data is already destroyed
+		// The data was already destroyed by clearSubsumedReplicaDiskData.
+		DestroyData: false,
 	}); err != nil {
 		return err
 	}
diff --git a/pkg/kv/kvserver/store_merge.go b/pkg/kv/kvserver/store_merge.go
index 3a900d10b33c..de2c90694deb 100644
--- a/pkg/kv/kvserver/store_merge.go
+++ b/pkg/kv/kvserver/store_merge.go
@@ -51,7 +51,9 @@ func (s *Store) MergeRange(
 	// call removeInitializedReplicaRaftMuLocked directly to avoid deadlocking
 	// on the right-hand replica's raftMu.
 	if err := s.removeInitializedReplicaRaftMuLocked(ctx, rightRepl, rightDesc.NextReplicaID, RemoveOptions{
-		DestroyData: false, // the replica was destroyed when the merge commit applied
+		// The replica was destroyed by the tombstones added to the batch in
+		// runPreApplyTriggersAfterStagingWriteBatch.
+		DestroyData: false,
 	}); err != nil {
 		return errors.Errorf("cannot remove range: %s", err)
 	}
diff --git a/pkg/kv/kvserver/store_remove_replica.go b/pkg/kv/kvserver/store_remove_replica.go
index 89d69a388e6f..968a0619a5cc 100644
--- a/pkg/kv/kvserver/store_remove_replica.go
+++ b/pkg/kv/kvserver/store_remove_replica.go
@@ -21,13 +21,8 @@ import (
 
 // RemoveOptions bundles boolean parameters for Store.RemoveReplica.
 type RemoveOptions struct {
+	// If false, the replica's destroyStatus must already be marked as removed.
 	DestroyData bool
-
-	// ignoreDestroyStatus allows a caller to instruct the store to remove
-	// replicas which are already marked as destroyed. This is helpful in cases
-	// where the caller knows that it set the destroy status and cannot have raced
-	// with another goroutine. See Replica.handleChangeReplicasResult().
-	ignoreDestroyStatus bool
 }
 
 // RemoveReplica removes the replica from the store's replica map and from the
@@ -78,10 +73,19 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
 	{
 		rep.mu.Lock()
 
-		// Detect if we were already removed.
-		if !opts.ignoreDestroyStatus && rep.mu.destroyStatus.Removed() {
-			rep.mu.Unlock()
-			return nil // already removed, noop
+		if opts.DestroyData {
+			// Detect if we were already removed.
+			if rep.mu.destroyStatus.Removed() {
+				rep.mu.Unlock()
+				return nil // already removed, noop
+			}
+		} else {
+			// If the caller is not destroying the data because it has already
+			// done so itself, then it must also have set the destroyStatus.
+			if !rep.mu.destroyStatus.Removed() {
+				rep.mu.Unlock()
+				log.Fatalf(ctx, "replica not marked as destroyed but data already destroyed: %v", rep)
+			}
 		}
 
 		desc = rep.mu.state.Desc
@@ -96,8 +100,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
 		/// uninitialized.
 		if !rep.isInitializedRLocked() {
 			rep.mu.Unlock()
-			log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v",
-				rep)
+			log.Fatalf(ctx, "uninitialized replica cannot be removed with removeInitializedReplica: %v", rep)
 		}
 
 		// Mark the replica as removed before deleting data.