keys,kvserver: introduce RaftReplicaID
The RaftReplicaIDKey is an unreplicated range-id local key that
contains the ReplicaID of the replica whose HardState is represented
in the RaftHardStateKey. These two keys share the same lifetime,
and are removed atomically when we clear the range-id local keys
for a replica.

We currently do not use this information on node restart
to figure out whether we should clean up stale uninitialized replicas.
Doing such cleanup can wait until we implement and start using
ReplicasStorage. The change here is meant to set us up to rely
on RaftReplicaID from the next release onwards.
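For illustration, here is a minimal sketch of the kind of startup check that
RaftReplicaID is meant to enable (nothing below is added by this commit; the
helper name and the use of storage.MVCCGetProto are assumptions, and the
usual pkg/keys, pkg/roachpb, pkg/storage, and pkg/util/hlc imports are
implied):

// Hypothetical future use (not part of this commit): decide whether the
// persisted raft state of an uninitialized replica is stale by comparing
// the stored RaftReplicaID with the replica ID we currently believe in.
func isStaleReplicaID(
	ctx context.Context,
	reader storage.Reader,
	rangeID roachpb.RangeID,
	currentID roachpb.ReplicaID,
) (bool, error) {
	var rid roachpb.RaftReplicaID
	found, err := storage.MVCCGetProto(
		ctx, reader, keys.RaftReplicaIDKey(rangeID), hlc.Timestamp{}, &rid,
		storage.MVCCGetOptions{},
	)
	if err != nil || !found {
		// No persisted ReplicaID (pre-upgrade replica): cannot conclude staleness.
		return false, err
	}
	return rid.ReplicaID < currentID, nil
}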

Informs #75740

Release note: None
sumeerbhola committed Feb 1, 2022
1 parent a5158c4 commit 09a52de
Showing 13 changed files with 124 additions and 8 deletions.
14 changes: 11 additions & 3 deletions pkg/keys/constants.go
@@ -87,10 +87,11 @@ var (
// LocalRangeAppliedStateSuffix is the suffix for the range applied state
// key.
LocalRangeAppliedStateSuffix = []byte("rask")
// LocalRaftTruncatedStateSuffix is the suffix for the
// This was previously used for the replicated RaftTruncatedState. It is no
// longer used and this key has been removed via a migration. See
// LocalRaftTruncatedStateSuffix for the corresponding unreplicated
// RaftTruncatedState.
// Note: This suffix is also used for unreplicated Range-ID keys.
LocalRaftTruncatedStateSuffix = []byte("rftt")
_ = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalRangePriorReadSummarySuffix is the suffix for a range's prior read
@@ -122,6 +123,13 @@ var (
localRaftLastIndexSuffix = []byte("rfti")
// LocalRaftLogSuffix is the suffix for the raft log.
LocalRaftLogSuffix = []byte("rftl")
// LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is
// written when the HardState for a particular ReplicaID is first written.
LocalRaftReplicaIDSuffix = []byte("rftr")
// LocalRaftTruncatedStateSuffix is the suffix for the unreplicated
// RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")

// LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last
// replica GC timestamp (for GC of old replicas).
LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt")
1 change: 1 addition & 0 deletions pkg/keys/doc.go
@@ -199,6 +199,7 @@ var _ = [...]interface{}{
RangeTombstoneKey, // "rftb"
RaftHardStateKey, // "rfth"
RaftLogKey, // "rftl"
RaftReplicaIDKey, // "rftr"
RaftTruncatedStateKey, // "rftt"
RangeLastReplicaGCTimestampKey, // "rlrt"

10 changes: 10 additions & 0 deletions pkg/keys/keys.go
@@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey()
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
@@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key {
return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...)
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key {
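A small usage sketch of the new constructor above (output format is
illustrative, not exact):

// Builds the unreplicated range-ID local key carrying the "rftr" suffix for
// range 7; it lives next to the RaftHardStateKey for the same range.
k := keys.RaftReplicaIDKey(roachpb.RangeID(7))
fmt.Printf("RaftReplicaID key: %s\n", k)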
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/replica.go
@@ -515,6 +515,9 @@ type Replica struct {
// a newer replicaID, the replica immediately replicaGCs itself to make
// way for the newer incarnation.
replicaID roachpb.ReplicaID
// wroteReplicaID transitions once to true, when RaftReplicaID is written
// to the store.
wroteReplicaID bool
// The minimum allowed ID for this replica. Initialized from
// RangeTombstone.NextReplicaID.
tombstoneMinReplicaID roachpb.ReplicaID
@@ -711,6 +714,12 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {

// ReplicaID returns the ID for the Replica. It may be zero if the replica does
// not know its ID. Once a Replica has a non-zero ReplicaID it will never change.
//
// TODO(sumeer): The preceding sentence is not consistent with the comment at
// Replica.mu.replicaID that says "This value may never be 0". I wonder
// whether this comment is stale since the only time we write to this field is
// in newUnloadedReplica. If so, we should lift replicaID out of the mu
// struct.
func (r *Replica) ReplicaID() roachpb.ReplicaID {
r.mu.RLock()
defer r.mu.RUnlock()
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -562,6 +562,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return unquiesceAndWakeLeader, nil
})
r.mu.applyingEntries = len(rd.CommittedEntries) > 0
alreadyWroteReplicaID := r.mu.wroteReplicaID
r.mu.Unlock()
if errors.Is(err, errRemoved) {
// If we've been removed then just return.
@@ -775,6 +776,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
return stats, expl, errors.Wrap(err, expl)
}
}
wroteReplicaID := false
if !raft.IsEmptyHardState(rd.HardState) {
if !r.IsInitialized() && rd.HardState.Commit != 0 {
log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState)
@@ -791,6 +793,15 @@
const expl = "during setHardState"
return stats, expl, errors.Wrap(err, expl)
}
// It is possible that we have set HardState for the first time, in which
// case alreadyWroteReplicaID will be false.
if !alreadyWroteReplicaID {
if err := r.raftMu.stateLoader.SetRaftReplicaID(ctx, batch, r.ReplicaID()); err != nil {
const expl = "during setRaftReplicaID"
return stats, expl, errors.Wrap(err, expl)
}
wroteReplicaID = true
}
}
// Synchronously commit the batch with the Raft log entries and Raft hard
// state as we're promising not to lose this data.
@@ -849,6 +860,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
// previously the leader.
becameLeader = r.mu.leaderID == r.mu.replicaID
}
if wroteReplicaID {
r.mu.wroteReplicaID = wroteReplicaID
}
r.mu.Unlock()

// When becoming the leader, proactively add the replica to the replicate
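Condensed, the sequencing introduced above amounts to the following sketch
(names simplified and error handling elided; this is a paraphrase of the
diff, not additional code in the commit):

// Write RaftReplicaID at most once, in the same batch that first persists a
// HardState; the in-memory flag is flipped only after that batch commits.
if !raft.IsEmptyHardState(rd.HardState) {
	_ = stateLoader.SetHardState(ctx, batch, rd.HardState)
	if !alreadyWroteReplicaID {
		_ = stateLoader.SetRaftReplicaID(ctx, batch, replicaID)
		wroteReplicaID = true
	}
}
// ... the batch is committed synchronously ...
if wroteReplicaID {
	r.mu.wroteReplicaID = true
}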
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -864,6 +864,17 @@ func (r *Replica) applySnapshot(
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

// The HardState and RaftReplicaID should typically already exist for this
// replica, unless this snapshot application is the first time raft.Ready is
// being processed. In that case we must write the RaftReplicaID so that it
// shares the same lifetime as the HardState. Additionally, we've cleared
// all the raft state above, so we are forced to write the RaftReplicaID
// here regardless of what happened before.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)

@@ -989,6 +1000,9 @@ func (r *Replica) applySnapshot(
// Snapshots typically have fewer log entries than the leaseholder. The next
// time we hold the lease, recompute the log size before making decisions.
r.mu.raftLogSizeTrusted = false
// RaftReplicaID is definitely written due to the earlier logic in this
// function.
r.mu.wroteReplicaID = true

// Invoke the leasePostApply method to ensure we properly initialize the
// replica according to whether it holds the lease. We allow jumps in the
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/stateloader/initial.go
@@ -96,6 +96,7 @@ func WriteInitialRangeState(
ctx context.Context,
readWriter storage.ReadWriter,
desc roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
replicaVersion roachpb.Version,
) error {
initialLease := roachpb.Lease{}
@@ -111,5 +112,12 @@
if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil {
return err
}
// It is inconvenient that we cannot set Replica.mu.wroteReplicaID=true
// since we don't have a Replica object yet. This is harmless since we will
// just write the RaftReplicaID again later. The invariant that HardState
// and RaftReplicaID both exist in the store is not being violated.
if err := Make(desc.RangeID).SetRaftReplicaID(ctx, readWriter, replicaID); err != nil {
return err
}
return nil
}
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/stateloader/stateloader.go
@@ -434,3 +434,20 @@ func (rsl StateLoader) SynthesizeHardState(
err := rsl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
}

// SetRaftReplicaID overwrites the RaftReplicaID.
func (rsl StateLoader) SetRaftReplicaID(
ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID,
) error {
rid := roachpb.RaftReplicaID{ReplicaID: replicaID}
// "Blind" because ms == nil and timestamp.IsEmpty().
return storage.MVCCBlindPutProto(
ctx,
writer,
nil, /* ms */
rsl.RaftReplicaIDKey(),
hlc.Timestamp{}, /* timestamp */
&rid,
nil, /* txn */
)
}
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/store_create_replica.go
@@ -146,6 +146,17 @@ func (s *Store) tryGetOrCreateReplica(
} else if ok && replicaID != 0 && replicaID < tombstone.NextReplicaID {
return nil, false, &roachpb.RaftGroupDeletedError{}
}
// It is possible that we have already created the HardState for an
// uninitialized replica, then crashed, and on recovery are receiving a raft
// message for the same or later replica. In either case we will create a
// Replica with Replica.mu.wroteReplicaID=false, and will eventually write
// the HardState and RaftReplicaID to the correct value. However, in the
// latter case there is some time interval during which the Replica object
// has a newer ReplicaID than the one in the store, and a stored HardState
// that is for the older ReplicaID. This seems harmless, but to be more
// precise about the invariants we should consider removing the stale
// persistent state here.
// TODO(sumeer): figure out where in the code we should do this cleanup.

// Create a new replica and lock it for raft processing.
uninitializedDesc := &roachpb.RangeDescriptor{
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store_init.go
@@ -173,11 +173,12 @@ func WriteInitialClusterData(
EndKey: endKey,
NextReplicaID: 2,
}
const firstReplicaID = 1
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: FirstNodeID,
StoreID: FirstStoreID,
ReplicaID: 1,
ReplicaID: firstReplicaID,
},
}
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
@@ -244,7 +245,8 @@
}
}

if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil {
if err := stateloader.WriteInitialRangeState(
ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil {
return err
}
computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime)
19 changes: 17 additions & 2 deletions pkg/kv/kvserver/store_split.go
@@ -42,7 +42,7 @@ func splitPreApply(
//
// The exception to that is if the DisableEagerReplicaRemoval testing flag is
// enabled.
_, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
_, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID())
if !hasRightDesc || !hasLeftDesc {
log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v",
@@ -100,9 +100,18 @@
log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err)
}
if rightRepl != nil {
// Cleared the HardState and RaftReplicaID, so rewrite them to the
// current values.
// TODO(sumeer): we know HardState.Commit cannot advance since the RHS
// cannot apply a snapshot yet. But what prevents a concurrent change to
// HardState.{Term,Vote} that we would accidentally undo here?
if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil {
log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err)
}
if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID(
ctx, readWriter, rightRepl.ReplicaID()); err != nil {
log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err)
}
}
return
}
@@ -114,7 +123,10 @@
if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil {
log.Fatalf(ctx, "%v", err)
}

// Write the RaftReplicaID for the RHS.
if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil {
log.Fatalf(ctx, "%v", err)
}
// Persist the closed timestamp.
//
// In order to tolerate a nil initClosedTS input, let's forward to
@@ -161,6 +173,9 @@ func splitPostApply(
log.Fatalf(ctx, "%s: found replica which is RHS of a split "+
"without a valid tenant ID", rightReplOrNil)
}
rightReplOrNil.mu.Lock()
rightReplOrNil.mu.wroteReplicaID = true
rightReplOrNil.mu.Unlock()
}

now := r.store.Clock().NowAsClockTimestamp()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
@@ -2647,7 +2647,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {

uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID}
if err := stateloader.WriteInitialRangeState(
ctx, s.Engine(), uninitDesc, roachpb.Version{},
ctx, s.Engine(), uninitDesc, 2, roachpb.Version{},
); err != nil {
t.Fatal(err)
}
7 changes: 7 additions & 0 deletions pkg/roachpb/internal_raft.proto
@@ -49,3 +49,10 @@ message RaftSnapshotData {
repeated bytes log_entries = 3;
reserved 1;
}

message RaftReplicaID {
option (gogoproto.equal) = true;
// ReplicaID is the ID of the replica with the corresponding HardState.
optional int32 replica_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"];
}
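With the gogoproto options above (non-nullable, customname, casttype), the
generated Go type is expected to look roughly like this (struct tags elided):

// Approximate shape of the generated roachpb.RaftReplicaID struct.
type RaftReplicaID struct {
	// ReplicaID of the replica whose HardState is stored under the adjacent
	// RaftHardStateKey.
	ReplicaID ReplicaID
}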
