Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
93317: kvserver: also load uninitialized replicas, verify replicaID r=pavelkalinnikov a=tbg

To support a separate raft log[^1] we need to perform certain
reconciliation operations at start-up to recover from a lack
of atomicity between the state and log engines.

This commit gets us closer to being able to do so by listing
all replicas before starting the store, which means we now
have a handle on which uninitialized replicas exist in the
system.

As a first application of this knowledge, we now ensure that every
replica has a persisted FullReplicaID. Since this would not necessarily
be true for stores that have seen older releases, we backfill the
ReplicaID in 23.1 and can then require it to be present in a future
release that forces a migration through 23.1.

[^1]: #16624

Epic: CRDB-220
Release note: None


93721: roachpb: include tenant name in invalid name error r=andreimatei a=andreimatei

Release note: none
Epic: none

93736: ring: make ring.Buffer generic r=ajwerner a=ajwerner

Epic: none

Release note: None

Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
  • Loading branch information
4 people committed Dec 19, 2022
4 parents 819258b + 48f0a0a + e69a71f + 3f3218d commit 369c405
Show file tree
Hide file tree
Showing 22 changed files with 199 additions and 157 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,16 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
}
}
diskReplID, found, err := r.mu.stateLoader.LoadRaftReplicaID(ctx, reader)
if err != nil {
log.Fatalf(ctx, "%s", err)
}
if !found {
log.Fatalf(ctx, "no replicaID persisted")
}
if diskReplID.ReplicaID != r.replicaID {
log.Fatalf(ctx, "disk replicaID %d does not match in-mem %d", diskReplID, r.replicaID)
}
}

// TODO(nvanbenschoten): move the following 5 methods to replica_send.go.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -96,7 +97,10 @@ func TestSplitQueueShouldQueue(t *testing.T) {
cpy := *tc.repl.Desc()
cpy.StartKey = test.start
cpy.EndKey = test.end
repl, err := newReplica(ctx, &cpy, tc.store, cpy.Replicas().VoterDescriptors()[0].ReplicaID)
replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID
require.NoError(t,
logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID))
repl, err := newReplica(ctx, &cpy, tc.store, replicaID)
if err != nil {
t.Fatal(err)
}
Expand Down
82 changes: 69 additions & 13 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -1904,19 +1906,47 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
return ident, err
}

// engineReplicas is the result returned by loadAndReconcileReplicas: the
// replicas found on a store's engine, split into two disjoint sets keyed by
// their FullReplicaID.
type engineReplicas struct {
// uninitialized holds the replicas that have a persisted RaftReplicaID but
// no visible RangeDescriptor.
uninitialized map[storage.FullReplicaID]struct{}
// initialized maps every replica that has a visible RangeDescriptor to
// that descriptor.
initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor
}

// loadFullReplicaIDsFromDisk returns the set of FullReplicaIDs for which a
// RaftReplicaID key is persisted in the given reader, i.e. all Replicas on
// this Store. A given RangeID appears in at most one entry of the result.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
	ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
	// persisted is the scratch message handed to IterateIDPrefixKeys; the
	// visitor reads the ReplicaID for the current range out of it.
	// NOTE(review): presumably IterateIDPrefixKeys decodes each value into
	// this message before invoking the visitor — confirm against its docs.
	var persisted roachpb.RaftReplicaID
	found := make(map[storage.FullReplicaID]struct{})

	replicaIDKey := func(rangeID roachpb.RangeID) roachpb.Key {
		return keys.RaftReplicaIDKey(rangeID)
	}
	record := func(rangeID roachpb.RangeID) error {
		fullID := storage.FullReplicaID{
			RangeID:   rangeID,
			ReplicaID: persisted.ReplicaID,
		}
		found[fullID] = struct{}{}
		return nil
	}

	err := IterateIDPrefixKeys(ctx, reader, replicaIDKey, &persisted, record)
	if err != nil {
		return nil, err
	}

	// TODO(sep-raft-log): if there is any other data that we mandate is present here
	// (like a HardState), validate that here.

	return found, nil
}

// loadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TODO(sep-raft-log): also load *uninitialized* Replicas.
func loadAndReconcileReplicas(
ctx context.Context, eng storage.Engine,
) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) {
// TODO(sep-raft-log): consider a callback-visitor pattern here.
func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
}

m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
Expand All @@ -1933,12 +1963,7 @@ func loadAndReconcileReplicas(
ident.StoreID, desc)
}

// INVARIANT: every Replica has a persisted ReplicaID. For initialized
// Replicas, it matches that of the descriptor.
//
// TODO(sep-raft-log): actually load the persisted ReplicaID and validate
// the above invariant.
m[storage.FullReplicaID{
initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
Expand All @@ -1947,7 +1972,38 @@ func loadAndReconcileReplicas(
return nil, err
}

return m, nil
allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
if err != nil {
return nil, err
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
}
// `allM` will be our map of uninitialized replicas.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).
delete(allM, id)
}

return &engineReplicas{
initialized: initM,
uninitialized: allM, // NB: init'ed ones were deleted earlier
}, nil
}

// Start the engine, set the GC and read the StoreIdent.
Expand Down Expand Up @@ -2065,11 +2121,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// concurrently. Note that while we can perform this initialization
// concurrently, all of the initialization must be performed before we start
// listening for Raft messages and starting the process Raft loop.
descs, err := loadAndReconcileReplicas(ctx, s.engine)
engRepls, err := loadAndReconcileReplicas(ctx, s.engine)
if err != nil {
return err
}
for fullID, desc := range descs {
for fullID, desc := range engRepls.initialized {
rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -216,7 +217,11 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
NextReplicaID: 1,
}
rg.AddReplica(1, 1, roachpb.VOTER_FULL)
replica, err := newReplica(ctx, &rg, store, 1)

const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
replica, err := newReplica(ctx, &rg, store, replicaID)
if err != nil {
t.Fatalf("make replica error : %+v", err)
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand Down Expand Up @@ -521,6 +522,7 @@ func TestInitializeEngineErrors(t *testing.T) {
// deprecated; new tests should create replicas by splitting from a
// properly-bootstrapped initial range.
func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) *Replica {
ctx := context.Background()
desc := &roachpb.RangeDescriptor{
RangeID: rangeID,
StartKey: start,
Expand All @@ -532,9 +534,15 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) *
}},
NextReplicaID: 2,
}
r, err := newReplica(context.Background(), desc, s, 1)
const replicaID = 1
if err := stateloader.WriteInitialRangeState(
ctx, s.engine, *desc, replicaID, clusterversion.TestingClusterVersion.Version,
); err != nil {
panic(err)
}
r, err := newReplica(ctx, desc, s, replicaID)
if err != nil {
log.Fatalf(context.Background(), "%v", err)
panic(err)
}
return r
}
Expand Down Expand Up @@ -825,7 +833,10 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {
RangeID: newRangeID,
}

r, err := newReplica(ctx, desc, store, 1)
const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
r, err := newReplica(ctx, desc, store, replicaID)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func newStores(ambientCtx log.AmbientContext, clock *hlc.Clock) *Stores {
Expand Down Expand Up @@ -153,6 +155,8 @@ func TestStoresGetReplicaForRangeID(t *testing.T) {
},
}

require.NoError(t,
logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
replica, err := newReplica(ctx, desc, store, replicaID)
if err != nil {
t.Fatalf("unexpected error when creating replica: %+v", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ var tenantNameRe = regexp.MustCompile(`^[a-z0-9]([a-z0-9---]{0,98}[a-z0-9])?$`)

func (n TenantName) IsValid() error {
if !tenantNameRe.MatchString(string(n)) {
return errors.WithHint(errors.Newf("invalid tenant name"),
return errors.WithHint(errors.Newf("invalid tenant name: %q", n),
"Tenant names must start and end with a lowercase letter or digit, contain only lowercase letters, digits or hyphens, with a maximum of 100 characters.")
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type StmtBuf struct {
cond *sync.Cond

// data contains the elements of the buffer.
data ring.Buffer // []Command
data ring.Buffer[Command]

// startPos indicates the index of the first command currently in data
// relative to the start of the connection.
Expand Down Expand Up @@ -459,7 +459,7 @@ func (buf *StmtBuf) CurCmd() (Command, CmdPos, error) {
}
len := buf.mu.data.Len()
if cmdIdx < len {
return buf.mu.data.Get(cmdIdx).(Command), curPos, nil
return buf.mu.data.Get(cmdIdx), curPos, nil
}
if cmdIdx != len {
return nil, 0, errors.AssertionFailedf(
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type cancelFlowsCoordinator struct {
mu struct {
syncutil.Mutex
// deadFlowsByNode is a ring of pointers to deadFlowsOnNode objects.
deadFlowsByNode ring.Buffer
deadFlowsByNode ring.Buffer[*deadFlowsOnNode]
}
// workerWait should be used by canceling workers to block until there are
// some dead flows to cancel.
Expand All @@ -301,7 +301,7 @@ func (c *cancelFlowsCoordinator) getFlowsToCancel() (
if c.mu.deadFlowsByNode.Len() == 0 {
return nil, base.SQLInstanceID(0)
}
deadFlows := c.mu.deadFlowsByNode.GetFirst().(*deadFlowsOnNode)
deadFlows := c.mu.deadFlowsByNode.GetFirst()
c.mu.deadFlowsByNode.RemoveFirst()
req := &execinfrapb.CancelDeadFlowsRequest{
FlowIDs: deadFlows.ids,
Expand All @@ -322,7 +322,7 @@ func (c *cancelFlowsCoordinator) addFlowsToCancel(
// sufficiently fast.
found := false
for j := 0; j < c.mu.deadFlowsByNode.Len(); j++ {
deadFlows := c.mu.deadFlowsByNode.Get(j).(*deadFlowsOnNode)
deadFlows := c.mu.deadFlowsByNode.Get(j)
if sqlInstanceID == deadFlows.sqlInstanceID {
deadFlows.ids = append(deadFlows.ids, f.FlowID)
found = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func TestCancelFlowsCoordinator(t *testing.T) {
require.GreaterOrEqual(t, numNodes-1, c.mu.deadFlowsByNode.Len())
seen := make(map[base.SQLInstanceID]struct{})
for i := 0; i < c.mu.deadFlowsByNode.Len(); i++ {
deadFlows := c.mu.deadFlowsByNode.Get(i).(*deadFlowsOnNode)
deadFlows := c.mu.deadFlowsByNode.Get(i)
require.NotEqual(t, gatewaySQLInstanceID, deadFlows.sqlInstanceID)
_, ok := seen[deadFlows.sqlInstanceID]
require.False(t, ok)
Expand Down
Loading

0 comments on commit 369c405

Please sign in to comment.