diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 1292b2483dd5..600a27a29bbe 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -19,7 +19,6 @@ import ( "context" "fmt" "math/rand" - "os" "reflect" "sync/atomic" "time" @@ -34,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/rangefeed" @@ -125,17 +123,6 @@ const ( proposalIllegalLeaseIndex ) -// proposalResult indicates the result of a proposal. Exactly one of -// Reply, Err and ProposalRetry is set, and it represents the result of -// the proposal. -type proposalResult struct { - Reply *roachpb.BatchResponse - Err *roachpb.Error - ProposalRetry proposalReevaluationReason - Intents []result.IntentsWithArg - EndTxns []result.EndTxnIntents -} - type atomicDescString struct { strPtr unsafe.Pointer } @@ -168,42 +155,6 @@ func (d *atomicDescString) String() string { return *(*string)(atomic.LoadPointer(&d.strPtr)) } -func (s *destroyStatus) Set(err error, reason DestroyReason) { - s.err = err - s.reason = reason -} - -func (s *destroyStatus) Reset() { - s.Set(nil, destroyReasonAlive) -} - -// a lastUpdateTimesMap is maintained on the Raft leader to keep track of the -// last communication received from followers, which in turn informs the quota -// pool and log truncations. -type lastUpdateTimesMap map[roachpb.ReplicaID]time.Time - -func (m lastUpdateTimesMap) update(replicaID roachpb.ReplicaID, now time.Time) { - if m == nil { - return - } - m[replicaID] = now -} - -// isFollowerActive returns whether the specified follower has made -// communication with the leader in the last MaxQuotaReplicaLivenessDuration. -func (m lastUpdateTimesMap) isFollowerActive( - ctx context.Context, replicaID roachpb.ReplicaID, now time.Time, -) bool { - lastUpdateTime, ok := m[replicaID] - if !ok { - // If the follower has no entry in lastUpdateTimes, it has not been - // updated since r became the leader (at which point all then-existing - // replicas were updated). - return false - } - return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration -} - // A Replica is a contiguous keyspace with writes managed via an // instance of the Raft consensus algorithm. Many ranges may exist // in a store and they are unlikely to be contiguous. Ranges are @@ -495,160 +446,8 @@ type KeyRange interface { var _ KeyRange = &Replica{} -// withRaftGroupLocked calls the supplied function with the (lazily -// initialized) Raft group. The supplied function should return true for the -// unquiesceAndWakeLeader argument if the replica should be unquiesced (and the -// leader awoken). See handleRaftReady for an instance of where this value -// varies. -// -// Requires that Replica.mu is held. Also requires that Replica.raftMu is held -// if either the caller can't guarantee that r.mu.internalRaftGroup != nil or -// the provided function requires Replica.raftMu. -func (r *Replica) withRaftGroupLocked( - mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { - if r.mu.destroyStatus.RemovedOrCorrupt() { - // Silently ignore all operations on destroyed replicas. 
We can't return an - // error here as all errors returned from this method are considered fatal. - return nil - } - - if r.mu.replicaID == 0 { - // The replica's raft group has not yet been configured (i.e. the replica - // was created from a preemptive snapshot). - return nil - } - - if r.mu.internalRaftGroup == nil { - r.raftMu.Mutex.AssertHeld() - - ctx := r.AnnotateCtx(context.TODO()) - raftGroup, err := raft.NewRawNode(newRaftConfig( - raft.Storage((*replicaRaftStorage)(r)), - uint64(r.mu.replicaID), - r.mu.state.RaftAppliedIndex, - r.store.cfg, - &raftLogger{ctx: ctx}, - ), nil) - if err != nil { - return err - } - r.mu.internalRaftGroup = raftGroup - - if mayCampaignOnWake { - r.maybeCampaignOnWakeLocked(ctx) - } - } - - // This wrapper function is a hack to add range IDs to stack traces - // using the same pattern as Replica.sendWithRangeID. - unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) { - return f(raftGroup) - }(r.RangeID, r.mu.internalRaftGroup) - if unquiesce { - r.unquiesceAndWakeLeaderLocked() - } - return err -} - -// withRaftGroup calls the supplied function with the (lazily initialized) -// Raft group. It acquires and releases the Replica lock, so r.mu must not be -// held (or acquired by the supplied function). -// -// If mayCampaignOnWake is true, the replica may initiate a raft -// election if it was previously in a dormant state. Most callers -// should set this to true, because the prevote feature minimizes the -// disruption from unnecessary elections. The exception is that we -// should not initiate an election while handling incoming raft -// messages (which may include MsgVotes from an election in progress, -// and this election would be disrupted if we started our own). -// -// Has the same requirement for Replica.raftMu as withRaftGroupLocked. -func (r *Replica) withRaftGroup( - mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { - r.mu.Lock() - defer r.mu.Unlock() - return r.withRaftGroupLocked(mayCampaignOnWake, f) -} - -func shouldCampaignOnWake( - leaseStatus storagepb.LeaseStatus, - lease roachpb.Lease, - storeID roachpb.StoreID, - raftStatus raft.Status, -) bool { - // When waking up a range, campaign unless we know that another - // node holds a valid lease (this is most important after a split, - // when all replicas create their raft groups at about the same - // time, with a lease pre-assigned to one of them). Note that - // thanks to PreVote, unnecessary campaigns are not disruptive so - // we should err on the side of campaigining here. - anotherOwnsLease := leaseStatus.State == storagepb.LeaseState_VALID && !lease.OwnedBy(storeID) - - // If we're already campaigning or know who the leader is, don't - // start a new term. - noLeader := raftStatus.RaftState == raft.StateFollower && raftStatus.Lead == 0 - return !anotherOwnsLease && noLeader -} - -// maybeCampaignOnWakeLocked is called when the range wakes from a -// dormant state (either the initial "raftGroup == nil" state or after -// being quiescent) and campaigns for raft leadership if appropriate. -func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) { - // Raft panics if a node that is not currently a member of the - // group tries to campaign. That happens primarily when we apply - // preemptive snapshots. 
- if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.mu.replicaID); !currentMember { - return - } - - leaseStatus := r.leaseStatus(*r.mu.state.Lease, r.store.Clock().Now(), r.mu.minLeaseProposedTS) - raftStatus := r.mu.internalRaftGroup.Status() - if shouldCampaignOnWake(leaseStatus, *r.mu.state.Lease, r.store.StoreID(), *raftStatus) { - log.VEventf(ctx, 3, "campaigning") - if err := r.mu.internalRaftGroup.Campaign(); err != nil { - log.VEventf(ctx, 1, "failed to campaign: %s", err) - } - } -} - var _ client.Sender = &Replica{} -func newReplica(rangeID roachpb.RangeID, store *Store) *Replica { - r := &Replica{ - AmbientContext: store.cfg.AmbientCtx, - RangeID: rangeID, - store: store, - abortSpan: abortspan.New(rangeID), - txnWaitQueue: txnwait.NewQueue(store), - } - r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) - r.mu.stateLoader = stateloader.Make(rangeID) - r.mu.quiescent = true - r.mu.zone = config.DefaultZoneConfigRef() - - if leaseHistoryMaxEntries > 0 { - r.leaseHistory = newLeaseHistory() - } - if store.cfg.StorePool != nil { - r.leaseholderStats = newReplicaStats(store.Clock(), store.cfg.StorePool.getNodeLocalityString) - } - // Pass nil for the localityOracle because we intentionally don't track the - // origin locality of write load. - r.writeStats = newReplicaStats(store.Clock(), nil) - - // Init rangeStr with the range ID. - r.rangeStr.store(0, &roachpb.RangeDescriptor{RangeID: rangeID}) - // Add replica log tag - the value is rangeStr.String(). - r.AmbientContext.AddLogTag("r", &r.rangeStr) - // Add replica pointer value. NB: this was historically useful for debugging - // replica GC issues, but is a distraction at the moment. - // r.AmbientContext.AddLogTagStr("@", fmt.Sprintf("%x", unsafe.Pointer(r))) - r.raftMu.stateLoader = stateloader.Make(rangeID) - return r -} - // NewReplica initializes the replica using the given metadata. If the // replica is initialized (i.e. desc contains more than a RangeID), // replicaID should be 0 and the replicaID will be discovered from the @@ -660,93 +459,92 @@ func NewReplica( return r, r.init(desc, store.Clock(), replicaID) } -func (r *Replica) init( - desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID, -) error { - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - defer r.mu.Unlock() - return r.initRaftMuLockedReplicaMuLocked(desc, clock, replicaID) +// Send executes a command on this range, dispatching it to the +// read-only, read-write, or admin execution path as appropriate. +// ctx should contain the log tags from the store (and up). +func (r *Replica) Send( + ctx context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + return r.sendWithRangeID(ctx, r.RangeID, ba) } -func (r *Replica) initRaftMuLockedReplicaMuLocked( - desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID, -) error { - ctx := r.AnnotateCtx(context.TODO()) - if r.mu.state.Desc != nil && r.isInitializedRLocked() { - log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID) - } - if desc.IsInitialized() && replicaID != 0 { - return errors.Errorf("replicaID must be 0 when creating an initialized replica") +// sendWithRangeID takes an unused rangeID argument so that the range +// ID will be accessible in stack traces (both in panics and when +// sampling goroutines from a live server). 
This line is subject to +// the whims of the compiler and it can be difficult to find the right +// value, but as of this writing the following example shows a stack +// while processing range 21 (0x15) (the first occurrence of that +// number is the rangeID argument, the second is within the encoded +// BatchRequest, although we don't want to rely on that occurring +// within the portion printed in the stack trace): +// +// github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) +func (r *Replica) sendWithRangeID( + ctx context.Context, rangeID roachpb.RangeID, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + var br *roachpb.BatchResponse + if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { + r.leaseholderStats.record(ba.Header.GatewayNodeID) } - r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests) - r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{} - r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} - // Clear the internal raft group in case we're being reset. Since we're - // reloading the raft state below, it isn't safe to use the existing raft - // group. - r.mu.internalRaftGroup = nil - - var err error - if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.store.Engine(), desc); err != nil { - return err - } + // Add the range log tag. + ctx = r.AnnotateCtx(ctx) + ctx, cleanup := tracing.EnsureContext(ctx, r.AmbientContext.Tracer, "replica send") + defer cleanup() - // Init the minLeaseProposedTS such that we won't use an existing lease (if - // any). This is so that, after a restart, we don't propose under old leases. - // If the replica is being created through a split, this value will be - // overridden. - if !r.store.cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart { - // Only do this if there was a previous lease. This shouldn't be important - // to do but consider that the first lease which is obtained is back-dated - // to a zero start timestamp (and this de-flakes some tests). If we set the - // min proposed TS here, this lease could not be renewed (by the semantics - // of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits, - // this problem would multiply to a number of replicas at cluster bootstrap. - // Instead, we make the first lease special (which is OK) and the problem - // disappears. - if r.mu.state.Lease.Sequence > 0 { - r.mu.minLeaseProposedTS = clock.Now() - } - } + // If the internal Raft group is not initialized, create it and wake the leader. + r.maybeInitializeRaftGroup(ctx) - r.rangeStr.store(0, r.mu.state.Desc) + isReadOnly := ba.IsReadOnly() + useRaft := !isReadOnly && ba.IsWrite() - r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.store.Engine()) - if err != nil { - return err + if isReadOnly && r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset { + // Clockless reads mode: reads go through Raft. 
+ useRaft = true } - r.mu.lastTerm = invalidLastTerm - pErr, err := r.mu.stateLoader.LoadReplicaDestroyedError(ctx, r.store.Engine()) - if err != nil { - return err + if err := r.checkBatchRequest(ba, isReadOnly); err != nil { + return nil, roachpb.NewError(err) } - if r.mu.destroyStatus.RemovedOrCorrupt() { - if err := pErr.GetDetail(); err != nil { - r.mu.destroyStatus.Set(err, destroyReasonRemoved) + + if filter := r.store.cfg.TestingKnobs.TestingRequestFilter; filter != nil { + if pErr := filter(ba); pErr != nil { + return nil, pErr } } - if replicaID == 0 { - repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID()) - if !ok { - // This is intentionally not an error and is the code path exercised - // during preemptive snapshots. The replica ID will be sent when the - // actual raft replica change occurs. - return nil - } - replicaID = repDesc.ReplicaID + // Differentiate between admin, read-only and write. + var pErr *roachpb.Error + if useRaft { + log.Event(ctx, "read-write path") + br, pErr = r.executeWriteBatch(ctx, ba) + } else if isReadOnly { + log.Event(ctx, "read-only path") + br, pErr = r.executeReadOnlyBatch(ctx, ba) + } else if ba.IsAdmin() { + log.Event(ctx, "admin path") + br, pErr = r.executeAdminBatch(ctx, ba) + } else if len(ba.Requests) == 0 { + // empty batch; shouldn't happen (we could handle it, but it hints + // at someone doing weird things, and once we drop the key range + // from the header it won't be clear how to route those requests). + log.Fatalf(ctx, "empty batch") + } else { + log.Fatalf(ctx, "don't know how to handle command %s", ba) } - r.rangeStr.store(replicaID, r.mu.state.Desc) - if err := r.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil { - return err + if pErr != nil { + if _, ok := pErr.GetDetail().(*roachpb.RaftGroupDeletedError); ok { + // This error needs to be converted appropriately so that + // clients will retry. + pErr = roachpb.NewError(roachpb.NewRangeNotFoundError(r.RangeID, r.store.StoreID())) + } + log.Eventf(ctx, "replica.Send got error: %s", pErr) + } else { + if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil { + pErr = filter(ba, br) + } } - - r.assertStateLocked(ctx, r.store.Engine()) - return nil + return br, pErr } // String returns the string representation of the replica using an @@ -775,102 +573,6 @@ func (r *Replica) cleanupFailedProposalLocked(p *ProposalData) { } } -func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error { - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - defer r.mu.Unlock() - return r.setReplicaIDRaftMuLockedMuLocked(replicaID) -} - -func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) error { - if r.mu.replicaID == replicaID { - // The common case: the replica ID is unchanged. - return nil - } - if replicaID == 0 { - // If the incoming message does not have a new replica ID it is a - // preemptive snapshot. We'll update minReplicaID if the snapshot is - // accepted. - return nil - } - if replicaID < r.mu.minReplicaID { - return &roachpb.RaftGroupDeletedError{} - } - if r.mu.replicaID > replicaID { - return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID) - } - - if r.mu.destroyStatus.reason == destroyReasonRemovalPending { - // An earlier incarnation of this replica was removed, but apparently it has been re-added - // now, so reset the status. 
- r.mu.destroyStatus.Reset() - } - - // if r.mu.replicaID != 0 { - // // TODO(bdarnell): clean up previous raftGroup (update peers) - // } - - // Initialize or update the sideloaded storage. If the sideloaded storage - // already exists (which is iff the previous replicaID was non-zero), then - // we have to move the contained files over (this corresponds to the case in - // which our replica is removed and re-added to the range, without having - // the replica GC'ed in the meantime). - // - // Note that we can't race with a concurrent replicaGC here because both that - // and this is under raftMu. - var prevSideloadedDir string - if ss := r.raftMu.sideloaded; ss != nil { - prevSideloadedDir = ss.Dir() - } - var err error - if r.raftMu.sideloaded, err = newDiskSideloadStorage( - r.store.cfg.Settings, - r.mu.state.Desc.RangeID, - replicaID, - r.store.Engine().GetAuxiliaryDir(), - r.store.limiters.BulkIOWriteRate, - r.store.engine, - ); err != nil { - return errors.Wrap(err, "while initializing sideloaded storage") - } - if prevSideloadedDir != "" { - if _, err := os.Stat(prevSideloadedDir); err != nil { - if !os.IsNotExist(err) { - return err - } - // Old directory not found. - } else { - // Old directory found, so we have something to move over to the new one. - if err := os.Rename(prevSideloadedDir, r.raftMu.sideloaded.Dir()); err != nil { - return errors.Wrap(err, "while moving sideloaded directory") - } - } - } - - previousReplicaID := r.mu.replicaID - r.mu.replicaID = replicaID - - if replicaID >= r.mu.minReplicaID { - r.mu.minReplicaID = replicaID + 1 - } - // Reset the raft group to force its recreation on next usage. - r.mu.internalRaftGroup = nil - - // If there was a previous replica, repropose its pending commands under - // this new incarnation. - if previousReplicaID != 0 { - if log.V(1) { - log.Infof(r.AnnotateCtx(context.TODO()), "changed replica ID from %d to %d", - previousReplicaID, replicaID) - } - // repropose all pending commands under new replicaID. - r.refreshProposalsLocked(0, reasonReplicaIDChanged) - } - - return nil -} - // GetMinBytes gets the replica's minimum byte threshold. func (r *Replica) GetMinBytes() int64 { r.mu.RLock() @@ -909,342 +611,6 @@ func (r *Replica) isDestroyedRLocked() (DestroyReason, error) { return r.mu.destroyStatus.reason, r.mu.destroyStatus.err } -// GetLease returns the lease and, if available, the proposed next lease. -func (r *Replica) GetLease() (roachpb.Lease, roachpb.Lease) { - r.mu.RLock() - defer r.mu.RUnlock() - return r.getLeaseRLocked() -} - -func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) { - if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok { - return *r.mu.state.Lease, nextLease - } - return *r.mu.state.Lease, roachpb.Lease{} -} - -// OwnsValidLease returns whether this replica is the current valid -// leaseholder. Note that this method does not check to see if a transfer is -// pending, but returns the status of the current lease and ownership at the -// specified point in time. -func (r *Replica) OwnsValidLease(ts hlc.Timestamp) bool { - r.mu.RLock() - defer r.mu.RUnlock() - return r.ownsValidLeaseRLocked(ts) -} - -func (r *Replica) ownsValidLeaseRLocked(ts hlc.Timestamp) bool { - return r.mu.state.Lease.OwnedBy(r.store.StoreID()) && - r.leaseStatus(*r.mu.state.Lease, ts, r.mu.minLeaseProposedTS).State == storagepb.LeaseState_VALID -} - -// IsLeaseValid returns true if the replica's lease is owned by this -// replica and is valid (not expired, not in stasis). 
-func (r *Replica) IsLeaseValid(lease roachpb.Lease, ts hlc.Timestamp) bool { - r.mu.RLock() - defer r.mu.RUnlock() - return r.isLeaseValidRLocked(lease, ts) -} - -func (r *Replica) isLeaseValidRLocked(lease roachpb.Lease, ts hlc.Timestamp) bool { - return r.leaseStatus(lease, ts, r.mu.minLeaseProposedTS).State == storagepb.LeaseState_VALID -} - -// newNotLeaseHolderError returns a NotLeaseHolderError initialized with the -// replica for the holder (if any) of the given lease. -// -// Note that this error can be generated on the Raft processing goroutine, so -// its output should be completely determined by its parameters. -func newNotLeaseHolderError( - l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, -) *roachpb.NotLeaseHolderError { - err := &roachpb.NotLeaseHolderError{ - RangeID: rangeDesc.RangeID, - } - err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) - if l != nil { - // Normally, we return the lease-holding Replica here. However, in the - // case in which a leader removes itself, we want the followers to - // avoid handing out a misleading clue (which in itself shouldn't be - // overly disruptive as the lease would expire and then this method - // shouldn't be called for it any more, but at the very least it - // could catch tests in a loop, presumably due to manual clocks). - _, stillMember := rangeDesc.GetReplicaDescriptor(l.Replica.StoreID) - if stillMember { - err.LeaseHolder = &l.Replica - err.Lease = l - } - } - return err -} - -// leaseGoodToGo is a fast-path for lease checks which verifies that an -// existing lease is valid and owned by the current store. This method should -// not be called directly. Use redirectOnOrAcquireLease instead. -func (r *Replica) leaseGoodToGo(ctx context.Context) (storagepb.LeaseStatus, bool) { - timestamp := r.store.Clock().Now() - r.mu.RLock() - defer r.mu.RUnlock() - - if r.requiresExpiringLeaseRLocked() { - // Slow-path for expiration-based leases. - return storagepb.LeaseStatus{}, false - } - - status := r.leaseStatus(*r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) - if status.State == storagepb.LeaseState_VALID && status.Lease.OwnedBy(r.store.StoreID()) { - // We own the lease... - if repDesc, err := r.getReplicaDescriptorRLocked(); err == nil { - if _, ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); !ok { - // ...and there is no transfer pending. - return status, true - } - } - } - return storagepb.LeaseStatus{}, false -} - -// redirectOnOrAcquireLease checks whether this replica has the lease -// at the current timestamp. If it does, returns success. If another -// replica currently holds the lease, redirects by returning -// NotLeaseHolderError. If the lease is expired, a renewal is -// synchronously requested. Leases are eagerly renewed when a request -// with a timestamp within rangeLeaseRenewalDuration of the lease -// expiration is served. -// -// TODO(spencer): for write commands, don't wait while requesting -// the range lease. If the lease acquisition fails, the write cmd -// will fail as well. If it succeeds, as is likely, then the write -// will not incur latency waiting for the command to complete. -// Reads, however, must wait. -func (r *Replica) redirectOnOrAcquireLease( - ctx context.Context, -) (storagepb.LeaseStatus, *roachpb.Error) { - if status, ok := r.leaseGoodToGo(ctx); ok { - return status, nil - } - - // Loop until the lease is held or the replica ascertains the actual - // lease holder. 
Returns also on context.Done() (timeout or cancellation). - var status storagepb.LeaseStatus - for attempt := 1; ; attempt++ { - timestamp := r.store.Clock().Now() - llHandle, pErr := func() (*leaseRequestHandle, *roachpb.Error) { - r.mu.Lock() - defer r.mu.Unlock() - - status = r.leaseStatus(*r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) - switch status.State { - case storagepb.LeaseState_ERROR: - // Lease state couldn't be determined. - log.VEventf(ctx, 2, "lease state couldn't be determined") - return nil, roachpb.NewError( - newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc)) - - case storagepb.LeaseState_VALID, storagepb.LeaseState_STASIS: - if !status.Lease.OwnedBy(r.store.StoreID()) { - _, stillMember := r.mu.state.Desc.GetReplicaDescriptor(status.Lease.Replica.StoreID) - if !stillMember { - // This would be the situation in which the lease holder gets removed when - // holding the lease, or in which a lease request erroneously gets accepted - // for a replica that is not in the replica set. Neither of the two can - // happen in normal usage since appropriate mechanisms have been added: - // - // 1. Only the lease holder (at the time) schedules removal of a replica, - // but the lease can change hands and so the situation in which a follower - // coordinates a replica removal of the (new) lease holder is possible (if - // unlikely) in practice. In this situation, the new lease holder would at - // some point be asked to propose the replica change's EndTransaction to - // Raft. A check has been added that prevents proposals that amount to the - // removal of the proposer's (and hence lease holder's) Replica, preventing - // this scenario. - // - // 2. A lease is accepted for a Replica that has been removed. Without - // precautions, this could happen because lease requests are special in - // that they are the only command that is proposed on a follower (other - // commands may be proposed from followers, but not successfully so). For - // all proposals, processRaftCommand checks that their ProposalLease is - // compatible with the active lease for the log position. For commands - // proposed on the lease holder, the spanlatch manager then serializes - // everything. But lease requests get created on followers based on their - // local state and thus without being sequenced through latching. Thus - // a recently removed follower (unaware of its own removal) could submit - // a proposal for the lease (correctly using as a ProposerLease the last - // active lease), and would receive it given the up-to-date ProposerLease. - // Hence, an extra check is in order: processRaftCommand makes sure that - // lease requests for a replica not in the descriptor are bounced. - // - // However, this is possible if the `cockroach debug - // unsafe-remove-dead-replicas` command has been used, so - // this is just a logged error instead of a fatal - // assertion. - log.Errorf(ctx, "lease %s owned by replica %+v that no longer exists", - status.Lease, status.Lease.Replica) - } - // Otherwise, if the lease is currently held by another replica, redirect - // to the holder. - return nil, roachpb.NewError( - newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc)) - } - // Check that we're not in the process of transferring the lease away. - // If we are transferring the lease away, we can't serve reads or - // propose Raft commands - see comments on TransferLease. 
- // TODO(andrei): If the lease is being transferred, consider returning a - // new error type so the client backs off until the transfer is - // completed. - repDesc, err := r.getReplicaDescriptorRLocked() - if err != nil { - return nil, roachpb.NewError(err) - } - if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress( - repDesc.ReplicaID); ok { - return nil, roachpb.NewError( - newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc)) - } - - // If the lease is in stasis, we can't serve requests until we've - // renewed the lease, so we return the handle to block on renewal. - // Otherwise, we don't need to wait for the extension and simply - // ignore the returned handle (whose channel is buffered) and continue. - if status.State == storagepb.LeaseState_STASIS { - return r.requestLeaseLocked(ctx, status), nil - } - - // Extend the lease if this range uses expiration-based - // leases, the lease is in need of renewal, and there's not - // already an extension pending. - _, requestPending := r.mu.pendingLeaseRequest.RequestPending() - if !requestPending && r.requiresExpiringLeaseRLocked() { - renewal := status.Lease.Expiration.Add(-r.store.cfg.RangeLeaseRenewalDuration().Nanoseconds(), 0) - if !timestamp.Less(renewal) { - if log.V(2) { - log.Infof(ctx, "extending lease %s at %s", status.Lease, timestamp) - } - // We had an active lease to begin with, but we want to trigger - // a lease extension. We explicitly ignore the returned handle - // as we won't block on it. - _ = r.requestLeaseLocked(ctx, status) - } - } - - case storagepb.LeaseState_EXPIRED: - // No active lease: Request renewal if a renewal is not already pending. - log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt) - return r.requestLeaseLocked(ctx, status), nil - - case storagepb.LeaseState_PROSCRIBED: - // Lease proposed timestamp is earlier than the min proposed - // timestamp limit this replica must observe. If this store - // owns the lease, re-request. Otherwise, redirect. - if status.Lease.OwnedBy(r.store.StoreID()) { - log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt) - return r.requestLeaseLocked(ctx, status), nil - } - // If lease is currently held by another, redirect to holder. - return nil, roachpb.NewError( - newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc)) - } - - // Return a nil handle to signal that we have a valid lease. - return nil, nil - }() - if pErr != nil { - return storagepb.LeaseStatus{}, pErr - } - if llHandle == nil { - // We own a valid lease. - return status, nil - } - - // Wait for the range lease to finish, or the context to expire. - pErr = func() (pErr *roachpb.Error) { - slowTimer := timeutil.NewTimer() - defer slowTimer.Stop() - slowTimer.Reset(base.SlowRequestThreshold) - tBegin := timeutil.Now() - for { - select { - case pErr = <-llHandle.C(): - if pErr != nil { - switch tErr := pErr.GetDetail().(type) { - case *roachpb.AmbiguousResultError: - // This can happen if the RequestLease command we sent has been - // applied locally through a snapshot: the RequestLeaseRequest - // cannot be reproposed so we get this ambiguity. - // We'll just loop around. - return nil - case *roachpb.LeaseRejectedError: - if tErr.Existing.OwnedBy(r.store.StoreID()) { - // The RequestLease command we sent was rejected because another - // lease was applied in the meantime, but we own that other - // lease. So, loop until the current node becomes aware that - // it's the leaseholder. 
- return nil - } - - // Getting a LeaseRejectedError back means someone else got there - // first, or the lease request was somehow invalid due to a concurrent - // change. That concurrent change could have been that this replica was - // removed (see processRaftCommand), so check for that case before - // falling back to a NotLeaseHolderError. - var err error - if _, descErr := r.GetReplicaDescriptor(); descErr != nil { - err = descErr - } else if lease, _ := r.GetLease(); !r.IsLeaseValid(lease, r.store.Clock().Now()) { - err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()) - } else { - err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc()) - } - pErr = roachpb.NewError(err) - } - return pErr - } - log.Eventf(ctx, "lease acquisition succeeded: %+v", status.Lease) - return nil - case <-slowTimer.C: - slowTimer.Read = true - log.Warningf(ctx, "have been waiting %s attempting to acquire lease", - base.SlowRequestThreshold) - r.store.metrics.SlowLeaseRequests.Inc(1) - defer func() { - r.store.metrics.SlowLeaseRequests.Dec(1) - log.Infof(ctx, "slow lease acquisition finished after %s with error %v after %d attempts", timeutil.Since(tBegin), pErr, attempt) - }() - case <-ctx.Done(): - llHandle.Cancel() - log.VErrEventf(ctx, 2, "lease acquisition failed: %s", ctx.Err()) - return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())) - case <-r.store.Stopper().ShouldStop(): - llHandle.Cancel() - return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())) - } - } - }() - if pErr != nil { - return storagepb.LeaseStatus{}, pErr - } - } -} - -// IsInitialized is true if we know the metadata of this range, either -// because we created it or we have received an initial snapshot from -// another node. It is false when a range has been created in response -// to an incoming message but we are waiting for our initial snapshot. -func (r *Replica) IsInitialized() bool { - r.mu.RLock() - defer r.mu.RUnlock() - return r.isInitializedRLocked() -} - -// isInitializedRLocked is true if we know the metadata of this range, either -// because we created it or we have received an initial snapshot from -// another node. It is false when a range has been created in response -// to an incoming message but we are waiting for our initial snapshot. -// isInitializedLocked requires that the replica lock is held. -func (r *Replica) isInitializedRLocked() bool { - return r.mu.state.Desc.IsInitialized() -} - // DescAndZone returns the authoritative range descriptor as well // as the zone config for the replica. func (r *Replica) DescAndZone() (*roachpb.RangeDescriptor, *config.ZoneConfig) { @@ -1346,37 +712,6 @@ func (r *Replica) GetTxnSpanGCThreshold() hlc.Timestamp { return *r.mu.state.TxnSpanGCThreshold } -// setDesc atomically sets the replica's descriptor. It requires raftMu to be -// locked. 
-func (r *Replica) setDesc(ctx context.Context, desc *roachpb.RangeDescriptor) { - r.mu.Lock() - defer r.mu.Unlock() - - if desc.RangeID != r.RangeID { - log.Fatalf(ctx, "range descriptor ID (%d) does not match replica's range ID (%d)", - desc.RangeID, r.RangeID) - } - if r.mu.state.Desc != nil && r.mu.state.Desc.IsInitialized() && - (desc == nil || !desc.IsInitialized()) { - log.Fatalf(ctx, "cannot replace initialized descriptor with uninitialized one: %+v -> %+v", - r.mu.state.Desc, desc) - } - if r.mu.state.Desc != nil && r.mu.state.Desc.IsInitialized() && - !r.mu.state.Desc.StartKey.Equal(desc.StartKey) { - log.Fatalf(ctx, "attempted to change replica's start key from %s to %s", - r.mu.state.Desc.StartKey, desc.StartKey) - } - - newMaxID := maxReplicaID(desc) - if newMaxID > r.mu.lastReplicaAdded { - r.mu.lastReplicaAdded = newMaxID - r.mu.lastReplicaAddedTime = timeutil.Now() - } - - r.rangeStr.store(r.mu.replicaID, desc) - r.mu.state.Desc = desc -} - func maxReplicaID(desc *roachpb.RangeDescriptor) roachpb.ReplicaID { if desc == nil || !desc.IsInitialized() { return 0 @@ -1579,120 +914,6 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader engine.Reader) { } } -// maybeInitializeRaftGroup check whether the internal Raft group has -// not yet been initialized. If not, it is created and set to campaign -// if this replica is the most recent owner of the range lease. -func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) { - r.mu.RLock() - // If this replica hasn't initialized the Raft group, create it and - // unquiesce and wake the leader to ensure the replica comes up to date. - initialized := r.mu.internalRaftGroup != nil - r.mu.RUnlock() - if initialized { - return - } - - // Acquire raftMu, but need to maintain lock ordering (raftMu then mu). - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - defer r.mu.Unlock() - - if err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - return true, nil - }); err != nil { - log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err) - } -} - -// Send executes a command on this range, dispatching it to the -// read-only, read-write, or admin execution path as appropriate. -// ctx should contain the log tags from the store (and up). -func (r *Replica) Send( - ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - return r.sendWithRangeID(ctx, r.RangeID, ba) -} - -// sendWithRangeID takes an unused rangeID argument so that the range -// ID will be accessible in stack traces (both in panics and when -// sampling goroutines from a live server). This line is subject to -// the whims of the compiler and it can be difficult to find the right -// value, but as of this writing the following example shows a stack -// while processing range 21 (0x15) (the first occurrence of that -// number is the rangeID argument, the second is within the encoded -// BatchRequest, although we don't want to rely on that occurring -// within the portion printed in the stack trace): -// -// github.com/cockroachdb/cockroach/pkg/storage.(*Replica).sendWithRangeID(0xc420d1a000, 0x64bfb80, 0xc421564b10, 0x15, 0x153fd4634aeb0193, 0x0, 0x100000001, 0x1, 0x15, 0x0, ...) 
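The sendWithRangeID comment above describes a debugging trick: the range ID is passed as an otherwise redundant argument purely so that its value shows up in panic and goroutine stack dumps. A minimal standalone sketch of the same trick, outside this patch, using hypothetical names (handleRequest, handleRequestWithRangeID) and subject to the compiler/inlining caveat the comment itself mentions:

package main

import "fmt"

// handleRequestWithRangeID carries rangeID only so the value is visible in
// stack traces; whether it survives in the frame is, as the original comment
// notes, up to the whims of the compiler.
func handleRequestWithRangeID(rangeID int64, key string) string {
	if key == "" {
		// A panic here prints a frame like
		//   main.handleRequestWithRangeID(0x15, ...)
		// where 0x15 is the range ID (21).
		panic("empty key")
	}
	return fmt.Sprintf("r%d: handled %s", rangeID, key)
}

// handleRequest is the public entry point; it forwards through the wrapper so
// every call records the range ID in its stack frame.
func handleRequest(rangeID int64, key string) string {
	return handleRequestWithRangeID(rangeID, key)
}

func main() {
	fmt.Println(handleRequest(21, "a")) // r21: handled a
}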
-func (r *Replica) sendWithRangeID( - ctx context.Context, rangeID roachpb.RangeID, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *roachpb.Error) { - var br *roachpb.BatchResponse - if r.leaseholderStats != nil && ba.Header.GatewayNodeID != 0 { - r.leaseholderStats.record(ba.Header.GatewayNodeID) - } - - // Add the range log tag. - ctx = r.AnnotateCtx(ctx) - ctx, cleanup := tracing.EnsureContext(ctx, r.AmbientContext.Tracer, "replica send") - defer cleanup() - - // If the internal Raft group is not initialized, create it and wake the leader. - r.maybeInitializeRaftGroup(ctx) - - isReadOnly := ba.IsReadOnly() - useRaft := !isReadOnly && ba.IsWrite() - - if isReadOnly && r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset { - // Clockless reads mode: reads go through Raft. - useRaft = true - } - - if err := r.checkBatchRequest(ba, isReadOnly); err != nil { - return nil, roachpb.NewError(err) - } - - if filter := r.store.cfg.TestingKnobs.TestingRequestFilter; filter != nil { - if pErr := filter(ba); pErr != nil { - return nil, pErr - } - } - - // Differentiate between admin, read-only and write. - var pErr *roachpb.Error - if useRaft { - log.Event(ctx, "read-write path") - br, pErr = r.executeWriteBatch(ctx, ba) - } else if isReadOnly { - log.Event(ctx, "read-only path") - br, pErr = r.executeReadOnlyBatch(ctx, ba) - } else if ba.IsAdmin() { - log.Event(ctx, "admin path") - br, pErr = r.executeAdminBatch(ctx, ba) - } else if len(ba.Requests) == 0 { - // empty batch; shouldn't happen (we could handle it, but it hints - // at someone doing weird things, and once we drop the key range - // from the header it won't be clear how to route those requests). - log.Fatalf(ctx, "empty batch") - } else { - log.Fatalf(ctx, "don't know how to handle command %s", ba) - } - if pErr != nil { - if _, ok := pErr.GetDetail().(*roachpb.RaftGroupDeletedError); ok { - // This error needs to be converted appropriately so that - // clients will retry. - pErr = roachpb.NewError(roachpb.NewRangeNotFoundError(r.RangeID, r.store.StoreID())) - } - log.Eventf(ctx, "replica.Send got error: %s", pErr) - } else { - if filter := r.store.cfg.TestingKnobs.TestingResponseFilter; filter != nil { - pErr = filter(ba, br) - } - } - return br, pErr -} - // requestCanProceed returns an error if a request (identified by its // key span and timestamp) can proceed. It may be called multiple // times during the processing of the request (i.e. during both @@ -2280,151 +1501,6 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error { return err } -// requestToProposal converts a BatchRequest into a ProposalData, by -// evaluating it. The returned ProposalData is partially valid even -// on a non-nil *roachpb.Error and should be proposed through Raft -// if ProposalData.command is non-nil. -func (r *Replica) requestToProposal( - ctx context.Context, - idKey storagebase.CmdIDKey, - ba roachpb.BatchRequest, - endCmds *endCmds, - spans *spanset.SpanSet, -) (*ProposalData, *roachpb.Error) { - res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, spans) - - // Fill out the results even if pErr != nil; we'll return the error below. 
- proposal := &ProposalData{ - ctx: ctx, - idKey: idKey, - endCmds: endCmds, - doneCh: make(chan proposalResult, 1), - Local: &res.Local, - Request: &ba, - } - - if needConsensus { - proposal.command = &storagepb.RaftCommand{ - ReplicatedEvalResult: res.Replicated, - WriteBatch: res.WriteBatch, - LogicalOpLog: res.LogicalOpLog, - } - } - - return proposal, pErr -} - -// evaluateProposal generates a Result from the given request by -// evaluating it, returning both state which is held only on the -// proposer and that which is to be replicated through Raft. The -// return value is ready to be inserted into Replica's proposal map -// and subsequently passed to submitProposalLocked. -// -// The method also returns a flag indicating if the request needs to -// be proposed through Raft and replicated. This flag will be false -// either if the request was a no-op or if it hit an error. In this -// case, the result can be sent directly back to the client without -// going through Raft, but while still handling LocalEvalResult. -// -// Replica.mu must not be held. -func (r *Replica) evaluateProposal( - ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, -) (*result.Result, bool, *roachpb.Error) { - if ba.Timestamp == (hlc.Timestamp{}) { - return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") - } - - // Evaluate the commands. If this returns without an error, the batch should - // be committed. Note that we don't hold any locks at this point. This is - // important since evaluating a proposal is expensive. - // TODO(tschottdorf): absorb all returned values in `res` below this point - // in the call stack as well. - batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, spans) - - // Note: reusing the proposer's batch when applying the command on the - // proposer was explored as an optimization but resulted in no performance - // benefit. - defer batch.Close() - - if pErr != nil { - pErr = r.maybeSetCorrupt(ctx, pErr) - - txn := pErr.GetTxn() - if txn != nil && ba.Txn == nil { - log.Fatalf(ctx, "error had a txn but batch is non-transactional. Err txn: %s", txn) - } - - // Failed proposals can't have any Result except for what's - // whitelisted here. - intents := res.Local.DetachIntents() - endTxns := res.Local.DetachEndTxns(true /* alwaysOnly */) - res.Local = result.LocalResult{ - Intents: &intents, - EndTxns: &endTxns, - Metrics: res.Local.Metrics, - } - res.Replicated.Reset() - return &res, false /* needConsensus */, pErr - } - - // Set the local reply, which is held only on the proposing replica and is - // returned to the client after the proposal completes, or immediately if - // replication is not necessary. - res.Local.Reply = br - - // needConsensus determines if the result needs to be replicated and - // proposed through Raft. This is necessary if at least one of the - // following conditions is true: - // 1. the request created a non-empty write batch. - // 2. the request had an impact on the MVCCStats. NB: this is possible - // even with an empty write batch when stats are recomputed. - // 3. the request has replicated side-effects. - // 4. the cluster is in "clockless" mode, in which case consensus is - // used to enforce a linearization of all reads and writes. 
- needConsensus := !batch.Empty() || - ms != (enginepb.MVCCStats{}) || - !res.Replicated.Equal(storagepb.ReplicatedEvalResult{}) || - r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset - - if needConsensus { - // Set the proposal's WriteBatch, which is the serialized representation of - // the proposals effect on RocksDB. - res.WriteBatch = &storagepb.WriteBatch{ - Data: batch.Repr(), - } - - // Set the proposal's replicated result, which contains metadata and - // side-effects that are to be replicated to all replicas. - res.Replicated.IsLeaseRequest = ba.IsLeaseRequest() - res.Replicated.Timestamp = ba.Timestamp - if r.store.cfg.Settings.Version.IsActive(cluster.VersionMVCCNetworkStats) { - res.Replicated.Delta = ms.ToStatsDelta() - } else { - res.Replicated.DeprecatedDelta = &ms - } - // If the RangeAppliedState key is not being used and the cluster version is - // high enough to guarantee that all current and future binaries will - // understand the key, we send the migration flag through Raft. Because - // there is a delay between command proposal and application, we may end up - // setting this migration flag multiple times. This is ok, because the - // migration is idempotent. - // TODO(nvanbenschoten): This will be baked in to 2.1, so it can be removed - // in the 2.2 release. - r.mu.RLock() - usingAppliedStateKey := r.mu.state.UsingAppliedStateKey - r.mu.RUnlock() - if !usingAppliedStateKey && - r.ClusterSettings().Version.IsMinSupported(cluster.VersionRangeAppliedStateKey) { - if res.Replicated.State == nil { - res.Replicated.State = &storagepb.ReplicaState{} - } - res.Replicated.State.UsingAppliedStateKey = true - } - } - - return &res, needConsensus, nil -} - func (r *Replica) maybeTransferRaftLeadership(ctx context.Context) { r.mu.Lock() r.maybeTransferRaftLeadershipLocked(ctx) diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index f1ff96382ac3..92b01e15a2da 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -49,6 +49,15 @@ type destroyStatus struct { err error } +func (s *destroyStatus) Set(err error, reason DestroyReason) { + s.err = err + s.reason = reason +} + +func (s *destroyStatus) Reset() { + s.Set(nil, destroyReasonAlive) +} + // IsAlive returns true when a replica is alive. func (s destroyStatus) IsAlive() bool { return s.reason == destroyReasonAlive diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go new file mode 100644 index 000000000000..5edac3e91696 --- /dev/null +++ b/pkg/storage/replica_init.go @@ -0,0 +1,329 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ +package storage + +import ( + "context" + "os" + + "github.com/cockroachdb/cockroach/pkg/config" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/abortspan" + "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/stateloader" + "github.com/cockroachdb/cockroach/pkg/storage/storagebase" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/pkg/errors" + "go.etcd.io/etcd/raft" +) + +func newReplica(rangeID roachpb.RangeID, store *Store) *Replica { + r := &Replica{ + AmbientContext: store.cfg.AmbientCtx, + RangeID: rangeID, + store: store, + abortSpan: abortspan.New(rangeID), + txnWaitQueue: txnwait.NewQueue(store), + } + r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) + r.mu.stateLoader = stateloader.Make(rangeID) + r.mu.quiescent = true + r.mu.zone = config.DefaultZoneConfigRef() + + if leaseHistoryMaxEntries > 0 { + r.leaseHistory = newLeaseHistory() + } + if store.cfg.StorePool != nil { + r.leaseholderStats = newReplicaStats(store.Clock(), store.cfg.StorePool.getNodeLocalityString) + } + // Pass nil for the localityOracle because we intentionally don't track the + // origin locality of write load. + r.writeStats = newReplicaStats(store.Clock(), nil) + + // Init rangeStr with the range ID. + r.rangeStr.store(0, &roachpb.RangeDescriptor{RangeID: rangeID}) + // Add replica log tag - the value is rangeStr.String(). + r.AmbientContext.AddLogTag("r", &r.rangeStr) + // Add replica pointer value. NB: this was historically useful for debugging + // replica GC issues, but is a distraction at the moment. + // r.AmbientContext.AddLogTagStr("@", fmt.Sprintf("%x", unsafe.Pointer(r))) + r.raftMu.stateLoader = stateloader.Make(rangeID) + return r +} + +func (r *Replica) init( + desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID, +) error { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + return r.initRaftMuLockedReplicaMuLocked(desc, clock, replicaID) +} + +func (r *Replica) initRaftMuLockedReplicaMuLocked( + desc *roachpb.RangeDescriptor, clock *hlc.Clock, replicaID roachpb.ReplicaID, +) error { + ctx := r.AnnotateCtx(context.TODO()) + if r.mu.state.Desc != nil && r.isInitializedRLocked() { + log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID) + } + if desc.IsInitialized() && replicaID != 0 { + return errors.Errorf("replicaID must be 0 when creating an initialized replica") + } + + r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests) + r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{} + r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} + // Clear the internal raft group in case we're being reset. Since we're + // reloading the raft state below, it isn't safe to use the existing raft + // group. + r.mu.internalRaftGroup = nil + + var err error + if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.store.Engine(), desc); err != nil { + return err + } + + // Init the minLeaseProposedTS such that we won't use an existing lease (if + // any). This is so that, after a restart, we don't propose under old leases. + // If the replica is being created through a split, this value will be + // overridden. 
+ if !r.store.cfg.TestingKnobs.DontPreventUseOfOldLeaseOnStart { + // Only do this if there was a previous lease. This shouldn't be important + // to do but consider that the first lease which is obtained is back-dated + // to a zero start timestamp (and this de-flakes some tests). If we set the + // min proposed TS here, this lease could not be renewed (by the semantics + // of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits, + // this problem would multiply to a number of replicas at cluster bootstrap. + // Instead, we make the first lease special (which is OK) and the problem + // disappears. + if r.mu.state.Lease.Sequence > 0 { + r.mu.minLeaseProposedTS = clock.Now() + } + } + + r.rangeStr.store(0, r.mu.state.Desc) + + r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.store.Engine()) + if err != nil { + return err + } + r.mu.lastTerm = invalidLastTerm + + pErr, err := r.mu.stateLoader.LoadReplicaDestroyedError(ctx, r.store.Engine()) + if err != nil { + return err + } + if r.mu.destroyStatus.RemovedOrCorrupt() { + if err := pErr.GetDetail(); err != nil { + r.mu.destroyStatus.Set(err, destroyReasonRemoved) + } + } + + if replicaID == 0 { + repDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID()) + if !ok { + // This is intentionally not an error and is the code path exercised + // during preemptive snapshots. The replica ID will be sent when the + // actual raft replica change occurs. + return nil + } + replicaID = repDesc.ReplicaID + } + r.rangeStr.store(replicaID, r.mu.state.Desc) + if err := r.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil { + return err + } + + r.assertStateLocked(ctx, r.store.Engine()) + return nil +} + +func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error { + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + return r.setReplicaIDRaftMuLockedMuLocked(replicaID) +} + +func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) error { + if r.mu.replicaID == replicaID { + // The common case: the replica ID is unchanged. + return nil + } + if replicaID == 0 { + // If the incoming message does not have a new replica ID it is a + // preemptive snapshot. We'll update minReplicaID if the snapshot is + // accepted. + return nil + } + if replicaID < r.mu.minReplicaID { + return &roachpb.RaftGroupDeletedError{} + } + if r.mu.replicaID > replicaID { + return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID) + } + + if r.mu.destroyStatus.reason == destroyReasonRemovalPending { + // An earlier incarnation of this replica was removed, but apparently it has been re-added + // now, so reset the status. + r.mu.destroyStatus.Reset() + } + + // if r.mu.replicaID != 0 { + // // TODO(bdarnell): clean up previous raftGroup (update peers) + // } + + // Initialize or update the sideloaded storage. If the sideloaded storage + // already exists (which is iff the previous replicaID was non-zero), then + // we have to move the contained files over (this corresponds to the case in + // which our replica is removed and re-added to the range, without having + // the replica GC'ed in the meantime). + // + // Note that we can't race with a concurrent replicaGC here because both that + // and this is under raftMu. 
+ var prevSideloadedDir string + if ss := r.raftMu.sideloaded; ss != nil { + prevSideloadedDir = ss.Dir() + } + var err error + if r.raftMu.sideloaded, err = newDiskSideloadStorage( + r.store.cfg.Settings, + r.mu.state.Desc.RangeID, + replicaID, + r.store.Engine().GetAuxiliaryDir(), + r.store.limiters.BulkIOWriteRate, + r.store.engine, + ); err != nil { + return errors.Wrap(err, "while initializing sideloaded storage") + } + if prevSideloadedDir != "" { + if _, err := os.Stat(prevSideloadedDir); err != nil { + if !os.IsNotExist(err) { + return err + } + // Old directory not found. + } else { + // Old directory found, so we have something to move over to the new one. + if err := os.Rename(prevSideloadedDir, r.raftMu.sideloaded.Dir()); err != nil { + return errors.Wrap(err, "while moving sideloaded directory") + } + } + } + + previousReplicaID := r.mu.replicaID + r.mu.replicaID = replicaID + + if replicaID >= r.mu.minReplicaID { + r.mu.minReplicaID = replicaID + 1 + } + // Reset the raft group to force its recreation on next usage. + r.mu.internalRaftGroup = nil + + // If there was a previous replica, repropose its pending commands under + // this new incarnation. + if previousReplicaID != 0 { + if log.V(1) { + log.Infof(r.AnnotateCtx(context.TODO()), "changed replica ID from %d to %d", + previousReplicaID, replicaID) + } + // repropose all pending commands under new replicaID. + r.refreshProposalsLocked(0, reasonReplicaIDChanged) + } + + return nil +} + +// IsInitialized is true if we know the metadata of this range, either +// because we created it or we have received an initial snapshot from +// another node. It is false when a range has been created in response +// to an incoming message but we are waiting for our initial snapshot. +func (r *Replica) IsInitialized() bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.isInitializedRLocked() +} + +// isInitializedRLocked is true if we know the metadata of this range, either +// because we created it or we have received an initial snapshot from +// another node. It is false when a range has been created in response +// to an incoming message but we are waiting for our initial snapshot. +// isInitializedLocked requires that the replica lock is held. +func (r *Replica) isInitializedRLocked() bool { + return r.mu.state.Desc.IsInitialized() +} + +// maybeInitializeRaftGroup check whether the internal Raft group has +// not yet been initialized. If not, it is created and set to campaign +// if this replica is the most recent owner of the range lease. +func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) { + r.mu.RLock() + // If this replica hasn't initialized the Raft group, create it and + // unquiesce and wake the leader to ensure the replica comes up to date. + initialized := r.mu.internalRaftGroup != nil + r.mu.RUnlock() + if initialized { + return + } + + // Acquire raftMu, but need to maintain lock ordering (raftMu then mu). + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + + if err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + return true, nil + }); err != nil { + log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err) + } +} + +// setDesc atomically sets the replica's descriptor. It requires raftMu to be +// locked. 
+func (r *Replica) setDesc(ctx context.Context, desc *roachpb.RangeDescriptor) { + r.mu.Lock() + defer r.mu.Unlock() + + if desc.RangeID != r.RangeID { + log.Fatalf(ctx, "range descriptor ID (%d) does not match replica's range ID (%d)", + desc.RangeID, r.RangeID) + } + if r.mu.state.Desc != nil && r.mu.state.Desc.IsInitialized() && + (desc == nil || !desc.IsInitialized()) { + log.Fatalf(ctx, "cannot replace initialized descriptor with uninitialized one: %+v -> %+v", + r.mu.state.Desc, desc) + } + if r.mu.state.Desc != nil && r.mu.state.Desc.IsInitialized() && + !r.mu.state.Desc.StartKey.Equal(desc.StartKey) { + log.Fatalf(ctx, "attempted to change replica's start key from %s to %s", + r.mu.state.Desc.StartKey, desc.StartKey) + } + + newMaxID := maxReplicaID(desc) + if newMaxID > r.mu.lastReplicaAdded { + r.mu.lastReplicaAdded = newMaxID + r.mu.lastReplicaAddedTime = timeutil.Now() + } + + r.rangeStr.store(r.mu.replicaID, desc) + r.mu.state.Desc = desc +} diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index 672b98588201..2a38a2bfc43b 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/rditer" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/util" @@ -790,3 +791,159 @@ func (r *Replica) handleEvalResultRaftMuLocked( r.mu.Unlock() } } + +// proposalResult indicates the result of a proposal. Exactly one of +// Reply, Err and ProposalRetry is set, and it represents the result of +// the proposal. +type proposalResult struct { + Reply *roachpb.BatchResponse + Err *roachpb.Error + ProposalRetry proposalReevaluationReason + Intents []result.IntentsWithArg + EndTxns []result.EndTxnIntents +} + +// evaluateProposal generates a Result from the given request by +// evaluating it, returning both state which is held only on the +// proposer and that which is to be replicated through Raft. The +// return value is ready to be inserted into Replica's proposal map +// and subsequently passed to submitProposalLocked. +// +// The method also returns a flag indicating if the request needs to +// be proposed through Raft and replicated. This flag will be false +// either if the request was a no-op or if it hit an error. In this +// case, the result can be sent directly back to the client without +// going through Raft, but while still handling LocalEvalResult. +// +// Replica.mu must not be held. +func (r *Replica) evaluateProposal( + ctx context.Context, idKey storagebase.CmdIDKey, ba roachpb.BatchRequest, spans *spanset.SpanSet, +) (*result.Result, bool, *roachpb.Error) { + if ba.Timestamp == (hlc.Timestamp{}) { + return nil, false, roachpb.NewErrorf("can't propose Raft command with zero timestamp") + } + + // Evaluate the commands. If this returns without an error, the batch should + // be committed. Note that we don't hold any locks at this point. This is + // important since evaluating a proposal is expensive. + // TODO(tschottdorf): absorb all returned values in `res` below this point + // in the call stack as well. 
+ batch, ms, br, res, pErr := r.evaluateWriteBatch(ctx, idKey, ba, spans) + + // Note: reusing the proposer's batch when applying the command on the + // proposer was explored as an optimization but resulted in no performance + // benefit. + defer batch.Close() + + if pErr != nil { + pErr = r.maybeSetCorrupt(ctx, pErr) + + txn := pErr.GetTxn() + if txn != nil && ba.Txn == nil { + log.Fatalf(ctx, "error had a txn but batch is non-transactional. Err txn: %s", txn) + } + + // Failed proposals can't have any Result except for what's + // whitelisted here. + intents := res.Local.DetachIntents() + endTxns := res.Local.DetachEndTxns(true /* alwaysOnly */) + res.Local = result.LocalResult{ + Intents: &intents, + EndTxns: &endTxns, + Metrics: res.Local.Metrics, + } + res.Replicated.Reset() + return &res, false /* needConsensus */, pErr + } + + // Set the local reply, which is held only on the proposing replica and is + // returned to the client after the proposal completes, or immediately if + // replication is not necessary. + res.Local.Reply = br + + // needConsensus determines if the result needs to be replicated and + // proposed through Raft. This is necessary if at least one of the + // following conditions is true: + // 1. the request created a non-empty write batch. + // 2. the request had an impact on the MVCCStats. NB: this is possible + // even with an empty write batch when stats are recomputed. + // 3. the request has replicated side-effects. + // 4. the cluster is in "clockless" mode, in which case consensus is + // used to enforce a linearization of all reads and writes. + needConsensus := !batch.Empty() || + ms != (enginepb.MVCCStats{}) || + !res.Replicated.Equal(storagepb.ReplicatedEvalResult{}) || + r.store.Clock().MaxOffset() == timeutil.ClocklessMaxOffset + + if needConsensus { + // Set the proposal's WriteBatch, which is the serialized representation of + // the proposals effect on RocksDB. + res.WriteBatch = &storagepb.WriteBatch{ + Data: batch.Repr(), + } + + // Set the proposal's replicated result, which contains metadata and + // side-effects that are to be replicated to all replicas. + res.Replicated.IsLeaseRequest = ba.IsLeaseRequest() + res.Replicated.Timestamp = ba.Timestamp + if r.store.cfg.Settings.Version.IsActive(cluster.VersionMVCCNetworkStats) { + res.Replicated.Delta = ms.ToStatsDelta() + } else { + res.Replicated.DeprecatedDelta = &ms + } + // If the RangeAppliedState key is not being used and the cluster version is + // high enough to guarantee that all current and future binaries will + // understand the key, we send the migration flag through Raft. Because + // there is a delay between command proposal and application, we may end up + // setting this migration flag multiple times. This is ok, because the + // migration is idempotent. + // TODO(nvanbenschoten): This will be baked in to 2.1, so it can be removed + // in the 2.2 release. + r.mu.RLock() + usingAppliedStateKey := r.mu.state.UsingAppliedStateKey + r.mu.RUnlock() + if !usingAppliedStateKey && + r.ClusterSettings().Version.IsMinSupported(cluster.VersionRangeAppliedStateKey) { + if res.Replicated.State == nil { + res.Replicated.State = &storagepb.ReplicaState{} + } + res.Replicated.State.UsingAppliedStateKey = true + } + } + + return &res, needConsensus, nil +} + +// requestToProposal converts a BatchRequest into a ProposalData, by +// evaluating it. 
The returned ProposalData is partially valid even +// on a non-nil *roachpb.Error and should be proposed through Raft +// if ProposalData.command is non-nil. +func (r *Replica) requestToProposal( + ctx context.Context, + idKey storagebase.CmdIDKey, + ba roachpb.BatchRequest, + endCmds *endCmds, + spans *spanset.SpanSet, +) (*ProposalData, *roachpb.Error) { + res, needConsensus, pErr := r.evaluateProposal(ctx, idKey, ba, spans) + + // Fill out the results even if pErr != nil; we'll return the error below. + proposal := &ProposalData{ + ctx: ctx, + idKey: idKey, + endCmds: endCmds, + doneCh: make(chan proposalResult, 1), + Local: &res.Local, + Request: &ba, + } + + if needConsensus { + proposal.command = &storagepb.RaftCommand{ + ReplicatedEvalResult: res.Replicated, + WriteBatch: res.WriteBatch, + LogicalOpLog: res.LogicalOpLog, + } + } + + return proposal, pErr +} diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index e367db14a5e8..e796381eeddc 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -1441,6 +1441,232 @@ func (r *Replica) checkForcedErrLocked( return leaseIndex, proposalNoReevaluation, nil } +type snapTruncationInfo struct { + index uint64 + deadline time.Time +} + +func (r *Replica) addSnapshotLogTruncationConstraintLocked( + ctx context.Context, snapUUID uuid.UUID, index uint64, +) { + if r.mu.snapshotLogTruncationConstraints == nil { + r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo) + } + item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if ok { + // Uh-oh, there's either a programming error (resulting in the same snapshot + // fed into this method twice) or a UUID collision. We discard the update + // (which is benign) but log it loudly. If the index is the same, it's + // likely the former, otherwise the latter. + log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index) + return + } + + r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index} +} + +func (r *Replica) completeSnapshotLogTruncationConstraint( + ctx context.Context, snapUUID uuid.UUID, now time.Time, +) { + deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod) + + r.mu.Lock() + defer r.mu.Unlock() + item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] + if !ok { + // UUID collision while adding the snapshot in originally. Nothing + // else to do. + return + } + + item.deadline = deadline + r.mu.snapshotLogTruncationConstraints[snapUUID] = item +} + +func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked( + now time.Time, +) (minSnapIndex uint64) { + for snapUUID, item := range r.mu.snapshotLogTruncationConstraints { + if item.deadline != (time.Time{}) && item.deadline.Before(now) { + // The snapshot has finished and its grace period has passed. + // Ignore it when making truncation decisions. + delete(r.mu.snapshotLogTruncationConstraints, snapUUID) + continue + } + if minSnapIndex == 0 || minSnapIndex > item.index { + minSnapIndex = item.index + } + } + if len(r.mu.snapshotLogTruncationConstraints) == 0 { + // Save a little bit of memory. + r.mu.snapshotLogTruncationConstraints = nil + } + return minSnapIndex +} + +func isRaftLeader(raftStatus *raft.Status) bool { + return raftStatus != nil && raftStatus.SoftState.RaftState == raft.StateLeader +} + +// HasRaftLeader returns true if the raft group has a raft leader currently. 
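+// Unlike isRaftLeader above, which reports whether the local replica is the
+// leader, this only requires that some member of the group is known to be the
+// leader (i.e. SoftState.Lead is non-zero).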
+func HasRaftLeader(raftStatus *raft.Status) bool { + return raftStatus != nil && raftStatus.SoftState.Lead != 0 +} + +// pendingCmdSlice sorts by increasing MaxLeaseIndex. +type pendingCmdSlice []*ProposalData + +func (s pendingCmdSlice) Len() int { return len(s) } +func (s pendingCmdSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s pendingCmdSlice) Less(i, j int) bool { + return s[i].command.MaxLeaseIndex < s[j].command.MaxLeaseIndex +} + +// withRaftGroupLocked calls the supplied function with the (lazily +// initialized) Raft group. The supplied function should return true for the +// unquiesceAndWakeLeader argument if the replica should be unquiesced (and the +// leader awoken). See handleRaftReady for an instance of where this value +// varies. +// +// Requires that Replica.mu is held. Also requires that Replica.raftMu is held +// if either the caller can't guarantee that r.mu.internalRaftGroup != nil or +// the provided function requires Replica.raftMu. +func (r *Replica) withRaftGroupLocked( + mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), +) error { + if r.mu.destroyStatus.RemovedOrCorrupt() { + // Silently ignore all operations on destroyed replicas. We can't return an + // error here as all errors returned from this method are considered fatal. + return nil + } + + if r.mu.replicaID == 0 { + // The replica's raft group has not yet been configured (i.e. the replica + // was created from a preemptive snapshot). + return nil + } + + if r.mu.internalRaftGroup == nil { + r.raftMu.Mutex.AssertHeld() + + ctx := r.AnnotateCtx(context.TODO()) + raftGroup, err := raft.NewRawNode(newRaftConfig( + raft.Storage((*replicaRaftStorage)(r)), + uint64(r.mu.replicaID), + r.mu.state.RaftAppliedIndex, + r.store.cfg, + &raftLogger{ctx: ctx}, + ), nil) + if err != nil { + return err + } + r.mu.internalRaftGroup = raftGroup + + if mayCampaignOnWake { + r.maybeCampaignOnWakeLocked(ctx) + } + } + + // This wrapper function is a hack to add range IDs to stack traces + // using the same pattern as Replica.sendWithRangeID. + unquiesce, err := func(rangeID roachpb.RangeID, raftGroup *raft.RawNode) (bool, error) { + return f(raftGroup) + }(r.RangeID, r.mu.internalRaftGroup) + if unquiesce { + r.unquiesceAndWakeLeaderLocked() + } + return err +} + +// withRaftGroup calls the supplied function with the (lazily initialized) +// Raft group. It acquires and releases the Replica lock, so r.mu must not be +// held (or acquired by the supplied function). +// +// If mayCampaignOnWake is true, the replica may initiate a raft +// election if it was previously in a dormant state. Most callers +// should set this to true, because the prevote feature minimizes the +// disruption from unnecessary elections. The exception is that we +// should not initiate an election while handling incoming raft +// messages (which may include MsgVotes from an election in progress, +// and this election would be disrupted if we started our own). +// +// Has the same requirement for Replica.raftMu as withRaftGroupLocked. 
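+//
+// Illustrative usage (similar to maybeInitializeRaftGroup's use of
+// withRaftGroupLocked): a caller that only wants to lazily create and wake
+// the Raft group can pass a closure that does nothing but request
+// unquiescence:
+//
+//	if err := r.withRaftGroup(true, func(rn *raft.RawNode) (bool, error) {
+//		// Step messages, propose, or inspect rn.Status() here.
+//		return true /* unquiesceAndWakeLeader */, nil
+//	}); err != nil {
+//		// Errors returned here are generally treated as fatal.
+//	}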
+func (r *Replica) withRaftGroup( + mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), +) error { + r.mu.Lock() + defer r.mu.Unlock() + return r.withRaftGroupLocked(mayCampaignOnWake, f) +} + +func shouldCampaignOnWake( + leaseStatus storagepb.LeaseStatus, + lease roachpb.Lease, + storeID roachpb.StoreID, + raftStatus raft.Status, +) bool { + // When waking up a range, campaign unless we know that another + // node holds a valid lease (this is most important after a split, + // when all replicas create their raft groups at about the same + // time, with a lease pre-assigned to one of them). Note that + // thanks to PreVote, unnecessary campaigns are not disruptive so + // we should err on the side of campaigining here. + anotherOwnsLease := leaseStatus.State == storagepb.LeaseState_VALID && !lease.OwnedBy(storeID) + + // If we're already campaigning or know who the leader is, don't + // start a new term. + noLeader := raftStatus.RaftState == raft.StateFollower && raftStatus.Lead == 0 + return !anotherOwnsLease && noLeader +} + +// maybeCampaignOnWakeLocked is called when the range wakes from a +// dormant state (either the initial "raftGroup == nil" state or after +// being quiescent) and campaigns for raft leadership if appropriate. +func (r *Replica) maybeCampaignOnWakeLocked(ctx context.Context) { + // Raft panics if a node that is not currently a member of the + // group tries to campaign. That happens primarily when we apply + // preemptive snapshots. + if _, currentMember := r.mu.state.Desc.GetReplicaDescriptorByID(r.mu.replicaID); !currentMember { + return + } + + leaseStatus := r.leaseStatus(*r.mu.state.Lease, r.store.Clock().Now(), r.mu.minLeaseProposedTS) + raftStatus := r.mu.internalRaftGroup.Status() + if shouldCampaignOnWake(leaseStatus, *r.mu.state.Lease, r.store.StoreID(), *raftStatus) { + log.VEventf(ctx, 3, "campaigning") + if err := r.mu.internalRaftGroup.Campaign(); err != nil { + log.VEventf(ctx, 1, "failed to campaign: %s", err) + } + } +} + +// a lastUpdateTimesMap is maintained on the Raft leader to keep track of the +// last communication received from followers, which in turn informs the quota +// pool and log truncations. +type lastUpdateTimesMap map[roachpb.ReplicaID]time.Time + +func (m lastUpdateTimesMap) update(replicaID roachpb.ReplicaID, now time.Time) { + if m == nil { + return + } + m[replicaID] = now +} + +// isFollowerActive returns whether the specified follower has made +// communication with the leader in the last MaxQuotaReplicaLivenessDuration. +func (m lastUpdateTimesMap) isFollowerActive( + ctx context.Context, replicaID roachpb.ReplicaID, now time.Time, +) bool { + lastUpdateTime, ok := m[replicaID] + if !ok { + // If the follower has no entry in lastUpdateTimes, it has not been + // updated since r became the leader (at which point all then-existing + // replicas were updated). + return false + } + return now.Sub(lastUpdateTime) <= MaxQuotaReplicaLivenessDuration +} + // processRaftCommand processes a raft command by unpacking the // command struct to get args and reply and then applying the command // to the state machine via applyRaftCommand(). 
The result is sent on @@ -2090,84 +2316,3 @@ func (r *Replica) applyRaftCommand( r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) return deltaStats, nil } - -type snapTruncationInfo struct { - index uint64 - deadline time.Time -} - -func (r *Replica) addSnapshotLogTruncationConstraintLocked( - ctx context.Context, snapUUID uuid.UUID, index uint64, -) { - if r.mu.snapshotLogTruncationConstraints == nil { - r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo) - } - item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] - if ok { - // Uh-oh, there's either a programming error (resulting in the same snapshot - // fed into this method twice) or a UUID collision. We discard the update - // (which is benign) but log it loudly. If the index is the same, it's - // likely the former, otherwise the latter. - log.Warningf(ctx, "UUID collision at %s for %+v (index %d)", snapUUID, item, index) - return - } - - r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index} -} - -func (r *Replica) completeSnapshotLogTruncationConstraint( - ctx context.Context, snapUUID uuid.UUID, now time.Time, -) { - deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod) - - r.mu.Lock() - defer r.mu.Unlock() - item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID] - if !ok { - // UUID collision while adding the snapshot in originally. Nothing - // else to do. - return - } - - item.deadline = deadline - r.mu.snapshotLogTruncationConstraints[snapUUID] = item -} - -func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked( - now time.Time, -) (minSnapIndex uint64) { - for snapUUID, item := range r.mu.snapshotLogTruncationConstraints { - if item.deadline != (time.Time{}) && item.deadline.Before(now) { - // The snapshot has finished and its grace period has passed. - // Ignore it when making truncation decisions. - delete(r.mu.snapshotLogTruncationConstraints, snapUUID) - continue - } - if minSnapIndex == 0 || minSnapIndex > item.index { - minSnapIndex = item.index - } - } - if len(r.mu.snapshotLogTruncationConstraints) == 0 { - // Save a little bit of memory. - r.mu.snapshotLogTruncationConstraints = nil - } - return minSnapIndex -} - -func isRaftLeader(raftStatus *raft.Status) bool { - return raftStatus != nil && raftStatus.SoftState.RaftState == raft.StateLeader -} - -// HasRaftLeader returns true if the raft group has a raft leader currently. -func HasRaftLeader(raftStatus *raft.Status) bool { - return raftStatus != nil && raftStatus.SoftState.Lead != 0 -} - -// pendingCmdSlice sorts by increasing MaxLeaseIndex. -type pendingCmdSlice []*ProposalData - -func (s pendingCmdSlice) Len() int { return len(s) } -func (s pendingCmdSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s pendingCmdSlice) Less(i, j int) bool { - return s[i].command.MaxLeaseIndex < s[j].command.MaxLeaseIndex -} diff --git a/pkg/storage/replica_range_lease.go b/pkg/storage/replica_range_lease.go index 18d3dcd45021..4f78b6d3df2b 100644 --- a/pkg/storage/replica_range_lease.go +++ b/pkg/storage/replica_range_lease.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -652,3 +653,320 @@ func (r *Replica) AdminTransferLease(ctx context.Context, target roachpb.StoreID } } } + +// GetLease returns the lease and, if available, the proposed next lease. 
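+// If no next lease has been proposed, the second return value is the empty
+// Lease. Illustrative usage:
+//
+//	cur, next := r.GetLease()
+//	if next != (roachpb.Lease{}) {
+//		// A lease acquisition or transfer is in flight; cur may soon be
+//		// superseded.
+//	}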
+func (r *Replica) GetLease() (roachpb.Lease, roachpb.Lease) { + r.mu.RLock() + defer r.mu.RUnlock() + return r.getLeaseRLocked() +} + +func (r *Replica) getLeaseRLocked() (roachpb.Lease, roachpb.Lease) { + if nextLease, ok := r.mu.pendingLeaseRequest.RequestPending(); ok { + return *r.mu.state.Lease, nextLease + } + return *r.mu.state.Lease, roachpb.Lease{} +} + +// OwnsValidLease returns whether this replica is the current valid +// leaseholder. Note that this method does not check to see if a transfer is +// pending, but returns the status of the current lease and ownership at the +// specified point in time. +func (r *Replica) OwnsValidLease(ts hlc.Timestamp) bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.ownsValidLeaseRLocked(ts) +} + +func (r *Replica) ownsValidLeaseRLocked(ts hlc.Timestamp) bool { + return r.mu.state.Lease.OwnedBy(r.store.StoreID()) && + r.leaseStatus(*r.mu.state.Lease, ts, r.mu.minLeaseProposedTS).State == storagepb.LeaseState_VALID +} + +// IsLeaseValid returns true if the replica's lease is owned by this +// replica and is valid (not expired, not in stasis). +func (r *Replica) IsLeaseValid(lease roachpb.Lease, ts hlc.Timestamp) bool { + r.mu.RLock() + defer r.mu.RUnlock() + return r.isLeaseValidRLocked(lease, ts) +} + +func (r *Replica) isLeaseValidRLocked(lease roachpb.Lease, ts hlc.Timestamp) bool { + return r.leaseStatus(lease, ts, r.mu.minLeaseProposedTS).State == storagepb.LeaseState_VALID +} + +// newNotLeaseHolderError returns a NotLeaseHolderError initialized with the +// replica for the holder (if any) of the given lease. +// +// Note that this error can be generated on the Raft processing goroutine, so +// its output should be completely determined by its parameters. +func newNotLeaseHolderError( + l *roachpb.Lease, proposerStoreID roachpb.StoreID, rangeDesc *roachpb.RangeDescriptor, +) *roachpb.NotLeaseHolderError { + err := &roachpb.NotLeaseHolderError{ + RangeID: rangeDesc.RangeID, + } + err.Replica, _ = rangeDesc.GetReplicaDescriptor(proposerStoreID) + if l != nil { + // Normally, we return the lease-holding Replica here. However, in the + // case in which a leader removes itself, we want the followers to + // avoid handing out a misleading clue (which in itself shouldn't be + // overly disruptive as the lease would expire and then this method + // shouldn't be called for it any more, but at the very least it + // could catch tests in a loop, presumably due to manual clocks). + _, stillMember := rangeDesc.GetReplicaDescriptor(l.Replica.StoreID) + if stillMember { + err.LeaseHolder = &l.Replica + err.Lease = l + } + } + return err +} + +// leaseGoodToGo is a fast-path for lease checks which verifies that an +// existing lease is valid and owned by the current store. This method should +// not be called directly. Use redirectOnOrAcquireLease instead. +func (r *Replica) leaseGoodToGo(ctx context.Context) (storagepb.LeaseStatus, bool) { + timestamp := r.store.Clock().Now() + r.mu.RLock() + defer r.mu.RUnlock() + + if r.requiresExpiringLeaseRLocked() { + // Slow-path for expiration-based leases. + return storagepb.LeaseStatus{}, false + } + + status := r.leaseStatus(*r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) + if status.State == storagepb.LeaseState_VALID && status.Lease.OwnedBy(r.store.StoreID()) { + // We own the lease... + if repDesc, err := r.getReplicaDescriptorRLocked(); err == nil { + if _, ok := r.mu.pendingLeaseRequest.TransferInProgress(repDesc.ReplicaID); !ok { + // ...and there is no transfer pending. 
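+				// Only this combination -- valid lease, owned by this store, no
+				// transfer in flight -- takes the fast path; every other outcome
+				// returns false and sends the caller through the full lease-status
+				// handling in redirectOnOrAcquireLease.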
+ return status, true + } + } + } + return storagepb.LeaseStatus{}, false +} + +// redirectOnOrAcquireLease checks whether this replica has the lease +// at the current timestamp. If it does, returns success. If another +// replica currently holds the lease, redirects by returning +// NotLeaseHolderError. If the lease is expired, a renewal is +// synchronously requested. Leases are eagerly renewed when a request +// with a timestamp within rangeLeaseRenewalDuration of the lease +// expiration is served. +// +// TODO(spencer): for write commands, don't wait while requesting +// the range lease. If the lease acquisition fails, the write cmd +// will fail as well. If it succeeds, as is likely, then the write +// will not incur latency waiting for the command to complete. +// Reads, however, must wait. +func (r *Replica) redirectOnOrAcquireLease( + ctx context.Context, +) (storagepb.LeaseStatus, *roachpb.Error) { + if status, ok := r.leaseGoodToGo(ctx); ok { + return status, nil + } + + // Loop until the lease is held or the replica ascertains the actual + // lease holder. Returns also on context.Done() (timeout or cancellation). + var status storagepb.LeaseStatus + for attempt := 1; ; attempt++ { + timestamp := r.store.Clock().Now() + llHandle, pErr := func() (*leaseRequestHandle, *roachpb.Error) { + r.mu.Lock() + defer r.mu.Unlock() + + status = r.leaseStatus(*r.mu.state.Lease, timestamp, r.mu.minLeaseProposedTS) + switch status.State { + case storagepb.LeaseState_ERROR: + // Lease state couldn't be determined. + log.VEventf(ctx, 2, "lease state couldn't be determined") + return nil, roachpb.NewError( + newNotLeaseHolderError(nil, r.store.StoreID(), r.mu.state.Desc)) + + case storagepb.LeaseState_VALID, storagepb.LeaseState_STASIS: + if !status.Lease.OwnedBy(r.store.StoreID()) { + _, stillMember := r.mu.state.Desc.GetReplicaDescriptor(status.Lease.Replica.StoreID) + if !stillMember { + // This would be the situation in which the lease holder gets removed when + // holding the lease, or in which a lease request erroneously gets accepted + // for a replica that is not in the replica set. Neither of the two can + // happen in normal usage since appropriate mechanisms have been added: + // + // 1. Only the lease holder (at the time) schedules removal of a replica, + // but the lease can change hands and so the situation in which a follower + // coordinates a replica removal of the (new) lease holder is possible (if + // unlikely) in practice. In this situation, the new lease holder would at + // some point be asked to propose the replica change's EndTransaction to + // Raft. A check has been added that prevents proposals that amount to the + // removal of the proposer's (and hence lease holder's) Replica, preventing + // this scenario. + // + // 2. A lease is accepted for a Replica that has been removed. Without + // precautions, this could happen because lease requests are special in + // that they are the only command that is proposed on a follower (other + // commands may be proposed from followers, but not successfully so). For + // all proposals, processRaftCommand checks that their ProposalLease is + // compatible with the active lease for the log position. For commands + // proposed on the lease holder, the spanlatch manager then serializes + // everything. But lease requests get created on followers based on their + // local state and thus without being sequenced through latching. 
Thus + // a recently removed follower (unaware of its own removal) could submit + // a proposal for the lease (correctly using as a ProposerLease the last + // active lease), and would receive it given the up-to-date ProposerLease. + // Hence, an extra check is in order: processRaftCommand makes sure that + // lease requests for a replica not in the descriptor are bounced. + // + // However, this is possible if the `cockroach debug + // unsafe-remove-dead-replicas` command has been used, so + // this is just a logged error instead of a fatal + // assertion. + log.Errorf(ctx, "lease %s owned by replica %+v that no longer exists", + status.Lease, status.Lease.Replica) + } + // Otherwise, if the lease is currently held by another replica, redirect + // to the holder. + return nil, roachpb.NewError( + newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc)) + } + // Check that we're not in the process of transferring the lease away. + // If we are transferring the lease away, we can't serve reads or + // propose Raft commands - see comments on TransferLease. + // TODO(andrei): If the lease is being transferred, consider returning a + // new error type so the client backs off until the transfer is + // completed. + repDesc, err := r.getReplicaDescriptorRLocked() + if err != nil { + return nil, roachpb.NewError(err) + } + if transferLease, ok := r.mu.pendingLeaseRequest.TransferInProgress( + repDesc.ReplicaID); ok { + return nil, roachpb.NewError( + newNotLeaseHolderError(&transferLease, r.store.StoreID(), r.mu.state.Desc)) + } + + // If the lease is in stasis, we can't serve requests until we've + // renewed the lease, so we return the handle to block on renewal. + // Otherwise, we don't need to wait for the extension and simply + // ignore the returned handle (whose channel is buffered) and continue. + if status.State == storagepb.LeaseState_STASIS { + return r.requestLeaseLocked(ctx, status), nil + } + + // Extend the lease if this range uses expiration-based + // leases, the lease is in need of renewal, and there's not + // already an extension pending. + _, requestPending := r.mu.pendingLeaseRequest.RequestPending() + if !requestPending && r.requiresExpiringLeaseRLocked() { + renewal := status.Lease.Expiration.Add(-r.store.cfg.RangeLeaseRenewalDuration().Nanoseconds(), 0) + if !timestamp.Less(renewal) { + if log.V(2) { + log.Infof(ctx, "extending lease %s at %s", status.Lease, timestamp) + } + // We had an active lease to begin with, but we want to trigger + // a lease extension. We explicitly ignore the returned handle + // as we won't block on it. + _ = r.requestLeaseLocked(ctx, status) + } + } + + case storagepb.LeaseState_EXPIRED: + // No active lease: Request renewal if a renewal is not already pending. + log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt) + return r.requestLeaseLocked(ctx, status), nil + + case storagepb.LeaseState_PROSCRIBED: + // Lease proposed timestamp is earlier than the min proposed + // timestamp limit this replica must observe. If this store + // owns the lease, re-request. Otherwise, redirect. + if status.Lease.OwnedBy(r.store.StoreID()) { + log.VEventf(ctx, 2, "request range lease (attempt #%d)", attempt) + return r.requestLeaseLocked(ctx, status), nil + } + // If lease is currently held by another, redirect to holder. + return nil, roachpb.NewError( + newNotLeaseHolderError(&status.Lease, r.store.StoreID(), r.mu.state.Desc)) + } + + // Return a nil handle to signal that we have a valid lease. 
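+			// This point is reached only in the VALID case with the lease owned
+			// by this store and no transfer pending; a renewal may have been
+			// kicked off above, but the caller does not wait for it.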
+ return nil, nil + }() + if pErr != nil { + return storagepb.LeaseStatus{}, pErr + } + if llHandle == nil { + // We own a valid lease. + return status, nil + } + + // Wait for the range lease to finish, or the context to expire. + pErr = func() (pErr *roachpb.Error) { + slowTimer := timeutil.NewTimer() + defer slowTimer.Stop() + slowTimer.Reset(base.SlowRequestThreshold) + tBegin := timeutil.Now() + for { + select { + case pErr = <-llHandle.C(): + if pErr != nil { + switch tErr := pErr.GetDetail().(type) { + case *roachpb.AmbiguousResultError: + // This can happen if the RequestLease command we sent has been + // applied locally through a snapshot: the RequestLeaseRequest + // cannot be reproposed so we get this ambiguity. + // We'll just loop around. + return nil + case *roachpb.LeaseRejectedError: + if tErr.Existing.OwnedBy(r.store.StoreID()) { + // The RequestLease command we sent was rejected because another + // lease was applied in the meantime, but we own that other + // lease. So, loop until the current node becomes aware that + // it's the leaseholder. + return nil + } + + // Getting a LeaseRejectedError back means someone else got there + // first, or the lease request was somehow invalid due to a concurrent + // change. That concurrent change could have been that this replica was + // removed (see processRaftCommand), so check for that case before + // falling back to a NotLeaseHolderError. + var err error + if _, descErr := r.GetReplicaDescriptor(); descErr != nil { + err = descErr + } else if lease, _ := r.GetLease(); !r.IsLeaseValid(lease, r.store.Clock().Now()) { + err = newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc()) + } else { + err = newNotLeaseHolderError(&lease, r.store.StoreID(), r.Desc()) + } + pErr = roachpb.NewError(err) + } + return pErr + } + log.Eventf(ctx, "lease acquisition succeeded: %+v", status.Lease) + return nil + case <-slowTimer.C: + slowTimer.Read = true + log.Warningf(ctx, "have been waiting %s attempting to acquire lease", + base.SlowRequestThreshold) + r.store.metrics.SlowLeaseRequests.Inc(1) + defer func() { + r.store.metrics.SlowLeaseRequests.Dec(1) + log.Infof(ctx, "slow lease acquisition finished after %s with error %v after %d attempts", timeutil.Since(tBegin), pErr, attempt) + }() + case <-ctx.Done(): + llHandle.Cancel() + log.VErrEventf(ctx, 2, "lease acquisition failed: %s", ctx.Err()) + return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())) + case <-r.store.Stopper().ShouldStop(): + llHandle.Cancel() + return roachpb.NewError(newNotLeaseHolderError(nil, r.store.StoreID(), r.Desc())) + } + } + }() + if pErr != nil { + return storagepb.LeaseStatus{}, pErr + } + } +}
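// Editor's sketch (not part of the patch above): the wait loop in
// redirectOnOrAcquireLease combines three concerns -- waiting on an in-flight
// request, warning when the wait is slow, and bailing out on cancellation. A
// minimal, self-contained analogue using only the standard library; all names
// below (waitWithSlowWarning, resultCh, slowThreshold) are hypothetical and
// chosen for illustration only.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitWithSlowWarning blocks until resultCh delivers a result, printing a
// warning once the wait exceeds slowThreshold and giving up when ctx is done.
func waitWithSlowWarning(
	ctx context.Context, resultCh <-chan error, slowThreshold time.Duration,
) error {
	slowTimer := time.NewTimer(slowThreshold)
	defer slowTimer.Stop()
	for {
		select {
		case err := <-resultCh:
			// In the real code a nil result can mean "re-evaluate and retry";
			// the sketch simply returns whatever the pending request produced.
			return err
		case <-slowTimer.C:
			fmt.Printf("have been waiting %s for the pending request\n", slowThreshold)
		case <-ctx.Done():
			return errors.New("gave up waiting: " + ctx.Err().Error())
		}
	}
}

func main() {
	resultCh := make(chan error, 1)
	go func() {
		time.Sleep(150 * time.Millisecond)
		resultCh <- nil // the pending request succeeded
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := waitWithSlowWarning(ctx, resultCh, 100*time.Millisecond); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("request completed")
}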