Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: rename Replica.mu.replicaID to Replica.replicaID #76248

Merged
merged 1 commit into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ type Replica struct {
log.AmbientContext

RangeID roachpb.RangeID // Only set by the constructor
// The ID of the replica within the Raft group. Only set by the constructor,
// so it will not change over the lifetime of this replica. If addressed
// under a newer replicaID, the replica immediately replicaGCs itself to
// make way for the newer incarnation.
replicaID roachpb.ReplicaID

// The start key of a Range remains constant throughout its lifetime (it does
// not change through splits or merges). This field carries a copy of
Expand Down Expand Up @@ -516,13 +521,6 @@ type Replica struct {
applyingEntries bool
// The replica's Raft group "node".
internalRaftGroup *raft.RawNode
// The ID of the replica within the Raft group. This value may never be 0.
// It will not change over the lifetime of this replica. If addressed under
// a newer replicaID, the replica immediately replicaGCs itself to make
// way for the newer incarnation.
// TODO(sumeer): since this is initialized in newUnloadedReplica and never
// changed, lift this out of the mu struct.
replicaID roachpb.ReplicaID
// The minimum allowed ID for this replica. Initialized from
// RangeTombstone.NextReplicaID.
tombstoneMinReplicaID roachpb.ReplicaID
Expand Down Expand Up @@ -715,11 +713,7 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {
// ReplicaID returns the ID for the Replica. This value is fixed for the
// lifetime of the Replica.
func (r *Replica) ReplicaID() roachpb.ReplicaID {
// The locking of mu is unnecessary. It will be removed when we lift
// replicaID out of the mu struct.
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.replicaID
return r.replicaID
}

// cleanupFailedProposal cleans up after a proposal that has failed. It
Expand Down Expand Up @@ -1320,8 +1314,8 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
if !ok {
log.Fatalf(ctx, "%+v does not contain local store s%d", r.mu.state.Desc, r.store.StoreID())
}
if replDesc.ReplicaID != r.mu.replicaID {
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.mu.replicaID, r.mu.state.Desc)
if replDesc.ReplicaID != r.replicaID {
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
}
}
}
Expand Down Expand Up @@ -1677,7 +1671,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
// ID which is above the replica ID of the split then we would not have
// written a tombstone but we will have a replica ID that will exceed the
// split replica ID.
r.mu.replicaID > rightDesc.ReplicaID
r.replicaID > rightDesc.ReplicaID
}

// WatchForMerge is like maybeWatchForMergeLocked, except it expects a merge to
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func newUnloadedReplica(
r := &Replica{
AmbientContext: store.cfg.AmbientCtx,
RangeID: desc.RangeID,
replicaID: replicaID,
creationTime: timeutil.Now(),
store: store,
abortSpan: abortspan.New(desc.RangeID),
Expand All @@ -93,7 +94,6 @@ func newUnloadedReplica(
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
r.mu.conf = store.cfg.DefaultSpanConfig
r.mu.replicaID = replicaID
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
}, func() time.Duration {
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
ctx := r.AnnotateCtx(context.TODO())
if r.mu.state.Desc != nil && r.isInitializedRLocked() {
log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID)
} else if r.mu.replicaID == 0 {
} else if r.replicaID == 0 {
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID)
}
Expand All @@ -204,15 +204,15 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)

// Ensure that we're not trying to load a replica with a different ID than
// was used to construct this Replica.
replicaID := r.mu.replicaID
replicaID := r.replicaID
if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
replicaID = replicaDesc.ReplicaID
} else if desc.IsInitialized() {
log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc)
}
if r.mu.replicaID != replicaID {
if r.replicaID != replicaID {
log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
r.mu.replicaID, replicaID)
r.replicaID, replicaID)
}

r.setDescLockedRaftMuLocked(ctx, desc)
Expand Down Expand Up @@ -346,9 +346,9 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
// store to exist on disk.
// 3) Various unit tests do not provide a valid descriptor.
replDesc, found := desc.GetReplicaDescriptor(r.StoreID())
if found && replDesc.ReplicaID != r.mu.replicaID {
if found && replDesc.ReplicaID != r.replicaID {
log.Fatalf(ctx, "attempted to change replica's ID from %d to %d",
r.mu.replicaID, replDesc.ReplicaID)
r.replicaID, replDesc.ReplicaID)
}

// Initialize the tenant. The must be the first time that the descriptor has
Expand Down Expand Up @@ -380,7 +380,7 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R
r.mu.lastReplicaAddedTime = time.Time{}
}

r.rangeStr.store(r.mu.replicaID, desc)
r.rangeStr.store(r.replicaID, desc)
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnRangeDescUpdated(desc)
r.mu.state.Desc = desc
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc kvserverpb.Co

var shouldFatal bool
for _, rDesc := range cc.Terminate {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.mu.replicaID {
if rDesc.StoreID == r.store.StoreID() && rDesc.ReplicaID == r.replicaID {
shouldFatal = true
}
}
Expand Down Expand Up @@ -381,7 +381,7 @@ func (r *Replica) leasePostApplyLocked(
}
}

iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.mu.replicaID
iAmTheLeaseHolder := newLease.Replica.ReplicaID == r.replicaID
// NB: in the case in which a node restarts, minLeaseProposedTS forces it to
// get a new lease and we make sure it gets a new sequence number, thus
// causing the right half of the disjunction to fire so that we update the
Expand Down
13 changes: 7 additions & 6 deletions pkg/kv/kvserver/replica_proposal_buf.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ type proposer interface {
locker() sync.Locker
rlocker() sync.Locker
// The following require the proposer to hold (at least) a shared lock.
replicaID() roachpb.ReplicaID
getReplicaID() roachpb.ReplicaID
destroyed() destroyStatus
leaseAppliedIndex() uint64
enqueueUpdateCheck()
Expand Down Expand Up @@ -395,7 +395,8 @@ func (b *propBuf) FlushLockedWithRaftGroup(
if raftGroup != nil {
leaderInfo = b.p.leaderStatusRLocked(raftGroup)
// Sanity check.
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.replicaID() && !leaderInfo.iAmTheLeader {
if leaderInfo.leaderKnown && leaderInfo.leader == b.p.getReplicaID() &&
!leaderInfo.iAmTheLeader {
log.Fatalf(ctx,
"inconsistent Raft state: state %s while the current replica is also the lead: %d",
raftGroup.BasicStatus().RaftState, leaderInfo.leader)
Expand Down Expand Up @@ -531,7 +532,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
// Flush any previously batched (non-conf change) proposals to
// preserve the correct ordering or proposals. Later proposals
// will start a new batch.
if err := proposeBatch(raftGroup, b.p.replicaID(), ents); err != nil {
if err := proposeBatch(raftGroup, b.p.getReplicaID(), ents); err != nil {
firstErr = err
continue
}
Expand Down Expand Up @@ -581,7 +582,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
if firstErr != nil {
return 0, firstErr
}
return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
return used, proposeBatch(raftGroup, b.p.getReplicaID(), ents)
}

// allocateLAIAndClosedTimestampLocked computes a LAI and closed timestamp to be
Expand Down Expand Up @@ -997,8 +998,8 @@ func (rp *replicaProposer) rlocker() sync.Locker {
return rp.mu.RWMutex.RLocker()
}

func (rp *replicaProposer) replicaID() roachpb.ReplicaID {
return rp.mu.replicaID
func (rp *replicaProposer) getReplicaID() roachpb.ReplicaID {
return rp.replicaID
}

func (rp *replicaProposer) destroyed() destroyStatus {
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (t *testProposer) rlocker() sync.Locker {
return t.RWMutex.RLocker()
}

func (t *testProposer) replicaID() roachpb.ReplicaID {
func (t *testProposer) getReplicaID() roachpb.ReplicaID {
return 1
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (t *testProposer) leaderStatusRLocked(raftGroup proposerRaft) rangeLeaderIn
var iAmTheLeader, leaderEligibleForLease bool
if leaderKnown {
leaderRep = roachpb.ReplicaID(raftGroup.BasicStatus().Lead)
iAmTheLeader = leaderRep == t.replicaID()
iAmTheLeader = leaderRep == t.getReplicaID()
repDesc := roachpb.ReplicaDescriptor{
ReplicaID: leaderRep,
Type: &t.leaderReplicaType,
Expand Down Expand Up @@ -521,9 +521,9 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
var p testProposer
var pc proposalCreator
// p.replicaID() is hardcoded; it'd better be hardcoded to what this test
// expects.
require.Equal(t, self, uint64(p.replicaID()))
// p.getReplicaID() is hardcoded; it'd better be hardcoded to what this
// test expects.
require.Equal(t, self, uint64(p.getReplicaID()))

var rejected roachpb.ReplicaID
if tc.expRejection {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_proposal_quota.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(

status := r.mu.internalRaftGroup.BasicStatus()
if r.mu.leaderID != lastLeaderID {
if r.mu.replicaID == r.mu.leaderID {
if r.replicaID == r.mu.leaderID {
// We're becoming the leader.
// Initialize the proposalQuotaBaseIndex at the applied index.
// After the proposal quota is enabled all entries applied by this replica
Expand Down Expand Up @@ -125,7 +125,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked(
}
return
} else if r.mu.proposalQuota == nil {
if r.mu.replicaID == r.mu.leaderID {
if r.replicaID == r.mu.leaderID {
log.Fatal(ctx, "leader has uninitialized proposalQuota pool")
}
// We're a follower.
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
r.mu.leaderID = leaderID
// Clear the remote proposal set. Would have been nil already if not
// previously the leader.
becameLeader = r.mu.leaderID == r.mu.replicaID
becameLeader = r.mu.leaderID == r.replicaID
}
r.mu.Unlock()

Expand Down Expand Up @@ -1040,8 +1040,8 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo
// into the local Raft group. The leader won't hit that path, so we update
// it whenever it ticks. In effect, this makes sure it always sees itself as
// alive.
if r.mu.replicaID == r.mu.leaderID {
r.mu.lastUpdateTimes.update(r.mu.replicaID, timeutil.Now())
if r.replicaID == r.mu.leaderID {
r.mu.lastUpdateTimes.update(r.replicaID, timeutil.Now())
}

r.mu.ticks++
Expand Down Expand Up @@ -1585,7 +1585,7 @@ func (r *Replica) withRaftGroupLocked(
ctx := r.AnnotateCtx(context.TODO())
raftGroup, err := raft.NewRawNode(newRaftConfig(
raft.Storage((*replicaRaftStorage)(r)),
uint64(r.mu.replicaID),
uint64(r.replicaID),
r.mu.state.RaftAppliedIndex,
r.store.cfg,
&raftLogger{ctx: ctx},
Expand Down Expand Up @@ -1709,7 +1709,7 @@ func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) {
// method were to be called on an uninitialized replica (which
// has no state and thus an empty raft config), this might cause
// problems.
if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.mu.replicaID); !currentMember {
if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.replicaID); !currentMember {
return
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/replica_raft_quiesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,18 +402,18 @@ func shouldReplicaQuiesce(
func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
ctx context.Context, status *raft.Status, lagging laggingReplicaSet,
) bool {
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.mu.replicaID, r.raftMu.lastToReplica)
fromReplica, fromErr := r.getReplicaDescriptorByIDRLocked(r.replicaID, r.raftMu.lastToReplica)
if fromErr != nil {
if log.V(4) {
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.mu.replicaID)
log.Infof(ctx, "not quiescing: cannot find from replica (%d)", r.replicaID)
}
return false
}

r.quiesceLocked(ctx, lagging)

for id, prog := range status.Progress {
if roachpb.ReplicaID(id) == r.mu.replicaID {
if roachpb.ReplicaID(id) == r.replicaID {
continue
}
toReplica, toErr := r.getReplicaDescriptorByIDRLocked(
Expand Down Expand Up @@ -444,7 +444,7 @@ func (r *Replica) quiesceAndNotifyRaftMuLockedReplicaMuLocked(
curLagging = nil
}
msg := raftpb.Message{
From: uint64(r.mu.replicaID),
From: uint64(r.replicaID),
To: id,
Type: raftpb.MsgHeartbeat,
Term: status.Term,
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ func (r *Replica) applySnapshot(
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
ctx, &unreplicatedSST, r.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

Expand Down Expand Up @@ -1046,7 +1046,7 @@ func (r *Replica) applySnapshot(
// we missed the application of a merge) and we are the new leaseholder, we
// make sure to update the timestamp cache using the prior read summary to
// account for any reads that were served on the right-hand side range(s).
if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.replicaID && prioReadSum != nil {
applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (s *Store) tryGetOrCreateReplica(
}

// The current replica needs to be removed, remove it and go back around.
if toTooOld := repl.mu.replicaID < replicaID; toTooOld {
if toTooOld := repl.replicaID < replicaID; toTooOld {
if shouldLog := log.V(1); shouldLog {
log.Infof(ctx, "found message for replica ID %d which is newer than %v",
replicaID, repl)
Expand All @@ -115,14 +115,14 @@ func (s *Store) tryGetOrCreateReplica(
}
defer repl.mu.RUnlock()

if repl.mu.replicaID > replicaID {
if repl.replicaID > replicaID {
// The sender is behind and is sending to an old replica.
// We could silently drop this message but this way we'll inform the
// sender that they may no longer exist.
repl.raftMu.Unlock()
return nil, false, &roachpb.RaftGroupDeletedError{}
}
if repl.mu.replicaID != replicaID {
if repl.replicaID != replicaID {
// This case should have been caught by handleToReplicaTooOld.
log.Fatalf(ctx, "intended replica id %d unexpectedly does not match the current replica %v",
replicaID, repl)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (s *Store) HandleRaftResponse(
// that the replica has been removed and re-added quickly. In
// that case, we don't want to add it to the replicaGCQueue.
// If the replica is not alive then we also should ignore this error.
if tErr.ReplicaID != repl.mu.replicaID ||
if tErr.ReplicaID != repl.replicaID ||
!repl.mu.destroyStatus.IsAlive() ||
// Ignore if we want to test the replicaGC queue.
s.TestingKnobs().DisableEagerReplicaRemoval {
Expand Down
5 changes: 1 addition & 4 deletions pkg/kv/kvserver/store_remove_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// Sanity checks before committing to the removal by setting the
// destroy status.
var desc *roachpb.RangeDescriptor
var replicaID roachpb.ReplicaID
{
rep.readOnlyCmdMu.Lock()
rep.mu.Lock()
Expand Down Expand Up @@ -132,11 +131,9 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(
// Mark the replica as removed before deleting data.
rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.StoreID()),
destroyReasonRemoved)
replicaID = rep.mu.replicaID
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()
}

// Proceed with the removal, all errors encountered from here down are fatal.

// Another sanity check that this replica is a part of this store.
Expand All @@ -150,7 +147,7 @@ func (s *Store) removeInitializedReplicaRaftMuLocked(

// During merges, the context might have the subsuming range, so we explicitly
// log the replica to be removed.
log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, replicaID)
log.Infof(ctx, "removing replica r%d/%d", rep.RangeID, rep.replicaID)

s.mu.Lock()
if it := s.getOverlappingKeyRangeLocked(desc); it.repl != rep {
Expand Down
Loading