From 0d6e8b694ca1ff1fc196299f7ea72e75c54be662 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Mon, 31 Jan 2022 17:29:39 -0500 Subject: [PATCH] keys,kvserver: introduce RaftReplicaID The RaftReplicaIDKey is an unreplicated range-id local key that contains the ReplicaID of the replica whose HardState is represented in the RaftHardStateKey. These two keys are removed atomically when we clear the range-id local keys for a replica. See store_create_replica.go for a detailed comment on correctness and version compatibility. We currently do not utilize this information on node restart to figure out whether we should cleanup stale uninitialized replicas. Doing such cleanup can wait until we implement and start using ReplicasStorage. The change here is meant to set us up to rely on RaftReplicaID from the next release onwards. Informs #75740 Release note: None --- pkg/keys/constants.go | 14 ++++-- pkg/keys/doc.go | 1 + pkg/keys/keys.go | 10 ++++ pkg/kv/kvserver/below_raft_protos_test.go | 7 +++ pkg/kv/kvserver/client_store_test.go | 38 +++++++++++++++ pkg/kv/kvserver/replica.go | 8 +++- pkg/kv/kvserver/replica_raftstorage.go | 6 +++ pkg/kv/kvserver/stateloader/initial.go | 9 +++- pkg/kv/kvserver/stateloader/stateloader.go | 26 +++++++++++ pkg/kv/kvserver/store_create_replica.go | 54 ++++++++++++++++++++++ pkg/kv/kvserver/store_init.go | 6 ++- pkg/kv/kvserver/store_split.go | 22 ++++++++- pkg/kv/kvserver/store_test.go | 28 ++++++++++- pkg/roachpb/internal_raft.proto | 9 ++++ 14 files changed, 227 insertions(+), 11 deletions(-) diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 7d1adb9ec861..b95b513d50f0 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -87,10 +87,11 @@ var ( // LocalRangeAppliedStateSuffix is the suffix for the range applied state // key. LocalRangeAppliedStateSuffix = []byte("rask") - // LocalRaftTruncatedStateSuffix is the suffix for the + // This was previously used for the replicated RaftTruncatedState. It is no + // longer used and this key has been removed via a migration. See + // LocalRaftTruncatedStateSuffix for the corresponding unreplicated // RaftTruncatedState. - // Note: This suffix is also used for unreplicated Range-ID keys. - LocalRaftTruncatedStateSuffix = []byte("rftt") + _ = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") // LocalRangePriorReadSummarySuffix is the suffix for a range's prior read @@ -122,6 +123,13 @@ var ( localRaftLastIndexSuffix = []byte("rfti") // LocalRaftLogSuffix is the suffix for the raft log. LocalRaftLogSuffix = []byte("rftl") + // LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is + // written when a replica is created. + LocalRaftReplicaIDSuffix = []byte("rftr") + // LocalRaftTruncatedStateSuffix is the suffix for the unreplicated + // RaftTruncatedState. + LocalRaftTruncatedStateSuffix = []byte("rftt") + // LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last // replica GC timestamp (for GC of old replicas). LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt") diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 7a4248130ed7..47f5fc5dfc58 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -199,6 +199,7 @@ var _ = [...]interface{}{ RangeTombstoneKey, // "rftb" RaftHardStateKey, // "rfth" RaftLogKey, // "rftl" + RaftReplicaIDKey, // "rftr" RaftTruncatedStateKey, // "rftt" RangeLastReplicaGCTimestampKey, // "rlrt" diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index de6eae1dcbb3..8e768cc8eafa 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey() +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key { @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key { return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...) +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key { diff --git a/pkg/kv/kvserver/below_raft_protos_test.go b/pkg/kv/kvserver/below_raft_protos_test.go index ab2f072d9c26..f0c7884c8382 100644 --- a/pkg/kv/kvserver/below_raft_protos_test.go +++ b/pkg/kv/kvserver/below_raft_protos_test.go @@ -133,6 +133,13 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 14695981039346656037, populatedSum: 1187861800212570275, }, + reflect.TypeOf(&roachpb.RaftReplicaID{}): { + populatedConstructor: func(r *rand.Rand) protoutil.Message { + return roachpb.NewPopulatedRaftReplicaID(r, false) + }, + emptySum: 598336668751268149, + populatedSum: 9313101058286450988, + }, } func TestBelowRaftProtos(t *testing.T) { diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index f47006c66943..6e997152004d 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -15,6 +15,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -23,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) // TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then @@ -70,3 +73,38 @@ func TestStoreSetRangesMaxBytes(t *testing.T) { return nil }) } + +// TestStoreRaftReplicaID tests that initialized replicas have a +// RaftReplicaID. +func TestStoreRaftReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + srv := tc.Server(0) + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + + scratchKey := tc.ScratchRange(t) + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err) + repl, err := store.GetReplica(desc.RangeID) + require.NoError(t, err) + replicaID, found, err := stateloader.Make(desc.RangeID).LoadRaftReplicaID(ctx, store.Engine()) + require.True(t, found) + require.NoError(t, err) + require.Equal(t, repl.ReplicaID(), replicaID.ReplicaID) + + // RHS of a split also has ReplicaID. + splitKey := append(scratchKey, '0', '0') + _, rhsDesc, err := tc.SplitRange(splitKey) + require.NoError(t, err) + rhsRepl, err := store.GetReplica(rhsDesc.RangeID) + require.NoError(t, err) + rhsReplicaID, found, err := + stateloader.Make(rhsDesc.RangeID).LoadRaftReplicaID(ctx, store.Engine()) + require.True(t, found) + require.NoError(t, err) + require.Equal(t, rhsRepl.ReplicaID(), rhsReplicaID.ReplicaID) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e71f71ec83e1..4cade492df87 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -520,6 +520,8 @@ type Replica struct { // It will not change over the lifetime of this replica. If addressed under // a newer replicaID, the replica immediately replicaGCs itself to make // way for the newer incarnation. + // TODO(sumeer): since this is initialized in newUnloadedReplica and never + // changed, lift this out of the mu struct. replicaID roachpb.ReplicaID // The minimum allowed ID for this replica. Initialized from // RangeTombstone.NextReplicaID. @@ -710,9 +712,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) { r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get()) } -// ReplicaID returns the ID for the Replica. It may be zero if the replica does -// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +// ReplicaID returns the ID for the Replica. This value is fixed for the +// lifetime of the Replica. func (r *Replica) ReplicaID() roachpb.ReplicaID { + // The locking of mu is unnecessary. It will be removed when we lift + // replicaID out of the mu struct. r.mu.RLock() defer r.mu.RUnlock() return r.mu.replicaID diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..a864cb2f1d70 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -863,6 +863,12 @@ func (r *Replica) applySnapshot( if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") } + // We've cleared all the raft state above, so we are forced to write the + // RaftReplicaID again here. + if err := r.raftMu.stateLoader.SetRaftReplicaID( + ctx, &unreplicatedSST, r.mu.replicaID); err != nil { + return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } // Update Raft entries. r.store.raftEntryCache.Drop(r.RangeID) diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..ebf49c47f52b 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -96,6 +96,7 @@ func WriteInitialRangeState( ctx context.Context, readWriter storage.ReadWriter, desc roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, replicaVersion roachpb.Version, ) error { initialLease := roachpb.Lease{} @@ -108,7 +109,13 @@ func WriteInitialRangeState( ); err != nil { return err } - if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil { + sl := Make(desc.RangeID) + if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil { + return err + } + // Maintain the invariant that any replica (uninitialized or initialized), + // with persistent state, has a RaftReplicaID. + if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil { return err } return nil diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..d6bef28ca3eb 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -434,3 +434,29 @@ func (rsl StateLoader) SynthesizeHardState( err := rsl.SetHardState(ctx, readWriter, newHS) return errors.Wrapf(err, "writing HardState %+v", &newHS) } + +// SetRaftReplicaID overwrites the RaftReplicaID. +func (rsl StateLoader) SetRaftReplicaID( + ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID, +) error { + rid := roachpb.RaftReplicaID{ReplicaID: replicaID} + // "Blind" because ms == nil and timestamp.IsEmpty(). + return storage.MVCCBlindPutProto( + ctx, + writer, + nil, /* ms */ + rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, /* timestamp */ + &rid, + nil, /* txn */ + ) +} + +// LoadRaftReplicaID loads the RaftReplicaID. +func (rsl StateLoader) LoadRaftReplicaID( + ctx context.Context, reader storage.Reader, +) (replicaID roachpb.RaftReplicaID, found bool, err error) { + found, err = storage.MVCCGetProto(ctx, reader, rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, &replicaID, storage.MVCCGetOptions{}) + return +} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 81f05d939837..6418941e88cd 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -220,6 +220,60 @@ func (s *Store) tryGetOrCreateReplica( } else if hs.Commit != 0 { log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica %s. HS=%+v", repl, hs) } + + // Write the RaftReplicaID for this replica. This is the only place in the + // CockroachDB code that we are creating a new *uninitialized* replica. + // Note that it is possible that we have already created the HardState for + // an uninitialized replica, then crashed, and on recovery are receiving a + // raft message for the same or later replica. + // - Same replica: we are overwriting the RaftReplicaID with the same + // value, which is harmless. + // - Later replica: there may be an existing HardState for the older + // uninitialized replica with Commit=0 and non-zero Term and Vote. Using + // the Term and Vote values for that older replica in the context of + // this newer replica is harmless since it just limits the votes for + // this replica. + // + // + // Compatibility: + // - v21.2 and v22.1: v22.1 unilaterally introduces RaftReplicaID (an + // unreplicated range-id local key). If a v22.1 binary is rolled back at + // a node, the fact that RaftReplicaID was written is harmless to a + // v21.2 node since it does not read it. When a v21.2 drops an + // initialized range, the RaftReplicaID will also be deleted because the + // whole range-ID local key space is deleted. + // + // - v22.2: we will start relying on the presence of RaftReplicaID, and + // remove any unitialized replicas that have a HardState but no + // RaftReplicaID. This removal will happen in ReplicasStorage.Init and + // allow us to tighten invariants. Additionally, knowing the ReplicaID + // for an unitialized range could allow a node to somehow contact the + // raft group (say by broadcasting to all nodes in the cluster), and if + // the ReplicaID is stale, would allow the node to remove the HardState + // and RaftReplicaID. See + // https://github.com/cockroachdb/cockroach/issues/75740. + // + // There is a concern that there could be some replica that survived + // from v21.2 to v22.1 to v22.2 in unitialized state and will be + // incorrectly removed in ReplicasStorage.Init causing the loss of the + // HardState.{Term,Vote} and lead to a "split-brain" wrt leader + // election. + // + // Even though this seems theoretically possible, it is considered + // practically impossible, and not just because a replica's vote is + // unlikely to stay relevant across 2 upgrades. For one, we're always + // going through learners and don't promote until caught up, so + // uninitialized replicas generally never get to vote. Second, even if + // their vote somehow mattered (perhaps we sent a learner a snap which + // was not durably persisted - which we also know is impossible, but + // let's assume it - and then promoted the node and it immediately + // power-cycled, losing the snapshot) the fire-and-forget way in which + // raft votes are requested (in the same raft cycle) makes it extremely + // unlikely that the restarted node would then receive it. + if err := repl.mu.stateLoader.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil { + return err + } + return repl.loadRaftMuLockedReplicaMuLocked(uninitializedDesc) }(); err != nil { // Mark the replica as destroyed and remove it from the replicas maps to diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 33aac9a89072..321c8fc21ac1 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -173,11 +173,12 @@ func WriteInitialClusterData( EndKey: endKey, NextReplicaID: 2, } + const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { NodeID: FirstNodeID, StoreID: FirstStoreID, - ReplicaID: 1, + ReplicaID: firstReplicaID, }, } desc.SetReplicas(roachpb.MakeReplicaSet(replicas)) @@ -244,7 +245,8 @@ func WriteInitialClusterData( } } - if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil { + if err := stateloader.WriteInitialRangeState( + ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil { return err } computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime) diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 2294ca8ecd90..45327e102047 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -42,7 +42,7 @@ func splitPreApply( // // The exception to that is if the DisableEagerReplicaRemoval testing flag is // enabled. - _, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) + rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) _, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID()) if !hasRightDesc || !hasLeftDesc { log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v", @@ -100,9 +100,20 @@ func splitPreApply( log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) } if rightRepl != nil { + // Cleared the HardState and RaftReplicaID, so rewrite them to the + // current values. + // TODO(sumeer): we know HardState.Commit cannot advance since the RHS + // cannot apply a snapshot yet. But there could be a concurrent change + // to HardState.{Term,Vote} that we would accidentally undo here, + // because we are not actually holding the appropriate mutex. See + // https://github.com/cockroachdb/cockroach/issues/75918. if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil { log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err) } + if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID( + ctx, readWriter, rightRepl.ReplicaID()); err != nil { + log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err) + } } return } @@ -114,7 +125,14 @@ func splitPreApply( if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil { log.Fatalf(ctx, "%v", err) } - + // Write the RaftReplicaID for the RHS to maintain the invariant that any + // replica (uninitialized or initialized), with persistent state, has a + // RaftReplicaID. NB: this invariant will not be universally true until we + // introduce node startup code that will write this value for existing + // ranges. + if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil { + log.Fatalf(ctx, "%v", err) + } // Persist the closed timestamp. // // In order to tolerate a nil initClosedTS input, let's forward to diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9f7d033cff05..03a4b1721772 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID} if err := stateloader.WriteInitialRangeState( - ctx, s.Engine(), uninitDesc, roachpb.Version{}, + ctx, s.Engine(), uninitDesc, 2, roachpb.Version{}, ); err != nil { t.Fatal(err) } @@ -3046,6 +3046,32 @@ func TestManuallyEnqueueUninitializedReplica(t *testing.T) { require.Contains(t, err.Error(), "not enqueueing uninitialized replica") } +// TestStoreGetOrCreateReplicaWritesRaftReplicaID tests that an uninitialized +// replica has a RaftReplicaID. +func TestStoreGetOrCreateReplicaWritesRaftReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc := testContext{} + tc.Start(ctx, t, stopper) + + repl, created, err := tc.store.getOrCreateReplica( + ctx, 42, 7, &roachpb.ReplicaDescriptor{ + NodeID: tc.store.NodeID(), + StoreID: tc.store.StoreID(), + ReplicaID: 7, + }) + require.NoError(t, err) + require.True(t, created) + replicaID, found, err := repl.mu.stateLoader.LoadRaftReplicaID(ctx, tc.store.Engine()) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, roachpb.RaftReplicaID{ReplicaID: 7}, replicaID) +} + func BenchmarkStoreGetReplica(b *testing.B) { ctx := context.Background() stopper := stop.NewStopper() diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto index 08d7a2322efc..5837411d43ff 100644 --- a/pkg/roachpb/internal_raft.proto +++ b/pkg/roachpb/internal_raft.proto @@ -49,3 +49,12 @@ message RaftSnapshotData { repeated bytes log_entries = 3; reserved 1; } + +message RaftReplicaID { + option (gogoproto.equal) = true; + option (gogoproto.populate) = true; + + // ReplicaID is the ID of the replica with the corresponding HardState. + optional int32 replica_id = 1 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"]; +}