Skip to content

Commit

Permalink
kvstorage: complete RaftReplicaID migration
Browse files Browse the repository at this point in the history
As of v22.2[^1], we always write the RaftReplicaID when creating a
Replica or updating a snapshot. However, since this is persisted
state that could have originated in older versions and not been
updated since, we could not yet rely on a persisted ReplicaID.

This commit adds code to the `(*Store).Start` boot sequence that

- persists a RaftReplicaID for all initialized replicas (using the
  ReplicaID from the descriptor)
- deletes all uninitialized replicas (since we don't know their
  ReplicaID at this point).

The second item in theory violates Raft invariants, as uninitialized
Replicas are allowed to vote (though they then cannot accept log
entries). So in theory:

- an uninitialized replica casts a decisive vote for a leader
- it restarts
- code in this commit removes the uninited replica (and its vote)
- delayed MsgVote from another leader arrives
- it casts another vote for the same term for a dueling leader
- now there are two leaders in the same term.

The above in addition presupposes that the two leaders cannot
communicate with each other. Also, even if that is the case, since the
two leaders cannot append to the uninitialized replica (it doesn't
accept entries), we also need additional voters to return at the exact
right time.

Since an uninitialized replica without a RaftReplicaID is necessarily
at least one release old, this is exceedingly unlikely and we will
live with this theoretical risk.

This commit also introduces a few assertions that make sure that
we don't have overlapping initialized replicas (which would be
detected at Store.Start time otherwise while inserting in the
btree, but it's nice to catch this earlier) or duplicate
RangeIDs.

[^1]: cockroachdb#75761

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Jan 23, 2023
1 parent 88b6a9c commit 18a7ff7
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 27 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ go_library(
"//pkg/kv/kvserver/logstore",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/util/buildutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@io_etcd_go_raft_v3//raftpb",
],
)

Expand Down
148 changes: 128 additions & 20 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@ package kvstorage
import (
"bytes"
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"go.etcd.io/raft/v3/raftpb"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
Expand Down Expand Up @@ -249,15 +251,15 @@ func IterateRangeDescriptorsFromDisk(

allCount := 0
matchCount := 0
bySuffix := make(map[string]int)
bySuffix := make(map[redact.RedactableString]int)
kvToDesc := func(kv roachpb.KeyValue) error {
allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return err
}
bySuffix[string(suffix)]++
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil
}
Expand Down Expand Up @@ -299,11 +301,15 @@ type EngineReplicas struct {
// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
// There will only be one entry in the map for a given RangeID.
//
// Replicas which were created before the RaftReplicaID was
// introduced will be returned with a zero ReplicaID.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
m := map[storage.FullReplicaID]struct{}{}
mBackfill := map[roachpb.RangeID]raftpb.HardState{}
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
Expand All @@ -314,6 +320,27 @@ func loadFullReplicaIDsFromDisk(
return nil, err
}

var hs raftpb.HardState
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftHardStateKey(rangeID)
}, &hs, func(rangeID roachpb.RangeID) error {
mBackfill[rangeID] = hs
return nil
}); err != nil {
return nil, err
}

// mBackfill := mBackfill - m, i.e. keep only those entries in mBackfill for which
// we didn't have a persisted RaftReplicaID.
for id := range m {
delete(mBackfill, id.RangeID)
}

// Patch the resulting mBackfill back into m, with a zero ReplicaID.
for rangeID := range mBackfill {
m[storage.FullReplicaID{RangeID: rangeID}] = struct{}{}
}

// TODO(sep-raft-log): if there is any other data that we mandate is present here
// (like a HardState), validate that here.

Expand All @@ -336,6 +363,7 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineR
// the state of the Replica.
// INVARIANT: the descriptor for range [a,z) is located at RangeDescriptorKey(a).
// This is checked in IterateRangeDescriptorsFromDisk.
var sg roachpb.SpanGroup
if err := IterateRangeDescriptorsFromDisk(
ctx, eng, func(desc roachpb.RangeDescriptor) error {
// INVARIANT: a Replica's RangeDescriptor always contains the local Store,
Expand All @@ -347,6 +375,20 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineR
ident.StoreID, desc)
}

sp := desc.RSpan().AsRawSpanWithNoLocals()
if sg.Sub(sp) {
// `sp` overlapped `sg`.
for _, other := range initM {
if sp.Overlaps(other.RSpan().AsRawSpanWithNoLocals()) {
return errors.AssertionFailedf("%s overlaps %s", desc, other)
}
}
// If we hit this it's basically a bug in either SpanGroup or
// the above `for` loop.
return errors.AssertionFailedf("%s overlaps unknown other descriptor", desc)
}
sg.Add(sp)

initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
Expand All @@ -356,37 +398,103 @@ func LoadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*EngineR
return nil, err
}

// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2. Without further action, it
// may be violated for clusters that originated before 22.2.
//
// loadFullReplicaIDsFromDisk returns a zero ReplicaID when none is found.
//
// In this method, we migrate to always satisfy the invariant:
//
// 1. for initialized replicas, use ReplicaID from RangeDescriptor as RaftReplicaID
// 2. remove uninitialized replicas lacking a RaftReplicaID.
//
// The migration can be removed when the KV host cluster MinSupportedVersion
// matches or exceeds 23.1 (i.e. once we know that a store has definitely
// started up on 23.1 at least once).

allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
if err != nil {
return nil, err
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
for id := range allM {
if id.ReplicaID != 0 {
// RaftReplicaID was present, no need to backfill.
continue
}
// `allM` will be our map of uninitialized replicas.

}

for id := range initM {
// `allM` will be our map of uninitialized replicas when this loop is done.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).

// If a RaftReplicaID was not persisted yet, allM will have a zero ReplicaID
// in its key. Either way, we must find a match with either the actual
// ReplicaID or zero; otherwise something is awry.
idZero := storage.FullReplicaID{RangeID: id.RangeID, ReplicaID: 0}
_, foundWithID := allM[id]
_, foundWithZero := allM[idZero]
if !foundWithID && !foundWithZero {
return nil, errors.AssertionFailedf("initialized replica %s not present in allM", id)
}
if foundWithID && foundWithZero {
return nil, errors.AssertionFailedf("initialized replica %s duplicated in allM", allM)
}
// Now we know that foundWithID xor foundWithZero holds.

// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
if foundWithZero {
// Backfill RaftReplicaID.
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for initialized replica %s", id)
// `allM` contains `idZero` in this case, and we want to make sure this range
// isn't accidentally considered an uninitialized replica that needs to be
// removed. `initM` was already keyed with `id`, so nothing to do there.
delete(allM, idZero)
} // else foundWithID==true, so nothing to backfill

// allM will be our uninitM, so delete all initialized replicas from it.
delete(allM, id)
}

uninitM := allM

var removedLegacy []roachpb.RangeID // for deterministic logging
for id := range uninitM {
// We've backfilled RaftReplicaID for initialized replicas above, but there
// can be uninitialized ones, too. We don't have a RaftReplicaID for them so
// instead of making one up we remove this replica. This is theoretically
// unsound - after all, it could have voted - but in practice this won't
// matter.
if id.ReplicaID != 0 {
continue
}
// TODO(tbg): if clearRangeData were in this package we could destroy more
// effectively even if for some reason we had in the past written state
// other than the HardState here.
if err := eng.ClearUnversioned(logstore.NewStateLoader(id.RangeID).RaftHardStateKey()); err != nil {
return nil, errors.Wrapf(err, "removing HardState for r%d", id.RangeID)
}
removedLegacy = append(removedLegacy, id.RangeID)
delete(uninitM, id)
}
sort.Slice(removedLegacy, func(i, j int) bool {
return removedLegacy[i] < removedLegacy[j]
})
for _, rangeID := range removedLegacy {
log.Eventf(ctx, "removed legacy uninitialized replica for r%s", rangeID)
}

return &EngineReplicas{
Initialized: initM,
Uninitialized: allM, // NB: init'ed ones were deleted earlier
Uninitialized: uninitM,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ r1:{c-d} [(n1,s1):20, next=21, gen=0]
# raft state.
load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
initialized replica r1/10 not present in allM
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ ok

load-and-reconcile
----
r1:{a-c} [(n1,s1):10, next=11, gen=0] has no persisted replicaID
initialized replica r1/10 not present in allM
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,4 @@ r2:{b-d} [(n1,s1):20, next=21, gen=0]

load-and-reconcile
----
r1/10: r1:{a-c} [(n1,s1):10, next=11, gen=0]
r2/20: r2:{b-d} [(n1,s1):20, next=21, gen=0]
r2:{b-d} [(n1,s1):20, next=21, gen=0] overlaps r1:{a-c} [(n1,s1):10, next=11, gen=0]
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/kvstorage/testdata/init
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,26 @@ r8:{c-f} [(n1,s1):80, next=81, gen=0]

list-range-ids
----
r5/0
r6/60
r7/0
r8/80

# Loading the replicas returns only the ones that
# had a ReplicaID.
load-and-reconcile trace=true
----
r7:{a-c} [(n1,s1):70, next=71, gen=0] has no persisted replicaID
r6/60: uninitialized
r7/70: r7:{a-c} [(n1,s1):70, next=71, gen=0]
r8/80: r8:{c-f} [(n1,s1):80, next=81, gen=0]
beginning range descriptor iteration
iterated over 2 keys to find 2 range descriptors (by suffix: map[‹rdsc›:2])
iterated over 2 keys to find 2 range descriptors (by suffix: map[rdsc:2])
backfilled replicaID for initialized replica r7/70
removed legacy uninitialized replica for r5

# r5 was removed, and r7 had its RaftReplicaID backfilled.
list-range-ids
----
r6/60
r7/70
r8/80

0 comments on commit 18a7ff7

Please sign in to comment.