Skip to content

Commit

Permalink
keys,kvserver: introduce RaftReplicaID
Browse files Browse the repository at this point in the history
The RaftReplicaIDKey is an unreplicated range-id local key that
contains the ReplicaID of the replica whose HardState is represented
in the RaftHardStateKey. These two keys share the same lifetime,
and are removed atomically when we clear the range-id local keys
for a replica.

We currently do not utilize this information on node restart
to figure out whether we should cleanup stale uninitialized replicas.
Doing such cleanup can wait until we implement and start using
ReplicasStorage. The change here is meant to set us up to rely
on RaftReplicaID from the next release onwards.

Informs #75740

Release note: None
  • Loading branch information
sumeerbhola committed Feb 3, 2022
1 parent 61e2304 commit bcf42cc
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 11 deletions.
14 changes: 11 additions & 3 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ var (
// LocalRangeAppliedStateSuffix is the suffix for the range applied state
// key.
LocalRangeAppliedStateSuffix = []byte("rask")
// LocalRaftTruncatedStateSuffix is the suffix for the
// This was previously used for the replicated RaftTruncatedState. It is no
// longer used and this key has been removed via a migration. See
// LocalRaftTruncatedStateSuffix for the corresponding unreplicated
// RaftTruncatedState.
// Note: This suffix is also used for unreplicated Range-ID keys.
LocalRaftTruncatedStateSuffix = []byte("rftt")
_ = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalRangePriorReadSummarySuffix is the suffix for a range's prior read
Expand Down Expand Up @@ -122,6 +123,13 @@ var (
localRaftLastIndexSuffix = []byte("rfti")
// LocalRaftLogSuffix is the suffix for the raft log.
LocalRaftLogSuffix = []byte("rftl")
// LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is
// written when the HardState for a particular ReplicaID is first written.
LocalRaftReplicaIDSuffix = []byte("rftr")
// LocalRaftTruncatedStateSuffix is the suffix for the unreplicated
// RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")

// LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last
// replica GC timestamp (for GC of old replicas).
LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt")
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ var _ = [...]interface{}{
RangeTombstoneKey, // "rftb"
RaftHardStateKey, // "rfth"
RaftLogKey, // "rftl"
RaftReplicaIDKey, // "rftr"
RaftTruncatedStateKey, // "rftt"
RangeLastReplicaGCTimestampKey, // "rlrt"

Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey()
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
Expand Down Expand Up @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key {
return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...)
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key {
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,12 @@ type Replica struct {
// It will not change over the lifetime of this replica. If addressed under
// a newer replicaID, the replica immediately replicaGCs itself to make
// way for the newer incarnation.
// TODO(sumeer): since this is initialized in newUnloadedReplica and never
// changed, lift this out of the mu struct.
replicaID roachpb.ReplicaID
// wroteReplicaID transitions once to true, when RaftReplicaID is written
// to the store.
wroteReplicaID bool
// The minimum allowed ID for this replica. Initialized from
// RangeTombstone.NextReplicaID.
tombstoneMinReplicaID roachpb.ReplicaID
Expand Down Expand Up @@ -710,9 +715,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {
r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get())
}

// ReplicaID returns the ID for the Replica. It may be zero if the replica does
// not know its ID. Once a Replica has a non-zero ReplicaID it will never change.
// ReplicaID returns the ID for the Replica. This value is fixed for the
// lifetime of the Replica.
func (r *Replica) ReplicaID() roachpb.ReplicaID {
// The locking of mu is unnecessary. It will be removed when we lift
// replicaID out of the mu struct.
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.replicaID
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return unquiesceAndWakeLeader, nil
})
r.mu.applyingEntries = len(rd.CommittedEntries) > 0
alreadyWroteReplicaID := r.mu.wroteReplicaID
r.mu.Unlock()
if errors.Is(err, errRemoved) {
// If we've been removed then just return.
Expand Down Expand Up @@ -796,6 +797,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, expl, errors.Wrap(err, expl)
}
}
wroteReplicaID := false
if !raft.IsEmptyHardState(rd.HardState) {
if !r.IsInitialized() && rd.HardState.Commit != 0 {
log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState)
Expand All @@ -812,6 +814,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
const expl = "during setHardState"
return stats, expl, errors.Wrap(err, expl)
}
// It is possible that we have set HardState for the first time, in which
// case alreadyWroteReplicaID will be false.
if !alreadyWroteReplicaID {
if err := r.raftMu.stateLoader.SetRaftReplicaID(ctx, batch, r.ReplicaID()); err != nil {
const expl = "during setRaftReplicaID"
return stats, expl, errors.Wrap(err, expl)
}
wroteReplicaID = true
}
}
// Synchronously commit the batch with the Raft log entries and Raft hard
// state as we're promising not to lose this data.
Expand Down Expand Up @@ -870,6 +881,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// previously the leader.
becameLeader = r.mu.leaderID == r.mu.replicaID
}
if wroteReplicaID {
r.mu.wroteReplicaID = wroteReplicaID
}
r.mu.Unlock()

// When becoming the leader, proactively add the replica to the replicate
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,10 +859,21 @@ func (r *Replica) applySnapshot(
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// The HardState and RaftReplicaID should typically already exist for this
// replica, unless this snapshot application is the first time raft.Ready is
// being processed. In that case we must write the RaftReplicaID so that it
// shares the same lifetime as the HardState. Additionally, we've cleared
// all the raft state above, so we are forced to write the RaftReplicaID
// here regardless of what happened before.
//
// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)
Expand Down Expand Up @@ -989,6 +1000,9 @@ func (r *Replica) applySnapshot(
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
// RaftReplicaID is definitely written due to the earlier logic in this
// function.
r.mu.wroteReplicaID = true

// Invoke the leasePostApply method to ensure we properly initialize the
// replica according to whether it holds the lease. We allow jumps in the
Expand Down
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func WriteInitialRangeState(
ctx context.Context,
readWriter storage.ReadWriter,
desc roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
replicaVersion roachpb.Version,
) error {
initialLease := roachpb.Lease{}
Expand All @@ -108,7 +109,15 @@ func WriteInitialRangeState(
); err != nil {
return err
}
if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil {
sl := Make(desc.RangeID)
if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil {
return err
}
// It is inconvenient that we cannot set Replica.mu.wroteReplicaID=true
// since we don't have a Replica object yet. This is harmless since we will
// just write the RaftReplicaID again later. The invariant that HardState
// and RaftReplicaID both exist in the store is not being violated.
if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil {
return err
}
return nil
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,20 @@ func (rsl StateLoader) SynthesizeHardState(
err := rsl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
}

// SetRaftReplicaID overwrites the RaftReplicaID.
func (rsl StateLoader) SetRaftReplicaID(
ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID,
) error {
rid := roachpb.RaftReplicaID{ReplicaID: replicaID}
// "Blind" because ms == nil and timestamp.IsEmpty().
return storage.MVCCBlindPutProto(
ctx,
writer,
nil, /* ms */
rsl.RaftReplicaIDKey(),
hlc.Timestamp{}, /* timestamp */
&rid,
nil, /* txn */
)
}
19 changes: 19 additions & 0 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func (s *Store) tryGetOrCreateReplica(
} else if ok && replicaID != 0 && replicaID < tombstone.NextReplicaID {
return nil, false, &roachpb.RaftGroupDeletedError{}
}
// It is possible that we have already created the HardState for an
// uninitialized replica, then crashed, and on recovery are receiving a raft
// message for the same or later replica. In either case we will create a
// Replica with Replica.mu.wroteReplicaID=false, and will eventually write
// the HardState and RaftReplicaID to the correct value. However, in the
// latter case there is some time interval during which the Replica object
// has a newer ReplicaID than the one in the store, and a stored HardState
// that is for the older ReplicaID. This seems harmless, but to be more
// precise about the invariants we should remove the stale
// persistent state here.
//
// TODO(sumeer): once we have RaftReplicaID being populated for all replicas
// and we have purged replicas that don't populate it, we can read the
// (HardState,RaftReplicaID) here and find one of the following cases:
// - HardState exists, RaftReplicaID not exists: must be a purged replica so
// we can delete HardState.
// - HardState exists, RaftReplicaID exists: if the latter is old, remove
// both HardState and RaftReplicaID.
// - Neither exits: nothing to do.

// Create a new replica and lock it for raft processing.
uninitializedDesc := &roachpb.RangeDescriptor{
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func WriteInitialClusterData(
EndKey: endKey,
NextReplicaID: 2,
}
const firstReplicaID = 1
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: FirstNodeID,
StoreID: FirstStoreID,
ReplicaID: 1,
ReplicaID: firstReplicaID,
},
}
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
Expand Down Expand Up @@ -244,7 +245,8 @@ func WriteInitialClusterData(
}
}

if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil {
if err := stateloader.WriteInitialRangeState(
ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil {
return err
}
computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime)
Expand Down
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func splitPreApply(
//
// The exception to that is if the DisableEagerReplicaRemoval testing flag is
// enabled.
_, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
_, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID())
if !hasRightDesc || !hasLeftDesc {
log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v",
Expand Down Expand Up @@ -100,9 +100,18 @@ func splitPreApply(
log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err)
}
if rightRepl != nil {
// Cleared the HardState and RaftReplicaID, so rewrite them to the
// current values.
// TODO(sumeer): we know HardState.Commit cannot advance since the RHS
// cannot apply a snapshot yet. But what prevents a concurrent change to
// HardState.{Term,Vote} that we would accidentally undo here.
if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil {
log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err)
}
if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID(
ctx, readWriter, rightRepl.ReplicaID()); err != nil {
log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err)
}
}
return
}
Expand All @@ -114,7 +123,10 @@ func splitPreApply(
if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil {
log.Fatalf(ctx, "%v", err)
}

// Write the RaftReplicaID for the RHS.
if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil {
log.Fatalf(ctx, "%v", err)
}
// Persist the closed timestamp.
//
// In order to tolerate a nil initClosedTS input, let's forward to
Expand Down Expand Up @@ -161,6 +173,9 @@ func splitPostApply(
log.Fatalf(ctx, "%s: found replica which is RHS of a split "+
"without a valid tenant ID", rightReplOrNil)
}
rightReplOrNil.mu.Lock()
rightReplOrNil.mu.wroteReplicaID = true
rightReplOrNil.mu.Unlock()
}

now := r.store.Clock().NowAsClockTimestamp()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {

uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID}
if err := stateloader.WriteInitialRangeState(
ctx, s.Engine(), uninitDesc, roachpb.Version{},
ctx, s.Engine(), uninitDesc, 2, roachpb.Version{},
); err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/roachpb/internal_raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,10 @@ message RaftSnapshotData {
repeated bytes log_entries = 3;
reserved 1;
}

message RaftReplicaID {
option (gogoproto.equal) = true;
// ReplicaID is the ID of the replica with the corresponding HardState.
optional int32 replica_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"];
}

0 comments on commit bcf42cc

Please sign in to comment.