diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 7d1adb9ec861..4db431b498f6 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -87,10 +87,11 @@ var ( // LocalRangeAppliedStateSuffix is the suffix for the range applied state // key. LocalRangeAppliedStateSuffix = []byte("rask") - // LocalRaftTruncatedStateSuffix is the suffix for the + // This was previously used for the replicated RaftTruncatedState. It is no + // longer used and this key has been removed via a migration. See + // LocalRaftTruncatedStateSuffix for the corresponding unreplicated // RaftTruncatedState. - // Note: This suffix is also used for unreplicated Range-ID keys. - LocalRaftTruncatedStateSuffix = []byte("rftt") + _ = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") // LocalRangePriorReadSummarySuffix is the suffix for a range's prior read @@ -122,6 +123,13 @@ var ( localRaftLastIndexSuffix = []byte("rfti") // LocalRaftLogSuffix is the suffix for the raft log. LocalRaftLogSuffix = []byte("rftl") + // LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is + // written when the HardState for a particular ReplicaID is first written. + LocalRaftReplicaIDSuffix = []byte("rftr") + // LocalRaftTruncatedStateSuffix is the suffix for the unreplicated + // RaftTruncatedState. + LocalRaftTruncatedStateSuffix = []byte("rftt") + // LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last // replica GC timestamp (for GC of old replicas). LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt") diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 7a4248130ed7..47f5fc5dfc58 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -199,6 +199,7 @@ var _ = [...]interface{}{ RangeTombstoneKey, // "rftb" RaftHardStateKey, // "rfth" RaftLogKey, // "rftl" + RaftReplicaIDKey, // "rftr" RaftTruncatedStateKey, // "rftt" RangeLastReplicaGCTimestampKey, // "rlrt" diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index de6eae1dcbb3..8e768cc8eafa 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey() +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key { @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key { return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...) +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e71f71ec83e1..1502de6500b1 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -520,7 +520,12 @@ type Replica struct { // It will not change over the lifetime of this replica. If addressed under // a newer replicaID, the replica immediately replicaGCs itself to make // way for the newer incarnation. + // TODO(sumeer): since this is initialized in newUnloadedReplica and never + // changed, lift this out of the mu struct. replicaID roachpb.ReplicaID + // wroteReplicaID transitions once to true, when RaftReplicaID is written + // to the store. + wroteReplicaID bool // The minimum allowed ID for this replica. Initialized from // RangeTombstone.NextReplicaID. tombstoneMinReplicaID roachpb.ReplicaID @@ -710,9 +715,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) { r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get()) } -// ReplicaID returns the ID for the Replica. It may be zero if the replica does -// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +// ReplicaID returns the ID for the Replica. This value is fixed for the +// lifetime of the Replica. func (r *Replica) ReplicaID() roachpb.ReplicaID { + // The locking of mu is unnecessary. It will be removed when we lift + // replicaID out of the mu struct. r.mu.RLock() defer r.mu.RUnlock() return r.mu.replicaID diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f2680ff5b043..01a712a8e3d1 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -583,6 +583,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return unquiesceAndWakeLeader, nil }) r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + alreadyWroteReplicaID := r.mu.wroteReplicaID r.mu.Unlock() if errors.Is(err, errRemoved) { // If we've been removed then just return. @@ -796,6 +797,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, expl, errors.Wrap(err, expl) } } + wroteReplicaID := false if !raft.IsEmptyHardState(rd.HardState) { if !r.IsInitialized() && rd.HardState.Commit != 0 { log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) @@ -812,6 +814,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "during setHardState" return stats, expl, errors.Wrap(err, expl) } + // It is possible that we have set HardState for the first time, in which + // case alreadyWroteReplicaID will be false. + if !alreadyWroteReplicaID { + if err := r.raftMu.stateLoader.SetRaftReplicaID(ctx, batch, r.ReplicaID()); err != nil { + const expl = "during setRaftReplicaID" + return stats, expl, errors.Wrap(err, expl) + } + wroteReplicaID = true + } } // Synchronously commit the batch with the Raft log entries and Raft hard // state as we're promising not to lose this data. @@ -870,6 +881,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // previously the leader. becameLeader = r.mu.leaderID == r.mu.replicaID } + if wroteReplicaID { + r.mu.wroteReplicaID = wroteReplicaID + } r.mu.Unlock() // When becoming the leader, proactively add the replica to the replicate diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..a557cb43cc7d 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -859,10 +859,21 @@ func (r *Replica) applySnapshot( return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } + // The HardState and RaftReplicaID should typically already exist for this + // replica, unless this snapshot application is the first time raft.Ready is + // being processed. In that case we must write the RaftReplicaID so that it + // shares the same lifetime as the HardState. Additionally, we've cleared + // all the raft state above, so we are forced to write the RaftReplicaID + // here regardless of what happened before. + // // Update HardState. if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") } + if err := r.raftMu.stateLoader.SetRaftReplicaID( + ctx, &unreplicatedSST, r.mu.replicaID); err != nil { + return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } // Update Raft entries. r.store.raftEntryCache.Drop(r.RangeID) @@ -989,6 +1000,9 @@ func (r *Replica) applySnapshot( // Snapshots typically have fewer log entries than the leaseholder. The next // time we hold the lease, recompute the log size before making decisions. r.mu.raftLogSizeTrusted = false + // RaftReplicaID is definitely written due to the earlier logic in this + // function. + r.mu.wroteReplicaID = true // Invoke the leasePostApply method to ensure we properly initialize the // replica according to whether it holds the lease. We allow jumps in the diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..3ddc0d750226 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -96,6 +96,7 @@ func WriteInitialRangeState( ctx context.Context, readWriter storage.ReadWriter, desc roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, replicaVersion roachpb.Version, ) error { initialLease := roachpb.Lease{} @@ -108,7 +109,15 @@ func WriteInitialRangeState( ); err != nil { return err } - if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil { + sl := Make(desc.RangeID) + if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil { + return err + } + // It is inconvenient that we cannot set Replica.mu.wroteReplicaID=true + // since we don't have a Replica object yet. This is harmless since we will + // just write the RaftReplicaID again later. The invariant that HardState + // and RaftReplicaID both exist in the store is not being violated. + if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil { return err } return nil diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..5790fdd8bed1 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -434,3 +434,20 @@ func (rsl StateLoader) SynthesizeHardState( err := rsl.SetHardState(ctx, readWriter, newHS) return errors.Wrapf(err, "writing HardState %+v", &newHS) } + +// SetRaftReplicaID overwrites the RaftReplicaID. +func (rsl StateLoader) SetRaftReplicaID( + ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID, +) error { + rid := roachpb.RaftReplicaID{ReplicaID: replicaID} + // "Blind" because ms == nil and timestamp.IsEmpty(). + return storage.MVCCBlindPutProto( + ctx, + writer, + nil, /* ms */ + rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, /* timestamp */ + &rid, + nil, /* txn */ + ) +} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 81f05d939837..fc2c413a7cda 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -146,6 +146,25 @@ func (s *Store) tryGetOrCreateReplica( } else if ok && replicaID != 0 && replicaID < tombstone.NextReplicaID { return nil, false, &roachpb.RaftGroupDeletedError{} } + // It is possible that we have already created the HardState for an + // uninitialized replica, then crashed, and on recovery are receiving a raft + // message for the same or later replica. In either case we will create a + // Replica with Replica.mu.wroteReplicaID=false, and will eventually write + // the HardState and RaftReplicaID to the correct value. However, in the + // latter case there is some time interval during which the Replica object + // has a newer ReplicaID than the one in the store, and a stored HardState + // that is for the older ReplicaID. This seems harmless, but to be more + // precise about the invariants we should remove the stale + // persistent state here. + // + // TODO(sumeer): once we have RaftReplicaID being populated for all replicas + // and we have purged replicas that don't populate it, we can read the + // (HardState,RaftReplicaID) here and find one of the following cases: + // - HardState exists, RaftReplicaID not exists: must be a purged replica so + // we can delete HardState. + // - HardState exists, RaftReplicaID exists: if the latter is old, remove + // both HardState and RaftReplicaID. + // - Neither exits: nothing to do. // Create a new replica and lock it for raft processing. uninitializedDesc := &roachpb.RangeDescriptor{ diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 33aac9a89072..321c8fc21ac1 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -173,11 +173,12 @@ func WriteInitialClusterData( EndKey: endKey, NextReplicaID: 2, } + const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { NodeID: FirstNodeID, StoreID: FirstStoreID, - ReplicaID: 1, + ReplicaID: firstReplicaID, }, } desc.SetReplicas(roachpb.MakeReplicaSet(replicas)) @@ -244,7 +245,8 @@ func WriteInitialClusterData( } } - if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil { + if err := stateloader.WriteInitialRangeState( + ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil { return err } computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime) diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 2294ca8ecd90..5316b3fb242a 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -42,7 +42,7 @@ func splitPreApply( // // The exception to that is if the DisableEagerReplicaRemoval testing flag is // enabled. - _, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) + rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) _, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID()) if !hasRightDesc || !hasLeftDesc { log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v", @@ -100,9 +100,18 @@ func splitPreApply( log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) } if rightRepl != nil { + // Cleared the HardState and RaftReplicaID, so rewrite them to the + // current values. + // TODO(sumeer): we know HardState.Commit cannot advance since the RHS + // cannot apply a snapshot yet. But what prevents a concurrent change to + // HardState.{Term,Vote} that we would accidentally undo here. if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil { log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err) } + if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID( + ctx, readWriter, rightRepl.ReplicaID()); err != nil { + log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err) + } } return } @@ -114,7 +123,10 @@ func splitPreApply( if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil { log.Fatalf(ctx, "%v", err) } - + // Write the RaftReplicaID for the RHS. + if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil { + log.Fatalf(ctx, "%v", err) + } // Persist the closed timestamp. // // In order to tolerate a nil initClosedTS input, let's forward to @@ -161,6 +173,9 @@ func splitPostApply( log.Fatalf(ctx, "%s: found replica which is RHS of a split "+ "without a valid tenant ID", rightReplOrNil) } + rightReplOrNil.mu.Lock() + rightReplOrNil.mu.wroteReplicaID = true + rightReplOrNil.mu.Unlock() } now := r.store.Clock().NowAsClockTimestamp() diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9f7d033cff05..17b6019248ea 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID} if err := stateloader.WriteInitialRangeState( - ctx, s.Engine(), uninitDesc, roachpb.Version{}, + ctx, s.Engine(), uninitDesc, 2, roachpb.Version{}, ); err != nil { t.Fatal(err) } diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto index 08d7a2322efc..2c9ac3d6e724 100644 --- a/pkg/roachpb/internal_raft.proto +++ b/pkg/roachpb/internal_raft.proto @@ -49,3 +49,10 @@ message RaftSnapshotData { repeated bytes log_entries = 3; reserved 1; } + +message RaftReplicaID { + option (gogoproto.equal) = true; + // ReplicaID is the ID of the replica with the corresponding HardState. + optional int32 replica_id = 1 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"]; +}