kvstorage: complete RaftReplicaID migration
As of v22.1[^1], we always write the RaftReplicaID when creating a
Replica or updating a snapshot. However, since this persisted state
could have originated in older versions and may not have been updated
yet, we could not yet rely on a persisted ReplicaID being present.

This commit adds code to the `(*Store).Start` boot sequence that:

- persists a RaftReplicaID for all initialized replicas (using the
  ReplicaID from the descriptor)
- deletes all uninitialized replicas (since we don't know their
  ReplicaID at this point).

The second item in theory violates Raft invariants, as uninitialized
Replicas are allowed to vote (though they cannot accept log entries).
So, in theory:

- an uninitialized replica casts a decisive vote for a leader
- it restarts
- code in this commit removes the uninitialized replica (and its vote)
- a delayed MsgVote from another candidate arrives
- the replica casts another vote for the same term for a dueling leader
- now there are two leaders in the same term.

In addition, the above presupposes that the two leaders cannot
communicate with each other. Even if that is the case, since neither
leader can append to the uninitialized replica (it doesn't accept
entries), additional voters would also have to return at exactly the
right time.

Since an uninitialized replica without a persisted RaftReplicaID is
necessarily at least one release old, this is exceedingly unlikely and
we will live with this theoretical risk.
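
For illustration, here is a minimal sketch of the per-replica decision
the new `LoadAndReconcileReplicas` pass makes at startup. The types and
the `reconcile` helper below are simplified stand-ins, not the real
`kvstorage`/`storage` APIs:

```go
package main

import "fmt"

// replica is a simplified stand-in for a loaded replica; the real code
// works with storage.FullReplicaID, RangeDescriptors, and a storage.Engine.
type replica struct {
	rangeID       int64
	replicaID     int64 // persisted RaftReplicaID, 0 if absent
	descReplicaID int64 // ReplicaID from the descriptor, 0 if uninitialized
}

// reconcile mirrors the migration loop: verify already-migrated replicas,
// backfill the RaftReplicaID for initialized ones, and drop uninitialized
// replicas that never had a persisted ReplicaID.
func reconcile(replicas map[int64]replica) (map[int64]replica, error) {
	kept := map[int64]replica{}
	for rangeID, r := range replicas {
		switch {
		case r.replicaID != 0 && r.descReplicaID != 0 && r.replicaID != r.descReplicaID:
			return nil, fmt.Errorf("conflicting RaftReplicaID %d for r%d", r.replicaID, rangeID)
		case r.replicaID != 0: // already migrated
			kept[rangeID] = r
		case r.descReplicaID != 0: // initialized: backfill from the descriptor
			r.replicaID = r.descReplicaID
			kept[rangeID] = r
		default:
			// Uninitialized with no persisted ReplicaID: drop it.
		}
	}
	return kept, nil
}

func main() {
	kept, err := reconcile(map[int64]replica{
		1: {rangeID: 1, replicaID: 3, descReplicaID: 3}, // already migrated
		2: {rangeID: 2, descReplicaID: 7},               // gets backfilled
		3: {rangeID: 3},                                  // legacy uninitialized, dropped
	})
	fmt.Println(kept, err)
}
```

In the actual commit, dropping a legacy uninitialized replica also
clears its raft HardState from the engine.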

This commit also introduces a few assertions that make sure we don't
have overlapping initialized replicas (which would otherwise be
detected at Store.Start time while inserting into the btree, but it's
nice to catch this earlier) or duplicate RangeIDs.
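
As a rough sketch of those checks (again with simplified stand-in types
rather than the real descriptors; the assertions added in this commit
live in the new `ReplicaMap` and reconcile code):

```go
package main

import (
	"fmt"
	"sort"
)

// desc is a simplified stand-in for an initialized replica's RangeDescriptor.
type desc struct {
	rangeID  int64
	startKey string
	endKey   string // exclusive
}

// checkReplicas asserts that no RangeID appears twice and that no two
// initialized replicas cover overlapping keyspace.
func checkReplicas(descs []desc) error {
	sort.Slice(descs, func(i, j int) bool { return descs[i].startKey < descs[j].startKey })
	seen := map[int64]bool{}
	for i, d := range descs {
		if seen[d.rangeID] {
			return fmt.Errorf("duplicate RangeID r%d", d.rangeID)
		}
		seen[d.rangeID] = true
		if i > 0 && descs[i-1].endKey > d.startKey {
			return fmt.Errorf("overlapping descriptors r%d and r%d",
				descs[i-1].rangeID, d.rangeID)
		}
	}
	return nil
}

func main() {
	// {a-c} and {c-d} are adjacent, not overlapping, so this passes.
	fmt.Println(checkReplicas([]desc{
		{rangeID: 1, startKey: "a", endKey: "c"},
		{rangeID: 2, startKey: "c", endKey: "d"},
	}))
}
```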

[^1]: cockroachdb#75761

Epic: CRDB-220
Release note: None
tbg committed Feb 2, 2023
1 parent b63fb6d commit 365f853
Showing 9 changed files with 198 additions and 124 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvstorage/BUILD.bazel
@@ -16,12 +16,13 @@ go_library(
"//pkg/kv/kvserver/logstore",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//raftpb",
],
)

23 changes: 5 additions & 18 deletions pkg/kv/kvserver/kvstorage/datadriven_test.go
@@ -160,38 +160,25 @@ func TestDataDriven(t *testing.T) {
); desc != nil {
fmt.Fprintln(&buf, desc)
}
case "list-range-ids":
m, err := loadFullReplicaIDsFromDisk(ctx, e.eng)
require.NoError(t, err)
var sl []storage.FullReplicaID
for id := range m {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool {
return sl[i].RangeID < sl[j].RangeID
})
for _, id := range sl {
fmt.Fprintf(&buf, "r%d/%d\n", id.RangeID, id.ReplicaID)
}
case "load-and-reconcile":
rs, err := LoadAndReconcileReplicas(ctx, e.eng)
if err != nil {
fmt.Fprintln(&buf, err)
break
}
var merged []storage.FullReplicaID
for id := range rs.Initialized {
merged = append(merged, id)
for _, repl := range rs.Initialized {
merged = append(merged, repl.ID())
}
for id := range rs.Uninitialized {
merged = append(merged, id)
for _, repl := range rs.Uninitialized {
merged = append(merged, repl.ID())
}
sort.Slice(merged, func(i, j int) bool {
return merged[i].RangeID < merged[j].RangeID
})
for _, id := range merged {
fmt.Fprintf(&buf, "r%d/%d: ", id.RangeID, id.ReplicaID)
if desc := rs.Initialized[id]; desc != nil {
if desc := rs.Initialized[id.RangeID].Desc; desc != nil {
fmt.Fprint(&buf, desc)
} else {
fmt.Fprintf(&buf, "uninitialized")
225 changes: 165 additions & 60 deletions pkg/kv/kvserver/kvstorage/init.go
@@ -13,17 +13,19 @@ package kvstorage
import (
"bytes"
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
@@ -249,15 +251,15 @@ func IterateRangeDescriptorsFromDisk(

allCount := 0
matchCount := 0
bySuffix := make(map[string]int)
bySuffix := make(map[redact.RedactableString]int)
kvToDesc := func(kv roachpb.KeyValue) error {
allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return err
}
bySuffix[string(suffix)]++
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil
}
@@ -291,46 +293,93 @@ func IterateRangeDescriptorsFromDisk(
return err
}

type EngineReplicas struct {
Uninitialized map[storage.FullReplicaID]struct{}
Initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor
// LoadedReplicas represents the Replicas present on a storage engine.
type LoadedReplicas struct {
Uninitialized ReplicaMap
Initialized ReplicaMap
}

// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
// There will only be one entry in the map for a given RangeID.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
m := map[storage.FullReplicaID]struct{}{}
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{}
return nil
}); err != nil {
return nil, err
// A Replica references a CockroachDB Replica. The data in this struct does not
// represent the data of the Replica but is sufficient to access all of its
// contents via additional calls to the storage engine.
type Replica struct {
RangeID roachpb.RangeID
ReplicaID roachpb.ReplicaID
Desc *roachpb.RangeDescriptor // nil for uninitialized Replica

descReplicaID roachpb.ReplicaID // internal to kvstorage
hardState raftpb.HardState // internal to kvstorage
}

// ID returns the FullReplicaID.
func (r Replica) ID() storage.FullReplicaID {
return storage.FullReplicaID{
RangeID: r.RangeID,
ReplicaID: r.ReplicaID,
}
}

// A ReplicaMap organizes a set of Replicas with unique RangeIDs.
type ReplicaMap map[roachpb.RangeID]Replica

// TODO(sep-raft-log): if there is any other data that we mandate is present here
// (like a HardState), validate that here.
// SortedSlice returns the contents of the map as a RangeID-sorted slice.
func (m ReplicaMap) SortedSlice() []Replica {
var sl []Replica
for _, repl := range m {
sl = append(sl, repl)
}
sort.Slice(sl, func(i, j int) bool {
return sl[i].RangeID < sl[j].RangeID
})
return sl
}

func (m ReplicaMap) getOrMake(rangeID roachpb.RangeID) Replica {
ent := m[rangeID]
ent.RangeID = rangeID
return ent
}

return m, nil
func (m ReplicaMap) setReplicaID(rangeID roachpb.RangeID, replicaID roachpb.ReplicaID) {
ent := m.getOrMake(rangeID)
ent.ReplicaID = replicaID
m[rangeID] = ent
}

// LoadAndReconcileReplicas loads the Replicas present on this
func (m ReplicaMap) setHardState(rangeID roachpb.RangeID, hs raftpb.HardState) {
ent := m.getOrMake(rangeID)
ent.hardState = hs
m[rangeID] = ent
}

func (m ReplicaMap) setDesc(rangeID roachpb.RangeID, desc roachpb.RangeDescriptor) error {
ent := m.getOrMake(rangeID)
if ent.Desc != nil {
return errors.AssertionFailedf("overlapping descriptors %v and %v", ent.Desc, &desc)
}
ent.Desc = &desc
m[rangeID] = ent
return nil
}

func (m ReplicaMap) setDescReplicaID(rangeID roachpb.RangeID, descReplicaID roachpb.ReplicaID) {
ent := m.getOrMake(rangeID)
ent.descReplicaID = descReplicaID
m[rangeID] = ent
}

// LoadAndReconcileReplicas loads the LoadedReplicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TOOD(sep-raft-log): consider a callback-visitor pattern here.
func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineReplicas, error) {
// TODO(sep-raft-log): consider a callback-visitor pattern here.
func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (LoadedReplicas, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
return LoadedReplicas{}, err
}

initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
s := ReplicaMap{}

// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
@@ -347,47 +396,103 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineReplicas, error) {
ident.StoreID, desc)
}

initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
if err := s.setDesc(desc.RangeID, desc); err != nil {
return err
}
s.setDescReplicaID(desc.RangeID, repDesc.ReplicaID)
return nil
}); err != nil {
return nil, err
return LoadedReplicas{}, err
}

allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
if err != nil {
return nil, err
// INVARIANT: all replicas have a persisted full ReplicaID (i.e. a "ReplicaID from disk").
//
// This invariant is true for replicas created in 22.1. Without further action, it
// would be violated for clusters that originated before 22.1. In this method, we
// backfill the ReplicaID (for initialized replicas) and we remove uninitialized
// replicas (see below for rationale).
//
// The migration can be removed when the KV host cluster MinSupportedVersion
// matches or exceeds 23.1 (i.e. once we know that a store has definitely
// started up on >=23.1 at least once).

// Collect all the RangeIDs that either have a RaftReplicaID or HardState. For
// unmigrated replicas we see only the HardState - that is how we detect
// replicas that still need to be migrated.
//
// TODO(tbg): tighten up the case where we see a RaftReplicaID but no HardState.
// This leads to the general desire to validate the internal consistency of the
// entire raft state (i.e. HardState, TruncatedState, Log).
{
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, eng, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
s.setReplicaID(rangeID, msg.ReplicaID)
return nil
}); err != nil {
return LoadedReplicas{}, err
}

var hs raftpb.HardState
if err := IterateIDPrefixKeys(ctx, eng, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftHardStateKey(rangeID)
}, &hs, func(rangeID roachpb.RangeID) error {
s.setHardState(rangeID, hs)
return nil
}); err != nil {
return LoadedReplicas{}, err
}
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
// Migrate for all replicas that need it. Sorted order for deterministic unit
// tests.
for _, repl := range s.SortedSlice() {
if repl.ReplicaID != 0 {
// If we have both a RaftReplicaID and a descriptor, the ReplicaIDs
// need to match.
if repl.descReplicaID != 0 && repl.descReplicaID != repl.ReplicaID {
return LoadedReplicas{}, errors.AssertionFailedf("conflicting RaftReplicaID %d for %s", repl.ReplicaID, repl.Desc)
}
// We have a RaftReplicaID, no need to migrate.
continue
}
if repl.descReplicaID != 0 {
// Backfill RaftReplicaID for an initialized Replica.
if err := logstore.NewStateLoader(repl.RangeID).SetRaftReplicaID(ctx, eng, repl.descReplicaID); err != nil {
return LoadedReplicas{}, errors.Wrapf(err, "backfilling ReplicaID for r%d", repl.RangeID)
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
s.setReplicaID(repl.RangeID, repl.descReplicaID)
log.Eventf(ctx, "backfilled replicaID for initialized replica %s", s.getOrMake(repl.RangeID).ID())
} else {
// We found an uninitialized replica that did not have a persisted
// ReplicaID. We can't determine the ReplicaID now, so we migrate by
// removing this uninitialized replica. This technically violates raft
// invariants if this replica has cast a vote, but the conditions under
// which this matters are extremely unlikely.
//
// TODO(tbg): if clearRangeData were in this package we could destroy more
// effectively even if for some reason we had in the past written state
// other than the HardState here (not supposed to happen, but still).
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey()); err != nil {
return LoadedReplicas{}, errors.Wrapf(err, "removing HardState for r%d", repl.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
delete(s, repl.RangeID)
log.Eventf(ctx, "removed legacy uninitialized replica for r%s", repl.RangeID)
}
}

init := ReplicaMap{}
uninit := ReplicaMap{}
for rangeID, state := range s {
if state.Desc != nil {
init[rangeID] = state
} else {
uninit[rangeID] = state
}
// `allM` will be our map of uninitialized replicas.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).
delete(allM, id)
}

return &EngineReplicas{
Initialized: initM,
Uninitialized: allM, // NB: init'ed ones were deleted earlier
}, nil
return LoadedReplicas{Initialized: init, Uninitialized: uninit}, nil
}

// A NotBootstrappedError indicates that an engine has not yet been
@@ -11,4 +11,4 @@ r1:{c-d} [(n1,s1):20, next=21, gen=0]
# raft state.
load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
overlapping descriptors r1:{a-c} [(n1,s1):10, next=11, gen=0] and r1:{c-d} [(n1,s1):20, next=21, gen=0]
@@ -11,4 +11,4 @@ ok

load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
conflicting RaftReplicaID 20 for r1:{a-c} [(n1,s1):10, next=11, gen=0]
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/kvstorage/testdata/assert_overlapping_replica
@@ -1,3 +1,5 @@
# TODO(tbg): actually assert against the overlap and make the output of this
# test a failed assertion.
new-replica range-id=1 replica-id=10 k=a ek=c
----
r1:{a-c} [(n1,s1):10, next=11, gen=0]