From b8fc30a4213f5a1cf79fd6f1b64fe2412498e840 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 9 Apr 2019 14:47:37 +0200 Subject: [PATCH] storage: delay application of preemptive snapshots Currently, in-memory Replica objects can end up having a replicaID zero. Roughly speaking, this is always the case when a Replica's range descriptor does not contain the Replica's store, though sometimes we do have a replicaID taken from incoming Raft messages (which then won't survive across a restart). We end up in this unnatural state mostly due to preemptive snapshots, which are a snapshot of the Range state before adding a certain replica, sent to the store that will house that replica once the configuration change to add it has completed. The range descriptor in the snapshot cannot yet assign the Replica a proper replicaID because none has been allocated yet (and this allocation has to be done in the replica change transaction, which hasn't started yet). Even when the configuration change completes and the leader starts "catching up" the preemptive snapshot and informs it of the replicaID, it will take a few moments until the Replica catches up to the log entry that actually updates the descriptor. If the node reboots before that log entry is durably applied, the replicaID will "restart" at zero until the leader contacts the Replica again. This suggests that preemptive snapshots introduce fundamental complexity which we'd like to avoid - as long as we use preemptive snapshots there will not be sanity in this department. This PR introduces a mechanism which delays the application of preemptive snapshots so that we apply them only when the first request *after* the completed configuration change comes in (at which point a replicaID is present). Superficially, this seems to solve the above problem (since the Replica will only be instantiated the moment a replicaID is known), though it doesn't do so across restarts. However, if we synchronously persisted (not done in this PR) the replicaID from incoming Raft messages whenever it changed, it seems that we should always be able to assign a replicaID when creating a Replica, even when dealing with descriptors that don't contain the replica itself (since there would've been a Raft message with a replicaID at some point, and we persist that). This roughly corresponds to persisting `Replica.lastToReplica`. We ultimately want to switch to learner replicas instead of preemptive snapshots. Learner replicas have the advantage that they are always represented in the replica descriptor, and so the snapshot that initializes them will be a proper Raft snapshot containing a descriptor containing the learner Replica itself. However, it's clear that we need to continue supporting preemptive snapshots in 19.2 due to the need to support mixed 19.1/19.2 clusters. This PR in conjunction with persisting the replicaID (and auxiliary work, for example on the split lock which currently also creates a replica with replicaID zero and which we know [has bugs]) should allow us to remove replicaID zero from the code base without waiting out the 19.1 release cycle. [has bugs]: https://github.com/cockroachdb/cockroach/issues/21146 Release note: None --- pkg/storage/client_raft_test.go | 29 +-- pkg/storage/metrics.go | 42 ++-- pkg/storage/raft.go | 8 - pkg/storage/replica_command.go | 9 +- pkg/storage/replica_raft.go | 13 +- pkg/storage/replica_raftstorage.go | 22 +- pkg/storage/store.go | 332 ++++++++++++++--------------- pkg/storage/store_snapshot.go | 141 ++++++++++-- pkg/storage/store_snapshot_test.go | 9 +- pkg/storage/store_test.go | 6 +- pkg/storage/txnwait/txnqueue.go | 2 +- 11 files changed, 359 insertions(+), 254 deletions(-) diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index d39d1d2519d0..29f666815fdf 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -1422,8 +1422,8 @@ func TestStoreRangeUpReplicate(t *testing.T) { if normalApplied != 0 { t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied) } - if generated != preemptiveApplied { - t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied) + if preemptiveApplied == 0 { + t.Fatalf("expected nonzero preemptive snapshots, but found none") } } @@ -1472,11 +1472,6 @@ func TestUnreplicateFirstRange(t *testing.T) { // TestChangeReplicasDescriptorInvariant tests that a replica change aborts if // another change has been made to the RangeDescriptor since it was initiated. -// -// TODO(tschottdorf): If this test is flaky because the snapshot count does not -// increase, it's likely because with proposer-evaluated KV, less gets proposed -// and so sometimes Raft discards the preemptive snapshot (though we count that -// case in stats already) or doesn't produce a Ready. func TestChangeReplicasDescriptorInvariant(t *testing.T) { defer leaktest.AfterTest(t)() mtc := &multiTestContext{ @@ -1531,16 +1526,15 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) { t.Fatalf("got unexpected error: %v", err) } - testutils.SucceedsSoon(t, func() error { - after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count() - // The failed ChangeReplicas call should have applied a preemptive snapshot. - if after != before+1 { - return errors.Errorf( - "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d", - before, after) - } - return nil - }) + after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count() + // The failed ChangeReplicas call should not have applied a preemptive + // snapshot. It should've sent it to the target replica, but since we're + // delaying preemptive snapshots, it would never have been applied. + if after != before { + t.Fatalf( + "ChangeReplicas call should not have applied a preemptive snapshot, before %d after %d", + before, after) + } before = mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count() // Add to third store with fresh descriptor. @@ -1550,7 +1544,6 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) { testutils.SucceedsSoon(t, func() error { after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count() - // The failed ChangeReplicas call should have applied a preemptive snapshot. if after != before+1 { return errors.Errorf( "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d", diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go index 2a1aed7fa9ea..51f9aeacdc4b 100644 --- a/pkg/storage/metrics.go +++ b/pkg/storage/metrics.go @@ -407,10 +407,16 @@ var ( } metaRangeSnapshotsPreemptiveApplied = metric.Metadata{ Name: "range.snapshots.preemptive-applied", - Help: "Number of applied pre-emptive snapshots", + Help: "Number of applied (delayed or undelayed) pre-emptive snapshots", Measurement: "Snapshots", Unit: metric.Unit_COUNT, } + metaRangeSnapshotsPreemptiveDelayedBytes = metric.Metadata{ + Name: "range.snapshots.preemptive-delayed-bytes", + Help: "Bytes held in-memory in delayed preemptive snapshots", + Measurement: "Memory", + Unit: metric.Unit_COUNT, + } metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", Help: "Number of raft leader transfers", @@ -1040,14 +1046,15 @@ type StoreMetrics struct { // accordingly. // Range event metrics. - RangeSplits *metric.Counter - RangeMerges *metric.Counter - RangeAdds *metric.Counter - RangeRemoves *metric.Counter - RangeSnapshotsGenerated *metric.Counter - RangeSnapshotsNormalApplied *metric.Counter - RangeSnapshotsPreemptiveApplied *metric.Counter - RangeRaftLeaderTransfers *metric.Counter + RangeSplits *metric.Counter + RangeMerges *metric.Counter + RangeAdds *metric.Counter + RangeRemoves *metric.Counter + RangeSnapshotsGenerated *metric.Counter + RangeSnapshotsNormalApplied *metric.Counter + RangeSnapshotsPreemptiveApplied *metric.Counter + RangeSnapshotsPreemptiveDelayedBytes *metric.Gauge + RangeRaftLeaderTransfers *metric.Counter // Raft processing metrics. RaftTicks *metric.Counter @@ -1249,14 +1256,15 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RdbNumSSTables: metric.NewGauge(metaRdbNumSSTables), // Range event metrics. - RangeSplits: metric.NewCounter(metaRangeSplits), - RangeMerges: metric.NewCounter(metaRangeMerges), - RangeAdds: metric.NewCounter(metaRangeAdds), - RangeRemoves: metric.NewCounter(metaRangeRemoves), - RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated), - RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied), - RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied), - RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), + RangeSplits: metric.NewCounter(metaRangeSplits), + RangeMerges: metric.NewCounter(metaRangeMerges), + RangeAdds: metric.NewCounter(metaRangeAdds), + RangeRemoves: metric.NewCounter(metaRangeRemoves), + RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated), + RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied), + RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied), + RangeSnapshotsPreemptiveDelayedBytes: metric.NewGauge(metaRangeSnapshotsPreemptiveDelayedBytes), + RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), // Raft processing metrics. RaftTicks: metric.NewCounter(metaRaftTicks), diff --git a/pkg/storage/raft.go b/pkg/storage/raft.go index 6de017ea76d9..2bfb181a8e9c 100644 --- a/pkg/storage/raft.go +++ b/pkg/storage/raft.go @@ -179,14 +179,6 @@ func raftEntryFormatter(data []byte) string { return fmt.Sprintf("[%x] [%d]", commandID, len(data)) } -// IsPreemptive returns whether this is a preemptive snapshot or a Raft -// snapshot. -func (h *SnapshotRequest_Header) IsPreemptive() bool { - // Preemptive snapshots are addressed to replica ID 0. No other requests to - // replica ID 0 are allowed. - return h.RaftMessageRequest.ToReplica.ReplicaID == 0 -} - // traceEntries records the provided event for all proposals corresponding // to the entries contained in ents. The vmodule level for raft must be at // least 1. diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index c5e17476e1de..6feac3a96a87 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -934,9 +934,12 @@ func (r *Replica) sendSnapshot( FromReplica: fromRepDesc, ToReplica: repDesc, Message: raftpb.Message{ - Type: raftpb.MsgSnap, - To: uint64(repDesc.ReplicaID), - From: uint64(fromRepDesc.ReplicaID), + Type: raftpb.MsgSnap, + To: uint64(repDesc.ReplicaID), + From: uint64(fromRepDesc.ReplicaID), + // TODO(tbg): is it kosher to pick a random Term from a RaftStatus + // and to stick that in a message? Should this term match that of + // the HardState in the snapshot from which the data was taken? Term: status.Term, Snapshot: snap.RaftSnap, }, diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 1ec049822df8..4c267842246b 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2122,9 +2122,20 @@ func (r *Replica) maybeAcquireSnapshotMergeLock( // left-hand neighbor has no pending merges to apply. And that RHS replica // could not have been further split or merged, as it never processes another // command after the merge commits. + // + // Note that this reasoning does not extend to preemptive snapshots. If a range + // starts out as [a,b), then merges [b,d), creates a preemptive snapshot S, + // splits into [a,b)', [b,c), and [b,d), then a node not involved in any of + // the previous activity could receive an up-to-date replica of [b,c), and + // then observe the preemptive snapshot S which must not be applied, for [b,c) + // is "more recent" and must not be destroyed. If the node has picked + // up a replica of `[a,b)'` by that time, the snapshot will be discarded by + // Raft. If it has a replica of `[a,b)` (i.e. before the merge), the snapshot + // again must be refused, for + endKey := r.Desc().EndKey if endKey == nil { - // The existing replica is unitialized, in which case we've already + // The existing replica is uninitialized, in which case we've already // installed a placeholder for snapshot's keyspace. No merge lock needed. return nil, func() {} } diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 65bd758c584a..5c4617a40aa0 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -501,6 +501,11 @@ type IncomingSnapshot struct { snapType string } +// IsPreemptive returns whether this is a delayed or undelayed preemptive snapshot. +func (is *IncomingSnapshot) IsPreemptive() bool { + return is.snapType != snapTypeRaft +} + // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the // given range. Note that snapshot() is called without Replica.raftMu held. func snapshot( @@ -667,8 +672,9 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { } const ( - snapTypeRaft = "Raft" - snapTypePreemptive = "preemptive" + snapTypeRaft = "Raft" + snapTypePreemptive = "preemptive" + snapTypeLocalDelayedPreemptive = "delayed preemptive" ) func clearRangeData( @@ -767,10 +773,18 @@ func (r *Replica) applySnapshot( snapType := inSnap.snapType defer func() { if err == nil { - if snapType == snapTypeRaft { + switch snapType { + case snapTypeRaft: r.store.metrics.RangeSnapshotsNormalApplied.Inc(1) - } else { + case snapTypePreemptive: + log.Fatalf(ctx, "unexpectedly asked to apply an undelayed preemptive snapshot") + case snapTypeLocalDelayedPreemptive: + // We treat application of a delayed preemptive snapshot as an + // application of a preemptive snapshot as far as stats are + // concerned. r.store.metrics.RangeSnapshotsPreemptiveApplied.Inc(1) + default: + log.Fatalf(ctx, "unknown snapshot type %s", snapType) } } }() diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 3e751cdd7d9b..bd6121ee60bf 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -587,6 +587,8 @@ type Store struct { roachpb.StoreCapacity } + delayedPreemptiveSnaps delayedPreemptiveSnaps + counts struct { // Number of placeholders removed due to error. removedPlaceholders int32 @@ -1321,7 +1323,25 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { return false, errors.Errorf("found uninitialized RangeDescriptor: %+v", desc) } + // TODO(tbg): we're passing zero here because either + // - the data is really a preemptive snapshot, in which case it's a holdover from before + // delayed preemptive snapshots and we actually just want to delete it right here. + // - or it isn't but the descriptor does not include a replica on this node. + // This can happen if the data started as a delayed preemptive snapshot that + // was turned into a Replica, but the log never caught up to the log index + // at which the descriptor includes this replica and thus its replicaID itself. + // We can't just delete everything if there's probably a nontrivial HardState + // and ack'ed Raft log that we've promised to keep. But we can avoid instantiating the + // Replica; we'll only ever try to instantiate it when we have a replicaID and so the + // problem goes away. The tricky part is that now we can't tell if we're ever going + // to be able to delete this data. The way out is to let the GC queue check even though + // this is not an initialized replica. So we just stash a rangeID in some map on the + // store and the GC queue will make sure that is cleared out, either leaving the data + // around (presumably to be contacted soon by peers) or removing it in case we're not + // a member any more. + // - or we can load the replicaID from the descriptor and not face any problems. rep, err := NewReplica(&desc, s, 0) + if err != nil { return false, err } @@ -3285,8 +3305,14 @@ func (s *Store) HandleRaftUncoalescedRequest( func (s *Store) withReplicaForRequest( ctx context.Context, req *RaftMessageRequest, f func(context.Context, *Replica) *roachpb.Error, ) *roachpb.Error { + // Nothing should ever ask us for a message to replicaID zero. The exception + // are preemptive snapshots but those are intercepted well above this method. + if req.ToReplica.ReplicaID == 0 { + log.Fatalf(ctx, "unexpected request addressed to replicaID zero: %+v", req) + } + // Lazily create the replica. - r, _, err := s.getOrCreateReplica( + r, created, err := s.getOrCreateReplica( ctx, req.RangeID, req.ToReplica.ReplicaID, @@ -3297,7 +3323,41 @@ func (s *Store) withReplicaForRequest( } defer r.raftMu.Unlock() ctx = r.AnnotateCtx(ctx) + // NB: it's important that we call this before applying a delayed preemptive + // snapshot, for otherwise the response created from it will be dropped + // (which should be fine) along with an error message (which is not fine). r.setLastReplicaDescriptors(req) + + if created { + snap, haveInSnap := s.delayedPreemptiveSnaps.getAndRemove(req.RangeID) + + if haveInSnap { + snap.Header.RaftMessageRequest.ToReplica.ReplicaID = req.ToReplica.ReplicaID + snap.IncomingSnapshot.snapType = snapTypeLocalDelayedPreemptive + + if pErr := s.processRaftSnapshotRequestWithReplicaRaftMuLocked( + ctx, &snap.Header, snap.IncomingSnapshot, r, + ); pErr != nil { + // A delayed preemptive snapshot may be rejected at apply time + // due to overlapping another replica's keyspace. We check this + // already when the snapshot transfer is first initiated, so + // actually hitting this path should be rare. + log.Info(ctx, "delayed preemptive snapshot was rejected: %v", pErr) + } + } else { + // The case in which an uninitialized (no data) replica doesn't find + // the snapshot is possible. (The split-snapshot-race). It doesn't + // require a snapshot in the common case because the split trigger + // populates the RHS. + if r.IsInitialized() { + // But if the replica already has data, it couldn't be the RHS + // of a split before the LHS applied the trigger. Such RHS have + // to have blank state. + log.Fatalf(ctx, "unexpectedly no snap found: %+v", req) + } + } + ctx = r.AnnotateCtx(ctx) + } return f(ctx, r) } @@ -3348,192 +3408,88 @@ func (s *Store) processRaftRequestWithReplica( return nil } -// processRaftSnapshotRequest processes the incoming snapshot Raft request on -// the request's specified replica. This snapshot can be preemptive or not. If -// not, the function makes sure to handle any updated Raft Ready state. -func (s *Store) processRaftSnapshotRequest( - ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot, +func (s *Store) processRaftSnapshotRequestWithReplicaRaftMuLocked( + ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot, r *Replica, ) *roachpb.Error { - return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( - ctx context.Context, r *Replica, - ) (pErr *roachpb.Error) { - if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap { - log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest) - } - - // Check to see if a snapshot can be applied. Snapshots can always be applied - // to initialized replicas. Note that if we add a placeholder we need to - // already be holding Replica.raftMu in order to prevent concurrent - // raft-ready processing of uninitialized replicas. - var addedPlaceholder bool - var removePlaceholder bool - if err := func() error { - s.mu.Lock() - defer s.mu.Unlock() - placeholder, err := s.canApplySnapshotLocked(ctx, snapHeader, true /* authoritative */) - if err != nil { - // If the storage cannot accept the snapshot, return an - // error before passing it to RawNode.Step, since our - // error handling options past that point are limited. - log.Infof(ctx, "cannot apply snapshot: %s", err) - return err - } - - if placeholder != nil { - // NB: The placeholder added here is either removed below after a - // preemptive snapshot is applied or after the next call to - // Replica.handleRaftReady. Note that we can only get here if the - // replica doesn't exist or is uninitialized. - if err := s.addPlaceholderLocked(placeholder); err != nil { - log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err) - } - addedPlaceholder = true - } - return nil - }(); err != nil { - return roachpb.NewError(err) - } + if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap { + log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest) + } - if addedPlaceholder { - // If we added a placeholder remove it before we return unless some other - // part of the code takes ownership of the removal (indicated by setting - // removePlaceholder to false). - removePlaceholder = true - defer func() { - if removePlaceholder { - if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) { - atomic.AddInt32(&s.counts.removedPlaceholders, 1) - } - } - }() + // Check to see if a snapshot can be applied. Snapshots can always be applied + // to initialized replicas. Note that if we add a placeholder we need to + // already be holding Replica.raftMu in order to prevent concurrent + // raft-ready processing of uninitialized replicas. + var addedPlaceholder bool + var removePlaceholder bool + if err := func() error { + s.mu.Lock() + defer s.mu.Unlock() + placeholder, err := s.canApplySnapshotLocked(ctx, inSnap.IsPreemptive(), snapHeader, true /* authoritative */) + if err != nil { + // If the storage cannot accept the snapshot, return an + // error before passing it to RawNode.Step, since our + // error handling options past that point are limited. + log.Infof(ctx, "cannot apply snapshot: %s", err) + return err } - if snapHeader.IsPreemptive() { - // Requiring that the Term is set in a message makes sure that we - // get all of Raft's internal safety checks (it confuses messages - // at term zero for internal messages). The sending side uses the - // term from the snapshot itself, but we'll just check nonzero. - if snapHeader.RaftMessageRequest.Message.Term == 0 { - return roachpb.NewErrorf( - "preemptive snapshot from term %d received with zero term", - snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term, - ) - } - // TODO(tschottdorf): A lot of locking of the individual Replica - // going on below as well. I think that's more easily refactored - // away; what really matters is that the Store doesn't do anything - // else with that same Replica (or one that might conflict with us - // while we still run). In effect, we'd want something like: - // - // 1. look up the snapshot's key range - // 2. get an exclusive lock for operations on that key range from - // the store (or discard the snapshot) - // (at the time of writing, we have checked the key range in - // canApplySnapshotLocked above, but there are concerns about two - // conflicting operations passing that check simultaneously, - // see #7830) - // 3. do everything below (apply the snapshot through temp Raft group) - // 4. release the exclusive lock on the snapshot's key range - // - // There are two future outcomes: Either we begin receiving - // legitimate Raft traffic for this Range (hence learning the - // ReplicaID and becoming a real Replica), or the Replica GC queue - // decides that the ChangeReplicas as a part of which the - // preemptive snapshot was sent has likely failed and removes both - // in-memory and on-disk state. - r.mu.Lock() - // We are paranoid about applying preemptive snapshots (which - // were constructed via our code rather than raft) to the "real" - // raft group. Instead, we destroy the "real" raft group if one - // exists (this is rare in production, although it occurs in - // tests), apply the preemptive snapshot to a temporary raft - // group, then discard that one as well to be replaced by a real - // raft group when we get a new replica ID. - // - // It might be OK instead to apply preemptive snapshots just - // like normal ones (essentially switching between regular and - // preemptive mode based on whether or not we have a raft group, - // instead of based on the replica ID of the snapshot message). - // However, this is a risk that we're not yet willing to take. - // Additionally, without some additional plumbing work, doing so - // would limit the effectiveness of RaftTransport.SendSync for - // preemptive snapshots. - r.mu.internalRaftGroup = nil - needTombstone := r.mu.state.Desc.NextReplicaID != 0 - r.mu.Unlock() - - appliedIndex, _, err := r.raftMu.stateLoader.LoadAppliedIndex(ctx, r.store.Engine()) - if err != nil { - return roachpb.NewError(err) - } - raftGroup, err := raft.NewRawNode( - newRaftConfig( - raft.Storage((*replicaRaftStorage)(r)), - preemptiveSnapshotRaftGroupID, - // We pass the "real" applied index here due to subtleties - // arising in the case in which Raft discards the snapshot: - // It would instruct us to apply entries, which would have - // crashing potential for any choice of dummy value below. - appliedIndex, - r.store.cfg, - &raftLogger{ctx: ctx}, - ), nil) - if err != nil { - return roachpb.NewError(err) - } - // We have a Raft group; feed it the message. - if err := raftGroup.Step(snapHeader.RaftMessageRequest.Message); err != nil { - return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot")) - } - // In the normal case, the group should ask us to apply a snapshot. - // If it doesn't, our snapshot was probably stale. In that case we - // still go ahead and apply a noop because we want that case to be - // counted by stats as a successful application. - var ready raft.Ready - if raftGroup.HasReady() { - ready = raftGroup.Ready() + if placeholder != nil { + // NB: The placeholder added here is either removed below after a + // preemptive snapshot is applied or after the next call to + // Replica.handleRaftReady. Note that we can only get here if the + // replica doesn't exist or is uninitialized. + if err := s.addPlaceholderLocked(placeholder); err != nil { + log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err) } + addedPlaceholder = true + } + return nil + }(); err != nil { + return roachpb.NewError(err) + } - if needTombstone { - // Bump the min replica ID, but don't write the tombstone key. The - // tombstone key is not expected to be present when normal replica data - // is present and applySnapshot would delete the key in most cases. If - // Raft has decided the snapshot shouldn't be applied we would be - // writing the tombstone key incorrectly. - r.mu.Lock() - if r.mu.state.Desc.NextReplicaID > r.mu.minReplicaID { - r.mu.minReplicaID = r.mu.state.Desc.NextReplicaID + if addedPlaceholder { + // If we added a placeholder remove it before we return unless some other + // part of the code takes ownership of the removal (indicated by setting + // removePlaceholder to false). + removePlaceholder = true + defer func() { + if removePlaceholder { + if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) { + atomic.AddInt32(&s.counts.removedPlaceholders, 1) } - r.mu.Unlock() } + }() + } - // Apply the snapshot, as Raft told us to. Preemptive snapshots never - // subsume replicas (this is guaranteed by Store.canApplySnapshot), so - // we can simply pass nil for the subsumedRepls parameter. - if err := r.applySnapshot( - ctx, inSnap, ready.Snapshot, ready.HardState, nil, /* subsumedRepls */ - ); err != nil { - return roachpb.NewError(err) - } - // applySnapshot has already removed the placeholder. - removePlaceholder = false + if snapHeader.RaftMessageRequest.Message.Term == 0 { + return roachpb.NewErrorf( + "snapshot from term %d received with zero term", + snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term, + ) + } - // At this point, the Replica has data but no ReplicaID. We hope - // that it turns into a "real" Replica by means of receiving Raft - // messages addressed to it with a ReplicaID, but if that doesn't - // happen, at some point the Replica GC queue will have to grab it. - return nil - } + if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil { + return roachpb.NewError(err) + } - if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil { - return roachpb.NewError(err) - } + if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil { + fatalOnRaftReadyErr(ctx, expl, err) + } + removePlaceholder = false + return nil +} - if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil { - fatalOnRaftReadyErr(ctx, expl, err) - } - removePlaceholder = false - return nil +// processRaftSnapshotRequest processes the incoming snapshot Raft request on +// the request's specified replica. This snapshot can be preemptive or not. If +// not, the function makes sure to handle any updated Raft Ready state. +func (s *Store) processRaftSnapshotRequest( + ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot, +) *roachpb.Error { + return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func( + ctx context.Context, r *Replica, + ) (pErr *roachpb.Error) { + return s.processRaftSnapshotRequestWithReplicaRaftMuLocked(ctx, snapHeader, inSnap, r) }) } @@ -3941,6 +3897,23 @@ func (s *Store) getOrCreateReplica( replicaID roachpb.ReplicaID, creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { + if false && replicaID == 0 { + // TODO(tbg): ideally we want to avoid ever creating a replica with id zero, + // but at the time of writing we're not quite there yet because split locks + // use a replica with id zero that they acquire the raftMu of. + // This is a bit silly because really what splits should be doing is acquiring + // a ReplicaPlaceholder: + _ = (*ReplicaPlaceholder)(nil) + // These are currently only used for short amounts of time while we're + // applying snapshots (i.e. not while we're receiving them) and they + // aren't checked in this method, but that should change and we should + // either delay the call to getOrCreateReplica that runs into a + // placeholder (until the placeholder resolves to a *Replica or gets + // canceled), or even just return an error, whichever one works better. + // At that point, we can truly assert throughout all of the code that + // the replicaID is never zero. + return nil, false, errors.Errorf("unable to create replica for r%d with replicaID zero", rangeID) + } for { r, created, err := s.tryGetOrCreateReplica( ctx, @@ -4171,6 +4144,15 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.AverageWritesPerSecond.Update(averageWritesPerSecond) s.recordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond) + // Discard excess delayed preemptive snapshots before updating the + // corresponding gauges. + if bytesDel, numDel := s.delayedPreemptiveSnaps.gc(timeutil.Now()); numDel > 0 { + log.Infof(ctx, "deleted %d (%s) unused delayed preemptive snapshots", + numDel, humanizeutil.IBytes(int64(bytesDel)), + ) + } + s.metrics.RangeSnapshotsPreemptiveDelayedBytes.Update(int64(s.delayedPreemptiveSnaps.bytes())) + s.metrics.RangeCount.Update(rangeCount) s.metrics.UnavailableRangeCount.Update(unavailableRangeCount) s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount) diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index cf77e4da84d2..75a741a1b46a 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "io" - "math" "time" "github.com/cockroachdb/cockroach/pkg/base" @@ -30,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/pkg/errors" @@ -38,10 +38,6 @@ import ( ) const ( - // preemptiveSnapshotRaftGroupID is a bogus ID for which a Raft group is - // temporarily created during the application of a preemptive snapshot. - preemptiveSnapshotRaftGroupID = math.MaxUint64 - // Messages that provide detail about why a preemptive snapshot was rejected. snapshotStoreTooFullMsg = "store almost out of disk space" snapshotApplySemBusyMsg = "store busy applying snapshots" @@ -52,6 +48,89 @@ const ( IntersectingSnapshotMsg = "snapshot intersects existing range" ) +type delayedPreemptiveSnap struct { + Header SnapshotRequest_Header + IncomingSnapshot IncomingSnapshot + // Received is the time at which the snapshot was fully received. + Received time.Time +} + +type delayedPreemptiveSnaps struct { + mu syncutil.Mutex + m map[roachpb.RangeID]delayedPreemptiveSnap // init on first use + + // When zero, sane defaults are used. + maxSize int + maxAge time.Duration +} + +func (dps *delayedPreemptiveSnaps) insert(rangeID roachpb.RangeID, snap delayedPreemptiveSnap) { + dps.mu.Lock() + defer dps.mu.Unlock() + if dps.m == nil { + dps.m = map[roachpb.RangeID]delayedPreemptiveSnap{} + } + dps.m[rangeID] = snap +} + +func (dps *delayedPreemptiveSnaps) getAndRemove( + rangeID roachpb.RangeID, +) (_ delayedPreemptiveSnap, found bool) { + dps.mu.Lock() + defer dps.mu.Unlock() + snap, ok := dps.m[rangeID] + if ok { + delete(dps.m, rangeID) + } + return snap, ok +} + +func (dps *delayedPreemptiveSnaps) gc(now time.Time) (bytesDeleted int, numDeleted int) { + dps.mu.Lock() + defer dps.mu.Unlock() + + maxAge := 20 * time.Second + if dps.maxAge != 0 { + maxAge = dps.maxAge + } + + maxSize := 128 * (1 << 20) // 128MB + if dps.maxSize > 0 { + maxSize = dps.maxSize + } + + var totalSize int + for rangeID, snap := range dps.m { + var size int + for _, b := range snap.IncomingSnapshot.Batches { + size += len(b) + } + totalSize += size + // Delete snapshots that we see once more than maxSize memory is + // allocated. Also delete all snapshots that have been sitting around + // for 20 seconds or more. + if totalSize > maxSize || now.Sub(snap.Received) > maxAge { + numDeleted++ + bytesDeleted += size + delete(dps.m, rangeID) + } + } + return bytesDeleted, numDeleted +} + +func (dps *delayedPreemptiveSnaps) bytes() int { + dps.mu.Lock() + defer dps.mu.Unlock() + + var totalSize int + for _, snap := range dps.m { + for _, b := range snap.IncomingSnapshot.Batches { + totalSize += len(b) + } + } + return totalSize +} + // incomingSnapshotStream is the minimal interface on a GRPC stream required // to receive a snapshot over the network. type incomingSnapshotStream interface { @@ -408,16 +487,8 @@ func (s *Store) reserveSnapshot( // The authoritative bool determines whether the check is carried out with the // intention of actually applying the snapshot (in which case an existing replica // must exist and have its raftMu locked) or as a preliminary check. -func (s *Store) canApplySnapshot( - ctx context.Context, snapHeader *SnapshotRequest_Header, authoritative bool, -) (*ReplicaPlaceholder, error) { - s.mu.Lock() - defer s.mu.Unlock() - return s.canApplySnapshotLocked(ctx, snapHeader, authoritative) -} - func (s *Store) canApplySnapshotLocked( - ctx context.Context, snapHeader *SnapshotRequest_Header, authoritative bool, + ctx context.Context, preemptive bool, snapHeader *SnapshotRequest_Header, authoritative bool, ) (*ReplicaPlaceholder, error) { desc := *snapHeader.State.Desc @@ -457,7 +528,7 @@ func (s *Store) canApplySnapshotLocked( existingRepl.mu.RUnlock() if existingIsInitialized { - if !snapHeader.IsPreemptive() { + if !preemptive { // Regular Raft snapshots can't be refused at this point, // even if they widen the existing replica. See the comments // in Replica.maybeAcquireSnapshotMergeLock for how this is @@ -502,7 +573,7 @@ func (s *Store) canApplySnapshotLocked( // sure enough that this couldn't happen by accident to GC the // replica ourselves - the replica GC queue will perform the proper // check). - } else if snapHeader.IsPreemptive() { + } else if preemptive { // Morally, the existing replica now has a nonzero replica ID // because we already know that it is not initialized (i.e. has no // data). Interestingly, the case in which it has a zero replica ID @@ -618,15 +689,27 @@ func (s *Store) receiveSnapshot( } defer cleanup() + // At this stage, we detect preemptive snapshots by the fact that they are + // addressed to replicaID zero. Note that this won't be true when the snapshot + // is applied as a delayed preemptive snapshot: at that point it will have a + // replicaID (but we'll instead use IncomingSnapshot.IsPreemptive()). + preemptive := header.RaftMessageRequest.ToReplica.ReplicaID == 0 + // Check to see if the snapshot can be applied but don't attempt to add // a placeholder here, because we're not holding the replica's raftMu. // We'll perform this check again later after receiving the rest of the // snapshot data - this is purely an optimization to prevent downloading // a snapshot that we know we won't be able to apply. - if _, err := s.canApplySnapshot(ctx, header, false /* authoritative */); err != nil { - return sendSnapshotError(stream, - errors.Wrapf(err, "%s,r%d: cannot apply snapshot", s, header.State.Desc.RangeID), - ) + { + s.mu.Lock() + _, err := s.canApplySnapshotLocked(ctx, preemptive, header, false /* authoritative */) + s.mu.Unlock() + + if err != nil { + return sendSnapshotError(stream, + errors.Wrapf(err, "%s,r%d: cannot apply snapshot", s, header.State.Desc.RangeID), + ) + } } // Determine which snapshot strategy the sender is using to send this @@ -656,8 +739,22 @@ func (s *Store) receiveSnapshot( if err != nil { return err } - if err := s.processRaftSnapshotRequest(ctx, header, inSnap); err != nil { - return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + + if preemptive { + now := timeutil.Now() + // Store the preemptive snapshot. It will be applied as a Raft snapshot + // right when the replica is created (with a replicaID). + var snap delayedPreemptiveSnap + snap.Header = *header + snap.IncomingSnapshot = inSnap + snap.Received = now + + s.delayedPreemptiveSnaps.gc(now) + s.delayedPreemptiveSnaps.insert(header.State.Desc.RangeID, snap) + } else { + if err := s.processRaftSnapshotRequest(ctx, header, inSnap); err != nil { + return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot")) + } } return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED}) diff --git a/pkg/storage/store_snapshot_test.go b/pkg/storage/store_snapshot_test.go index a78d00e3c41f..3804251a3398 100644 --- a/pkg/storage/store_snapshot_test.go +++ b/pkg/storage/store_snapshot_test.go @@ -135,13 +135,16 @@ func TestSnapshotPreemptiveOnUninitializedReplica(t *testing.T) { header := &SnapshotRequest_Header{} header.State.Desc = &desc + inSnap := IncomingSnapshot{snapType: snapTypeLocalDelayedPreemptive} - if !header.IsPreemptive() { + if !inSnap.IsPreemptive() { t.Fatal("mock snapshot isn't preemptive") } - if _, err := store.canApplySnapshot( - ctx, header, true, /* authoritative */ + store.mu.Lock() + defer store.mu.Unlock() + if _, err := store.canApplySnapshotLocked( + ctx, inSnap.IsPreemptive(), header, true, /* authoritative */ ); !testutils.IsError(err, "intersects existing range") { t.Fatal(err) } diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index b3adac9c03c5..6ccabe9d2888 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -2798,6 +2798,7 @@ func TestStoreRangePlaceholders(t *testing.T) { // Test that we remove snapshot placeholders on error conditions. func TestStoreRemovePlaceholderOnError(t *testing.T) { defer leaktest.AfterTest(t)() + tc := testContext{} stopper := stop.NewStopper() defer stopper.Stop(context.TODO()) @@ -2835,7 +2836,7 @@ func TestStoreRemovePlaceholderOnError(t *testing.T) { ToReplica: roachpb.ReplicaDescriptor{ NodeID: 1, StoreID: 1, - ReplicaID: 0, + ReplicaID: 17, // Raft snapshot }, FromReplica: roachpb.ReplicaDescriptor{ NodeID: 2, @@ -2850,7 +2851,7 @@ func TestStoreRemovePlaceholderOnError(t *testing.T) { }, }, } - const expected = "preemptive snapshot from term 0 received" + const expected = "snapshot from term 0 received" if err := s.processRaftSnapshotRequest(ctx, snapHeader, IncomingSnapshot{ SnapUUID: uuid.MakeV4(), @@ -2931,6 +2932,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { }, Message: raftpb.Message{ Type: raftpb.MsgSnap, + Term: 1, Snapshot: raftpb.Snapshot{ Data: data, Metadata: raftpb.SnapshotMetadata{ diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go index a43934f6d2ec..65074804a914 100644 --- a/pkg/storage/txnwait/txnqueue.go +++ b/pkg/storage/txnwait/txnqueue.go @@ -643,7 +643,7 @@ func (q *Queue) MaybeWaitForQuery( q.mu.Lock() // If the txn wait queue is not enabled or if the request is not - // contained within the replica, do nothing. The request can fall + // contained within the replica, do nothing. The request can fallp // outside of the replica after a split or merge. Note that the // ContainsKey check is done under the txn wait queue's lock to // ensure that it's not cleared before an incorrect insertion happens.