kvstorage: complete RaftReplicaID migration
As of v22.1[^1], we always write the RaftReplicaID when creating a
Replica or updating a snapshot. However, since this persisted state
could have originated in older versions and might not have been
updated yet, we could not yet rely on a persisted ReplicaID being
present.

This commit adds code to the `(*Store).Start` boot sequence that

- persists a RaftReplicaID for all initialized replicas (using the
  ReplicaID from the descriptor)
- deletes all uninitialized replicas that don't have a RaftReplicaID
  (since we don't know their ReplicaID at this point); a condensed
  sketch of this pass follows the list.
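
The sketch below condenses this pass. It is not a verbatim excerpt of
the diff: the function name `reconcileReplicaIDs` is illustrative,
`Replica` refers to the struct added in this commit, and the
conflicting-ID assertion and event logging from the actual
`LoadAndReconcileReplicas` (further down in this diff) are omitted.

```go
package kvstorage

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/errors"
)

// reconcileReplicaIDs backfills a persisted RaftReplicaID for
// initialized replicas and removes uninitialized replicas that lack
// one. `sl` is the slice of replicas loaded from disk; the surviving
// replicas are returned.
func reconcileReplicaIDs(
	ctx context.Context, eng storage.Engine, storeID roachpb.StoreID, sl []Replica,
) ([]Replica, error) {
	var newIdx int
	for _, repl := range sl {
		if repl.ReplicaID != 0 {
			// Already migrated: a RaftReplicaID is persisted.
			sl[newIdx] = repl
			newIdx++
			continue
		}
		if repl.Desc != nil {
			// Initialized but unmigrated: take the ReplicaID from the
			// RangeDescriptor and persist it.
			replDesc, found := repl.Desc.GetReplicaDescriptor(storeID)
			if !found {
				return nil, errors.AssertionFailedf("s%d not found in %s", storeID, repl.Desc)
			}
			if err := logstore.NewStateLoader(repl.RangeID).SetRaftReplicaID(
				ctx, eng, replDesc.ReplicaID,
			); err != nil {
				return nil, err
			}
			repl.ReplicaID = replDesc.ReplicaID
			sl[newIdx] = repl
			newIdx++
			continue
		}
		// Uninitialized and unmigrated: the ReplicaID is unknown, so
		// remove the replica by deleting its HardState, the only state
		// such a replica is supposed to have persisted.
		if err := eng.ClearUnversioned(
			logstore.NewStateLoader(repl.RangeID).RaftHardStateKey(),
		); err != nil {
			return nil, err
		}
	}
	return sl[:newIdx], nil
}
```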

Deleting these replicas (the second item above) in theory violates
Raft invariants, since uninitialized Replicas are allowed to cast
votes (even though they cannot accept log entries). So in theory:

- an uninitialized replica casts a decisive vote for a leader
- it restarts
- code in this commit removes the uninitialized replica (and its vote)
- delayed MsgVote from another leader arrives
- it casts another vote for the same term for a dueling leader
- now there are two leaders in the same term.

For this to matter, the two leaders must additionally be unable to
communicate with each other. And even then, since neither leader can
append to the uninitialized replica (it doesn't accept entries),
additional voters would have to come back online at exactly the right
time.

Since an uninitialized replica without a RaftReplicaID is necessarily
at least one release old, this scenario is exceedingly unlikely, and
we will live with the theoretical risk.

This commit also introduces a few assertions (sketched below) that
make sure we don't have overlapping initialized replicas (which would
otherwise only be detected at Store.Start time, when inserting into
the btree; it's nice to catch this earlier) or duplicate RangeIDs.
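
A minimal sketch of these checks, distilled from the `loadReplicas`
change below: the helper name `checkDescriptorInvariants` is
illustrative, and it assumes descriptors are visited in start-key
order, which is the order `IterateRangeDescriptorsFromDisk` yields
them in.

```go
package kvstorage

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// checkDescriptorInvariants asserts that the descriptors, which must
// be sorted by start key, neither overlap nor share a RangeID.
func checkDescriptorInvariants(descs []roachpb.RangeDescriptor) error {
	var lastDesc roachpb.RangeDescriptor
	seen := map[roachpb.RangeID]struct{}{}
	for _, desc := range descs {
		// With sorted input, any overlap involves the immediate
		// predecessor: the current start key would precede its end key.
		if lastDesc.RangeID != 0 && desc.StartKey.Less(lastDesc.EndKey) {
			return errors.AssertionFailedf("overlapping descriptors %s and %s", lastDesc, desc)
		}
		lastDesc = desc
		// Each RangeID may contribute at most one descriptor.
		if _, ok := seen[desc.RangeID]; ok {
			return errors.AssertionFailedf("duplicate RangeID r%d", desc.RangeID)
		}
		seen[desc.RangeID] = struct{}{}
	}
	return nil
}
```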

[^1]: cockroachdb#75761

Epic: CRDB-220
Release note: None
tbg committed Feb 6, 2023
1 parent 4f619e4 commit 672e8b1
Showing 9 changed files with 210 additions and 148 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvstorage/BUILD.bazel
@@ -16,12 +16,13 @@ go_library(
"//pkg/kv/kvserver/logstore",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//raftpb",
],
)

32 changes: 4 additions & 28 deletions pkg/kv/kvserver/kvstorage/datadriven_test.go
@@ -14,7 +14,6 @@ import (
"context"
"fmt"
"regexp"
"sort"
"strings"
"testing"

@@ -160,38 +159,15 @@ func TestDataDriven(t *testing.T) {
); desc != nil {
fmt.Fprintln(&buf, desc)
}
case "list-range-ids":
m, err := loadFullReplicaIDsFromDisk(ctx, e.eng)
require.NoError(t, err)
var sl []storage.FullReplicaID
for id := range m {
sl = append(sl, id)
}
sort.Slice(sl, func(i, j int) bool {
return sl[i].RangeID < sl[j].RangeID
})
for _, id := range sl {
fmt.Fprintf(&buf, "r%d/%d\n", id.RangeID, id.ReplicaID)
}
case "load-and-reconcile":
rs, err := LoadAndReconcileReplicas(ctx, e.eng)
replicas, err := LoadAndReconcileReplicas(ctx, e.eng)
if err != nil {
fmt.Fprintln(&buf, err)
break
}
var merged []storage.FullReplicaID
for id := range rs.Initialized {
merged = append(merged, id)
}
for id := range rs.Uninitialized {
merged = append(merged, id)
}
sort.Slice(merged, func(i, j int) bool {
return merged[i].RangeID < merged[j].RangeID
})
for _, id := range merged {
fmt.Fprintf(&buf, "r%d/%d: ", id.RangeID, id.ReplicaID)
if desc := rs.Initialized[id]; desc != nil {
for _, repl := range replicas {
fmt.Fprintf(&buf, "%s: ", repl.ID())
if desc := repl.Desc; desc != nil {
fmt.Fprint(&buf, desc)
} else {
fmt.Fprintf(&buf, "uninitialized")
243 changes: 172 additions & 71 deletions pkg/kv/kvserver/kvstorage/init.go
@@ -13,17 +13,19 @@ package kvstorage
import (
"bytes"
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
@@ -249,15 +251,15 @@ func IterateRangeDescriptorsFromDisk(

allCount := 0
matchCount := 0
bySuffix := make(map[string]int)
bySuffix := make(map[redact.RedactableString]int)
kvToDesc := func(kv roachpb.KeyValue) error {
allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return err
}
bySuffix[string(suffix)]++
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil
}
@@ -291,103 +293,202 @@
return err
}

type EngineReplicas struct {
Uninitialized map[storage.FullReplicaID]struct{}
Initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor
// A Replica references a CockroachDB Replica. The data in this struct does not
// represent the data of the Replica but is sufficient to access all of its
// contents via additional calls to the storage engine.
type Replica struct {
RangeID roachpb.RangeID
ReplicaID roachpb.ReplicaID
Desc *roachpb.RangeDescriptor // nil for uninitialized Replica

hardState raftpb.HardState // internal to kvstorage, see migration in LoadAndReconcileReplicas
}

// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
// There will only be one entry in the map for a given RangeID.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
m := map[storage.FullReplicaID]struct{}{}
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{}
return nil
}); err != nil {
return nil, err
// ID returns the FullReplicaID.
func (r Replica) ID() storage.FullReplicaID {
return storage.FullReplicaID{
RangeID: r.RangeID,
ReplicaID: r.ReplicaID,
}
}

// TODO(sep-raft-log): if there is any other data that we mandate is present here
// (like a HardState), validate that here.
// A replicaMap organizes a set of Replicas with unique RangeIDs.
type replicaMap map[roachpb.RangeID]Replica

return m, nil
func (m replicaMap) getOrMake(rangeID roachpb.RangeID) Replica {
ent := m[rangeID]
ent.RangeID = rangeID
return ent
}

// LoadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TODO(sep-raft-log): consider a callback-visitor pattern here.
func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineReplicas, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
func (m replicaMap) setReplicaID(rangeID roachpb.RangeID, replicaID roachpb.ReplicaID) {
ent := m.getOrMake(rangeID)
ent.ReplicaID = replicaID
m[rangeID] = ent
}

func (m replicaMap) setHardState(rangeID roachpb.RangeID, hs raftpb.HardState) {
ent := m.getOrMake(rangeID)
ent.hardState = hs
m[rangeID] = ent
}

func (m replicaMap) setDesc(rangeID roachpb.RangeID, desc roachpb.RangeDescriptor) error {
ent := m.getOrMake(rangeID)
if ent.Desc != nil {
return errors.AssertionFailedf("overlapping descriptors %v and %v", ent.Desc, &desc)
}
ent.Desc = &desc
m[rangeID] = ent
return nil
}

func loadReplicas(ctx context.Context, eng storage.Engine) ([]Replica, error) {
s := replicaMap{}

initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
// INVARIANT: the descriptor for range [a,z) is located at RangeDescriptorKey(a).
// This is checked in IterateRangeDescriptorsFromDisk.
if err := IterateRangeDescriptorsFromDisk(
ctx, eng, func(desc roachpb.RangeDescriptor) error {
// INVARIANT: a Replica's RangeDescriptor always contains the local Store,
// i.e. a Store is a member of all of its local Replicas.
repDesc, found := desc.GetReplicaDescriptor(ident.StoreID)
if !found {
return errors.AssertionFailedf(
"RangeDescriptor does not contain local s%d: %s",
ident.StoreID, desc)
}
{
var lastDesc roachpb.RangeDescriptor
if err := IterateRangeDescriptorsFromDisk(
ctx, eng, func(desc roachpb.RangeDescriptor) error {
if lastDesc.RangeID != 0 && desc.StartKey.Less(lastDesc.EndKey) {
return errors.AssertionFailedf("overlapping descriptors %s and %s", lastDesc, desc)
}
lastDesc = desc
return s.setDesc(desc.RangeID, desc)
},
); err != nil {
return nil, err
}
}

// INVARIANT: all replicas have a persisted full ReplicaID (i.e. a "ReplicaID from disk").
//
// This invariant is true for replicas created in 22.1. Without further action, it
// would be violated for clusters that originated before 22.1. In this method, we
// backfill the ReplicaID (for initialized replicas) and we remove uninitialized
// replicas lacking a ReplicaID (see below for rationale).
//
// The migration can be removed when the KV host cluster MinSupportedVersion
// matches or exceeds 23.1 (i.e. once we know that a store has definitely
// started up on >=23.1 at least once).

// Collect all the RangeIDs that either have a RaftReplicaID or HardState. For
// unmigrated replicas we see only the HardState - that is how we detect
// replicas that still need to be migrated.
//
// TODO(tbg): tighten up the case where we see a RaftReplicaID but no HardState.
// This leads to the general desire to validate the internal consistency of the
// entire raft state (i.e. HardState, TruncatedState, Log).
{
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, eng, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
s.setReplicaID(rangeID, msg.ReplicaID)
return nil
}); err != nil {
return nil, err
}

initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
var hs raftpb.HardState
if err := IterateIDPrefixKeys(ctx, eng, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftHardStateKey(rangeID)
}, &hs, func(rangeID roachpb.RangeID) error {
s.setHardState(rangeID, hs)
return nil
}); err != nil {
return nil, err
}
}
sl := make([]Replica, 0, len(s))
for _, repl := range s {
sl = append(sl, repl)
}
sort.Slice(sl, func(i, j int) bool {
return sl[i].RangeID < sl[j].RangeID
})
return sl, nil
}

// LoadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
// The returned slice is sorted by RangeID.
//
// TODO(sep-raft-log): consider a callback-visitor pattern here.
func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) ([]Replica, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
}

allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
sl, err := loadReplicas(ctx, eng)
if err != nil {
return nil, err
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
// Check invariants.
//
// Migrate into RaftReplicaID for all replicas that need it.
var newIdx int
for _, repl := range sl {
var descReplicaID roachpb.ReplicaID
if repl.Desc != nil {
// INVARIANT: a Replica's RangeDescriptor always contains the local Store,
// i.e. a Store is a member of all of its local initialized Replicas.
replDesc, found := repl.Desc.GetReplicaDescriptor(ident.StoreID)
if !found {
return nil, errors.AssertionFailedf("s%d not found in %s", ident.StoreID, repl.Desc)
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
if repl.ReplicaID != 0 && replDesc.ReplicaID != repl.ReplicaID {
return nil, errors.AssertionFailedf("conflicting RaftReplicaID %d for %s", repl.ReplicaID, repl.Desc)
}
descReplicaID = replDesc.ReplicaID
}

if repl.ReplicaID != 0 {
sl[newIdx] = repl
newIdx++
// RaftReplicaID present, no need to migrate.
continue
}

// Migrate into RaftReplicaID. This migration can be removed once the
// BinaryMinSupportedVersion is >= 23.1, and we can assert that
// repl.ReplicaID != 0 always holds.

if descReplicaID != 0 {
// Backfill RaftReplicaID for an initialized Replica.
if err := logstore.NewStateLoader(repl.RangeID).SetRaftReplicaID(ctx, eng, descReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling ReplicaID for r%d", repl.RangeID)
}
repl.ReplicaID = descReplicaID
sl[newIdx] = repl
newIdx++
log.Eventf(ctx, "backfilled replicaID for initialized replica %s", repl.ID())
} else {
// We found an uninitialized replica that did not have a persisted
// ReplicaID. We can't determine the ReplicaID now, so we migrate by
// removing this uninitialized replica. This technically violates raft
// invariants if this replica has cast a vote, but the conditions under
// which this matters are extremely unlikely.
//
// TODO(tbg): if clearRangeData were in this package we could destroy more
// effectively even if for some reason we had in the past written state
// other than the HardState here (not supposed to happen, but still).
if err := eng.ClearUnversioned(logstore.NewStateLoader(repl.RangeID).RaftHardStateKey()); err != nil {
return nil, errors.Wrapf(err, "removing HardState for r%d", repl.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
log.Eventf(ctx, "removed legacy uninitialized replica for r%s", repl.RangeID)
// NB: removed from `sl` since we're not incrementing `newIdx`.
}
// `allM` will be our map of uninitialized replicas.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).
delete(allM, id)
}

return &EngineReplicas{
Initialized: initM,
Uninitialized: allM, // NB: init'ed ones were deleted earlier
}, nil
return sl[:newIdx], nil
}

// A NotBootstrappedError indicates that an engine has not yet been
@@ -11,4 +11,4 @@ r1:{c-d} [(n1,s1):20, next=21, gen=0]
# raft state.
load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
overlapping descriptors r1:{a-c} [(n1,s1):10, next=11, gen=0] and r1:{c-d} [(n1,s1):20, next=21, gen=0]
@@ -11,4 +11,4 @@ ok

load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
conflicting RaftReplicaID 20 for r1:{a-c} [(n1,s1):10, next=11, gen=0]