diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index d39d1d2519d0..29f666815fdf 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -1422,8 +1422,8 @@ func TestStoreRangeUpReplicate(t *testing.T) {
     if normalApplied != 0 {
         t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
     }
-    if generated != preemptiveApplied {
-        t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
+    if preemptiveApplied == 0 {
+        t.Fatalf("expected nonzero preemptive snapshots, but found none")
     }
 }
 
@@ -1472,11 +1472,6 @@ func TestUnreplicateFirstRange(t *testing.T) {
 
 // TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
 // another change has been made to the RangeDescriptor since it was initiated.
-//
-// TODO(tschottdorf): If this test is flaky because the snapshot count does not
-// increase, it's likely because with proposer-evaluated KV, less gets proposed
-// and so sometimes Raft discards the preemptive snapshot (though we count that
-// case in stats already) or doesn't produce a Ready.
 func TestChangeReplicasDescriptorInvariant(t *testing.T) {
     defer leaktest.AfterTest(t)()
     mtc := &multiTestContext{
@@ -1531,16 +1526,15 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
         t.Fatalf("got unexpected error: %v", err)
     }
 
-    testutils.SucceedsSoon(t, func() error {
-        after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
-        // The failed ChangeReplicas call should have applied a preemptive snapshot.
-        if after != before+1 {
-            return errors.Errorf(
-                "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
-                before, after)
-        }
-        return nil
-    })
+    after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
+    // The failed ChangeReplicas call should not have applied a preemptive
+    // snapshot. It should've sent it to the target replica, but since we're
+    // delaying preemptive snapshots, it would never have been applied.
+    if after != before {
+        t.Fatalf(
+            "ChangeReplicas call should not have applied a preemptive snapshot, before %d after %d",
+            before, after)
+    }
 
     before = mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
     // Add to third store with fresh descriptor.
@@ -1550,7 +1544,6 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
     testutils.SucceedsSoon(t, func() error {
         after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
-        // The failed ChangeReplicas call should have applied a preemptive snapshot.
         if after != before+1 {
             return errors.Errorf(
                 "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go
index 2a1aed7fa9ea..51f9aeacdc4b 100644
--- a/pkg/storage/metrics.go
+++ b/pkg/storage/metrics.go
@@ -407,10 +407,16 @@ var (
     }
     metaRangeSnapshotsPreemptiveApplied = metric.Metadata{
         Name:        "range.snapshots.preemptive-applied",
-        Help:        "Number of applied pre-emptive snapshots",
+        Help:        "Number of applied (delayed or undelayed) pre-emptive snapshots",
         Measurement: "Snapshots",
         Unit:        metric.Unit_COUNT,
     }
+    metaRangeSnapshotsPreemptiveDelayedBytes = metric.Metadata{
+        Name:        "range.snapshots.preemptive-delayed-bytes",
+        Help:        "Bytes held in-memory in delayed preemptive snapshots",
+        Measurement: "Memory",
+        Unit:        metric.Unit_BYTES,
+    }
     metaRangeRaftLeaderTransfers = metric.Metadata{
         Name:        "range.raftleadertransfers",
         Help:        "Number of raft leader transfers",
@@ -1040,14 +1046,15 @@ type StoreMetrics struct {
     // accordingly.
 
     // Range event metrics.
-    RangeSplits                     *metric.Counter
-    RangeMerges                     *metric.Counter
-    RangeAdds                       *metric.Counter
-    RangeRemoves                    *metric.Counter
-    RangeSnapshotsGenerated         *metric.Counter
-    RangeSnapshotsNormalApplied     *metric.Counter
-    RangeSnapshotsPreemptiveApplied *metric.Counter
-    RangeRaftLeaderTransfers        *metric.Counter
+    RangeSplits                          *metric.Counter
+    RangeMerges                          *metric.Counter
+    RangeAdds                            *metric.Counter
+    RangeRemoves                         *metric.Counter
+    RangeSnapshotsGenerated              *metric.Counter
+    RangeSnapshotsNormalApplied          *metric.Counter
+    RangeSnapshotsPreemptiveApplied      *metric.Counter
+    RangeSnapshotsPreemptiveDelayedBytes *metric.Gauge
+    RangeRaftLeaderTransfers             *metric.Counter
 
     // Raft processing metrics.
     RaftTicks *metric.Counter
@@ -1249,14 +1256,15 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
         RdbNumSSTables: metric.NewGauge(metaRdbNumSSTables),
 
         // Range event metrics.
-        RangeSplits:                     metric.NewCounter(metaRangeSplits),
-        RangeMerges:                     metric.NewCounter(metaRangeMerges),
-        RangeAdds:                       metric.NewCounter(metaRangeAdds),
-        RangeRemoves:                    metric.NewCounter(metaRangeRemoves),
-        RangeSnapshotsGenerated:         metric.NewCounter(metaRangeSnapshotsGenerated),
-        RangeSnapshotsNormalApplied:     metric.NewCounter(metaRangeSnapshotsNormalApplied),
-        RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
-        RangeRaftLeaderTransfers:        metric.NewCounter(metaRangeRaftLeaderTransfers),
+        RangeSplits:                          metric.NewCounter(metaRangeSplits),
+        RangeMerges:                          metric.NewCounter(metaRangeMerges),
+        RangeAdds:                            metric.NewCounter(metaRangeAdds),
+        RangeRemoves:                         metric.NewCounter(metaRangeRemoves),
+        RangeSnapshotsGenerated:              metric.NewCounter(metaRangeSnapshotsGenerated),
+        RangeSnapshotsNormalApplied:          metric.NewCounter(metaRangeSnapshotsNormalApplied),
+        RangeSnapshotsPreemptiveApplied:      metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
+        RangeSnapshotsPreemptiveDelayedBytes: metric.NewGauge(metaRangeSnapshotsPreemptiveDelayedBytes),
+        RangeRaftLeaderTransfers:             metric.NewCounter(metaRangeRaftLeaderTransfers),
 
         // Raft processing metrics.
         RaftTicks: metric.NewCounter(metaRaftTicks),
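A note on the metric type, as an aside to the hunk above (not part of the patch): the number of bytes held by delayed preemptive snapshots shrinks whenever a stashed snapshot is consumed or garbage collected, so it is exported as a *metric.Gauge rather than a monotonic *metric.Counter. A minimal sketch of the wiring, using only the two calls this diff itself relies on (metric.NewGauge and Gauge.Update); the placement is illustrative:

    // Sketch: create the gauge once from its metadata (as newStoreMetrics does)...
    delayedBytes := metric.NewGauge(metaRangeSnapshotsPreemptiveDelayedBytes)
    // ...and periodically overwrite it with the current in-memory total, which
    // can go down as well as up (as updateReplicationGauges does further below).
    delayedBytes.Update(int64(s.delayedPreemptiveSnaps.bytes()))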
diff --git a/pkg/storage/raft.go b/pkg/storage/raft.go
index 6de017ea76d9..2bfb181a8e9c 100644
--- a/pkg/storage/raft.go
+++ b/pkg/storage/raft.go
@@ -179,14 +179,6 @@ func raftEntryFormatter(data []byte) string {
     return fmt.Sprintf("[%x] [%d]", commandID, len(data))
 }
 
-// IsPreemptive returns whether this is a preemptive snapshot or a Raft
-// snapshot.
-func (h *SnapshotRequest_Header) IsPreemptive() bool {
-    // Preemptive snapshots are addressed to replica ID 0. No other requests to
-    // replica ID 0 are allowed.
-    return h.RaftMessageRequest.ToReplica.ReplicaID == 0
-}
-
 // traceEntries records the provided event for all proposals corresponding
 // to the entries contained in ents. The vmodule level for raft must be at
 // least 1.
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index c5e17476e1de..6feac3a96a87 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -934,9 +934,12 @@ func (r *Replica) sendSnapshot(
         FromReplica: fromRepDesc,
         ToReplica:   repDesc,
         Message: raftpb.Message{
-            Type:     raftpb.MsgSnap,
-            To:       uint64(repDesc.ReplicaID),
-            From:     uint64(fromRepDesc.ReplicaID),
+            Type: raftpb.MsgSnap,
+            To:   uint64(repDesc.ReplicaID),
+            From: uint64(fromRepDesc.ReplicaID),
+            // TODO(tbg): is it kosher to pick a random Term from a RaftStatus
+            // and to stick that in a message? Should this term match that of
+            // the HardState in the snapshot from which the data was taken?
             Term:     status.Term,
             Snapshot: snap.RaftSnap,
         },
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 1ec049822df8..4c267842246b 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -2122,9 +2122,20 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
     // left-hand neighbor has no pending merges to apply. And that RHS replica
     // could not have been further split or merged, as it never processes another
     // command after the merge commits.
+    //
+    // Note that this reasoning does not extend to preemptive snapshots. If a range
+    // starts out as [a,b), then merges [b,d), creates a preemptive snapshot S,
+    // splits into [a,b)', [b,c), and [c,d), then a node not involved in any of
+    // the previous activity could receive an up-to-date replica of [b,c), and
+    // then observe the preemptive snapshot S which must not be applied, for [b,c)
+    // is "more recent" and must not be destroyed. If the node has picked
+    // up a replica of `[a,b)'` by that time, the snapshot will be discarded by
+    // Raft. If it has a replica of `[a,b)` (i.e. before the merge), the snapshot
+    // again must be refused, for applying it would widen that replica into the
+    // keyspace owned by the more recent [b,c).
     endKey := r.Desc().EndKey
     if endKey == nil {
-        // The existing replica is unitialized, in which case we've already
+        // The existing replica is uninitialized, in which case we've already
         // installed a placeholder for snapshot's keyspace. No merge lock needed.
         return nil, func() {}
     }
diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go
index 65bd758c584a..5c4617a40aa0 100644
--- a/pkg/storage/replica_raftstorage.go
+++ b/pkg/storage/replica_raftstorage.go
@@ -501,6 +501,11 @@ type IncomingSnapshot struct {
     snapType string
 }
 
+// IsPreemptive returns whether this is a delayed or undelayed preemptive snapshot.
+func (is *IncomingSnapshot) IsPreemptive() bool {
+    return is.snapType != snapTypeRaft
+}
+
 // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
 // given range. Note that snapshot() is called without Replica.raftMu held.
 func snapshot(
@@ -667,8 +672,9 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
 }
 
 const (
-    snapTypeRaft       = "Raft"
-    snapTypePreemptive = "preemptive"
+    snapTypeRaft                   = "Raft"
+    snapTypePreemptive             = "preemptive"
+    snapTypeLocalDelayedPreemptive = "delayed preemptive"
 )
 
 func clearRangeData(
@@ -767,10 +773,18 @@ func (r *Replica) applySnapshot(
     snapType := inSnap.snapType
     defer func() {
         if err == nil {
-            if snapType == snapTypeRaft {
+            switch snapType {
+            case snapTypeRaft:
                 r.store.metrics.RangeSnapshotsNormalApplied.Inc(1)
-            } else {
+            case snapTypePreemptive:
+                log.Fatalf(ctx, "unexpectedly asked to apply an undelayed preemptive snapshot")
+            case snapTypeLocalDelayedPreemptive:
+                // We treat application of a delayed preemptive snapshot as an
+                // application of a preemptive snapshot as far as stats are
+                // concerned.
                 r.store.metrics.RangeSnapshotsPreemptiveApplied.Inc(1)
+            default:
+                log.Fatalf(ctx, "unknown snapshot type %s", snapType)
             }
         }
     }()
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 3e751cdd7d9b..bd6121ee60bf 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -587,6 +587,8 @@ type Store struct {
         roachpb.StoreCapacity
     }
 
+    delayedPreemptiveSnaps delayedPreemptiveSnaps
+
     counts struct {
         // Number of placeholders removed due to error.
         removedPlaceholders int32
@@ -1321,7 +1323,25 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
             return false, errors.Errorf("found uninitialized RangeDescriptor: %+v", desc)
         }
 
+        // TODO(tbg): we're passing zero here because either
+        // - the data is really a preemptive snapshot, in which case it's a holdover from before
+        //   delayed preemptive snapshots and we actually just want to delete it right here.
+        // - or it isn't but the descriptor does not include a replica on this node.
+        //   This can happen if the data started as a delayed preemptive snapshot that
+        //   was turned into a Replica, but the log never caught up to the log index
+        //   at which the descriptor includes this replica and thus its replicaID itself.
+        //   We can't just delete everything if there's probably a nontrivial HardState
+        //   and ack'ed Raft log that we've promised to keep. But we can avoid instantiating the
+        //   Replica; we'll only ever try to instantiate it when we have a replicaID and so the
+        //   problem goes away. The tricky part is that now we can't tell if we're ever going
+        //   to be able to delete this data. The way out is to let the GC queue check even though
+        //   this is not an initialized replica. So we just stash a rangeID in some map on the
+        //   store and the GC queue will make sure that is cleared out, either leaving the data
+        //   around (presumably to be contacted soon by peers) or removing it in case we're not
+        //   a member any more.
+        // - or we can load the replicaID from the descriptor and not face any problems.
         rep, err := NewReplica(&desc, s, 0)
+
         if err != nil {
             return false, err
         }
@@ -3285,8 +3305,14 @@ func (s *Store) HandleRaftUncoalescedRequest(
 func (s *Store) withReplicaForRequest(
     ctx context.Context, req *RaftMessageRequest, f func(context.Context, *Replica) *roachpb.Error,
 ) *roachpb.Error {
+    // Nothing should ever ask us for a message to replicaID zero. The exceptions
+    // are preemptive snapshots, but those are intercepted well above this method.
+    if req.ToReplica.ReplicaID == 0 {
+        log.Fatalf(ctx, "unexpected request addressed to replicaID zero: %+v", req)
+    }
+
     // Lazily create the replica.
-    r, _, err := s.getOrCreateReplica(
+    r, created, err := s.getOrCreateReplica(
         ctx,
         req.RangeID,
         req.ToReplica.ReplicaID,
@@ -3297,7 +3323,41 @@ func (s *Store) withReplicaForRequest(
     }
     defer r.raftMu.Unlock()
     ctx = r.AnnotateCtx(ctx)
+    // NB: it's important that we call this before applying a delayed preemptive
+    // snapshot, for otherwise the response created from it will be dropped
+    // (which should be fine) along with an error message (which is not fine).
     r.setLastReplicaDescriptors(req)
+
+    if created {
+        snap, haveInSnap := s.delayedPreemptiveSnaps.getAndRemove(req.RangeID)
+
+        if haveInSnap {
+            snap.Header.RaftMessageRequest.ToReplica.ReplicaID = req.ToReplica.ReplicaID
+            snap.IncomingSnapshot.snapType = snapTypeLocalDelayedPreemptive
+
+            if pErr := s.processRaftSnapshotRequestWithReplicaRaftMuLocked(
+                ctx, &snap.Header, snap.IncomingSnapshot, r,
+            ); pErr != nil {
+                // A delayed preemptive snapshot may be rejected at apply time
+                // due to overlapping another replica's keyspace. We check this
+                // already when the snapshot transfer is first initiated, so
+                // actually hitting this path should be rare.
+                log.Infof(ctx, "delayed preemptive snapshot was rejected: %v", pErr)
+            }
+        } else {
+            // The case in which an uninitialized (no data) replica doesn't find
+            // the snapshot is possible (the split-snapshot race). It doesn't
+            // require a snapshot in the common case because the split trigger
+            // populates the RHS.
+            if r.IsInitialized() {
+                // But if the replica already has data, it couldn't be the RHS
+                // of a split whose LHS hasn't applied the trigger yet; such an
+                // RHS has to have blank state.
+                log.Fatalf(ctx, "unexpectedly no snap found: %+v", req)
+            }
+        }
+        ctx = r.AnnotateCtx(ctx)
+    }
     return f(ctx, r)
 }
 
@@ -3348,192 +3408,88 @@ func (s *Store) processRaftRequestWithReplica(
     return nil
 }
 
-// processRaftSnapshotRequest processes the incoming snapshot Raft request on
-// the request's specified replica. This snapshot can be preemptive or not. If
-// not, the function makes sure to handle any updated Raft Ready state.
-func (s *Store) processRaftSnapshotRequest(
-    ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
+func (s *Store) processRaftSnapshotRequestWithReplicaRaftMuLocked(
+    ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot, r *Replica,
 ) *roachpb.Error {
-    return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
-        ctx context.Context, r *Replica,
-    ) (pErr *roachpb.Error) {
-        if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap {
-            log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest)
-        }
-
-        // Check to see if a snapshot can be applied. Snapshots can always be applied
-        // to initialized replicas. Note that if we add a placeholder we need to
-        // already be holding Replica.raftMu in order to prevent concurrent
-        // raft-ready processing of uninitialized replicas.
-        var addedPlaceholder bool
-        var removePlaceholder bool
-        if err := func() error {
-            s.mu.Lock()
-            defer s.mu.Unlock()
-            placeholder, err := s.canApplySnapshotLocked(ctx, snapHeader, true /* authoritative */)
-            if err != nil {
-                // If the storage cannot accept the snapshot, return an
-                // error before passing it to RawNode.Step, since our
-                // error handling options past that point are limited.
-                log.Infof(ctx, "cannot apply snapshot: %s", err)
-                return err
-            }
-
-            if placeholder != nil {
-                // NB: The placeholder added here is either removed below after a
-                // preemptive snapshot is applied or after the next call to
-                // Replica.handleRaftReady. Note that we can only get here if the
-                // replica doesn't exist or is uninitialized.
-                if err := s.addPlaceholderLocked(placeholder); err != nil {
-                    log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
-                }
-                addedPlaceholder = true
-            }
-            return nil
-        }(); err != nil {
-            return roachpb.NewError(err)
-        }
-
-        if addedPlaceholder {
-            // If we added a placeholder remove it before we return unless some other
-            // part of the code takes ownership of the removal (indicated by setting
-            // removePlaceholder to false).
-            removePlaceholder = true
-            defer func() {
-                if removePlaceholder {
-                    if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) {
-                        atomic.AddInt32(&s.counts.removedPlaceholders, 1)
-                    }
-                }
-            }()
+    if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap {
+        log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest)
+    }
+
+    // Check to see if a snapshot can be applied. Snapshots can always be applied
+    // to initialized replicas. Note that if we add a placeholder we need to
+    // already be holding Replica.raftMu in order to prevent concurrent
+    // raft-ready processing of uninitialized replicas.
+    var addedPlaceholder bool
+    var removePlaceholder bool
+    if err := func() error {
+        s.mu.Lock()
+        defer s.mu.Unlock()
+        placeholder, err := s.canApplySnapshotLocked(ctx, inSnap.IsPreemptive(), snapHeader, true /* authoritative */)
+        if err != nil {
+            // If the storage cannot accept the snapshot, return an
+            // error before passing it to RawNode.Step, since our
+            // error handling options past that point are limited.
+            log.Infof(ctx, "cannot apply snapshot: %s", err)
+            return err
         }
-
-        if snapHeader.IsPreemptive() {
-            // Requiring that the Term is set in a message makes sure that we
-            // get all of Raft's internal safety checks (it confuses messages
-            // at term zero for internal messages). The sending side uses the
-            // term from the snapshot itself, but we'll just check nonzero.
-            if snapHeader.RaftMessageRequest.Message.Term == 0 {
-                return roachpb.NewErrorf(
-                    "preemptive snapshot from term %d received with zero term",
-                    snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term,
-                )
-            }
-            // TODO(tschottdorf): A lot of locking of the individual Replica
-            // going on below as well. I think that's more easily refactored
-            // away; what really matters is that the Store doesn't do anything
-            // else with that same Replica (or one that might conflict with us
-            // while we still run). In effect, we'd want something like:
-            //
-            // 1. look up the snapshot's key range
-            // 2. get an exclusive lock for operations on that key range from
-            //    the store (or discard the snapshot)
-            //    (at the time of writing, we have checked the key range in
-            //    canApplySnapshotLocked above, but there are concerns about two
-            //    conflicting operations passing that check simultaneously,
-            //    see #7830)
-            // 3. do everything below (apply the snapshot through temp Raft group)
-            // 4. release the exclusive lock on the snapshot's key range
-            //
-            // There are two future outcomes: Either we begin receiving
-            // legitimate Raft traffic for this Range (hence learning the
-            // ReplicaID and becoming a real Replica), or the Replica GC queue
-            // decides that the ChangeReplicas as a part of which the
-            // preemptive snapshot was sent has likely failed and removes both
-            // in-memory and on-disk state.
-            r.mu.Lock()
-            // We are paranoid about applying preemptive snapshots (which
-            // were constructed via our code rather than raft) to the "real"
-            // raft group. Instead, we destroy the "real" raft group if one
-            // exists (this is rare in production, although it occurs in
-            // tests), apply the preemptive snapshot to a temporary raft
-            // group, then discard that one as well to be replaced by a real
-            // raft group when we get a new replica ID.
-            //
-            // It might be OK instead to apply preemptive snapshots just
-            // like normal ones (essentially switching between regular and
-            // preemptive mode based on whether or not we have a raft group,
-            // instead of based on the replica ID of the snapshot message).
-            // However, this is a risk that we're not yet willing to take.
-            // Additionally, without some additional plumbing work, doing so
-            // would limit the effectiveness of RaftTransport.SendSync for
-            // preemptive snapshots.
-            r.mu.internalRaftGroup = nil
-            needTombstone := r.mu.state.Desc.NextReplicaID != 0
-            r.mu.Unlock()
-
-            appliedIndex, _, err := r.raftMu.stateLoader.LoadAppliedIndex(ctx, r.store.Engine())
-            if err != nil {
-                return roachpb.NewError(err)
-            }
-            raftGroup, err := raft.NewRawNode(
-                newRaftConfig(
-                    raft.Storage((*replicaRaftStorage)(r)),
-                    preemptiveSnapshotRaftGroupID,
-                    // We pass the "real" applied index here due to subtleties
-                    // arising in the case in which Raft discards the snapshot:
-                    // It would instruct us to apply entries, which would have
-                    // crashing potential for any choice of dummy value below.
-                    appliedIndex,
-                    r.store.cfg,
-                    &raftLogger{ctx: ctx},
-                ), nil)
-            if err != nil {
-                return roachpb.NewError(err)
-            }
-            // We have a Raft group; feed it the message.
-            if err := raftGroup.Step(snapHeader.RaftMessageRequest.Message); err != nil {
-                return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
-            }
-            // In the normal case, the group should ask us to apply a snapshot.
-            // If it doesn't, our snapshot was probably stale. In that case we
-            // still go ahead and apply a noop because we want that case to be
-            // counted by stats as a successful application.
-            var ready raft.Ready
-            if raftGroup.HasReady() {
-                ready = raftGroup.Ready()
+        if placeholder != nil {
+            // NB: The placeholder added here is either removed below after a
+            // preemptive snapshot is applied or after the next call to
+            // Replica.handleRaftReady. Note that we can only get here if the
+            // replica doesn't exist or is uninitialized.
+            if err := s.addPlaceholderLocked(placeholder); err != nil {
+                log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
             }
+            addedPlaceholder = true
+        }
+        return nil
+    }(); err != nil {
+        return roachpb.NewError(err)
+    }
 
-            if needTombstone {
-                // Bump the min replica ID, but don't write the tombstone key. The
-                // tombstone key is not expected to be present when normal replica data
-                // is present and applySnapshot would delete the key in most cases. If
-                // Raft has decided the snapshot shouldn't be applied we would be
-                // writing the tombstone key incorrectly.
-                r.mu.Lock()
-                if r.mu.state.Desc.NextReplicaID > r.mu.minReplicaID {
-                    r.mu.minReplicaID = r.mu.state.Desc.NextReplicaID
+    if addedPlaceholder {
+        // If we added a placeholder remove it before we return unless some other
+        // part of the code takes ownership of the removal (indicated by setting
+        // removePlaceholder to false).
+        removePlaceholder = true
+        defer func() {
+            if removePlaceholder {
+                if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) {
+                    atomic.AddInt32(&s.counts.removedPlaceholders, 1)
                 }
-                r.mu.Unlock()
             }
+        }()
+    }
 
-            // Apply the snapshot, as Raft told us to. Preemptive snapshots never
-            // subsume replicas (this is guaranteed by Store.canApplySnapshot), so
-            // we can simply pass nil for the subsumedRepls parameter.
-            if err := r.applySnapshot(
-                ctx, inSnap, ready.Snapshot, ready.HardState, nil, /* subsumedRepls */
-            ); err != nil {
-                return roachpb.NewError(err)
-            }
-            // applySnapshot has already removed the placeholder.
-            removePlaceholder = false
+    if snapHeader.RaftMessageRequest.Message.Term == 0 {
+        return roachpb.NewErrorf(
+            "snapshot from term %d received with zero term",
+            snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term,
+        )
+    }
 
-            // At this point, the Replica has data but no ReplicaID. We hope
-            // that it turns into a "real" Replica by means of receiving Raft
-            // messages addressed to it with a ReplicaID, but if that doesn't
-            // happen, at some point the Replica GC queue will have to grab it.
-            return nil
-        }
+    if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
+        return roachpb.NewError(err)
+    }
 
-        if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
-            return roachpb.NewError(err)
-        }
+    if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil {
+        fatalOnRaftReadyErr(ctx, expl, err)
+    }
+    removePlaceholder = false
+    return nil
+}
 
-        if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil {
-            fatalOnRaftReadyErr(ctx, expl, err)
-        }
-        removePlaceholder = false
-        return nil
+// processRaftSnapshotRequest processes the incoming snapshot Raft request on
+// the request's specified replica. This snapshot can be preemptive or not. If
+// not, the function makes sure to handle any updated Raft Ready state.
+func (s *Store) processRaftSnapshotRequest(
+    ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
+) *roachpb.Error {
+    return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
+        ctx context.Context, r *Replica,
+    ) (pErr *roachpb.Error) {
+        return s.processRaftSnapshotRequestWithReplicaRaftMuLocked(ctx, snapHeader, inSnap, r)
     })
 }
 
@@ -3941,6 +3897,23 @@ func (s *Store) getOrCreateReplica(
     replicaID roachpb.ReplicaID,
     creatingReplica *roachpb.ReplicaDescriptor,
 ) (_ *Replica, created bool, _ error) {
+    if false && replicaID == 0 {
+        // TODO(tbg): ideally we want to avoid ever creating a replica with id zero,
+        // but at the time of writing we're not quite there yet because split locks
+        // use a replica with id zero that they acquire the raftMu of.
+        // This is a bit silly because really what splits should be doing is acquiring
+        // a ReplicaPlaceholder:
+        _ = (*ReplicaPlaceholder)(nil)
+        // These are currently only used for short amounts of time while we're
+        // applying snapshots (i.e. not while we're receiving them) and they
+        // aren't checked in this method, but that should change and we should
+        // either delay the call to getOrCreateReplica that runs into a
+        // placeholder (until the placeholder resolves to a *Replica or gets
+        // canceled), or even just return an error, whichever one works better.
+        // At that point, we can truly assert throughout all of the code that
+        // the replicaID is never zero.
+        return nil, false, errors.Errorf("unable to create replica for r%d with replicaID zero", rangeID)
+    }
     for {
         r, created, err := s.tryGetOrCreateReplica(
             ctx,
@@ -4171,6 +4144,15 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
     s.metrics.AverageWritesPerSecond.Update(averageWritesPerSecond)
     s.recordNewPerSecondStats(averageQueriesPerSecond, averageWritesPerSecond)
 
+    // Discard excess delayed preemptive snapshots before updating the
+    // corresponding gauges.
+    if bytesDel, numDel := s.delayedPreemptiveSnaps.gc(timeutil.Now()); numDel > 0 {
+        log.Infof(ctx, "deleted %d (%s) unused delayed preemptive snapshots",
+            numDel, humanizeutil.IBytes(int64(bytesDel)),
+        )
+    }
+    s.metrics.RangeSnapshotsPreemptiveDelayedBytes.Update(int64(s.delayedPreemptiveSnaps.bytes()))
+
     s.metrics.RangeCount.Update(rangeCount)
     s.metrics.UnavailableRangeCount.Update(unavailableRangeCount)
     s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount)
diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go
index cf77e4da84d2..75a741a1b46a 100644
--- a/pkg/storage/store_snapshot.go
+++ b/pkg/storage/store_snapshot.go
@@ -18,7 +18,6 @@ import (
     "context"
     "fmt"
     "io"
-    "math"
     "time"
 
     "github.com/cockroachdb/cockroach/pkg/base"
@@ -30,6 +29,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/protoutil"
+    "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/cockroach/pkg/util/uuid"
     "github.com/pkg/errors"
@@ -38,10 +38,6 @@ import (
 )
 
 const (
-    // preemptiveSnapshotRaftGroupID is a bogus ID for which a Raft group is
-    // temporarily created during the application of a preemptive snapshot.
-    preemptiveSnapshotRaftGroupID = math.MaxUint64
-
     // Messages that provide detail about why a preemptive snapshot was rejected.
     snapshotStoreTooFullMsg = "store almost out of disk space"
     snapshotApplySemBusyMsg = "store busy applying snapshots"
@@ -52,6 +48,89 @@ const (
     IntersectingSnapshotMsg = "snapshot intersects existing range"
 )
 
+type delayedPreemptiveSnap struct {
+    Header           SnapshotRequest_Header
+    IncomingSnapshot IncomingSnapshot
+    // Received is the time at which the snapshot was fully received.
+    Received time.Time
+}
+
+type delayedPreemptiveSnaps struct {
+    mu syncutil.Mutex
+    m  map[roachpb.RangeID]delayedPreemptiveSnap // init on first use
+
+    // When zero, sane defaults are used.
+    maxSize int
+    maxAge  time.Duration
+}
+
+func (dps *delayedPreemptiveSnaps) insert(rangeID roachpb.RangeID, snap delayedPreemptiveSnap) {
+    dps.mu.Lock()
+    defer dps.mu.Unlock()
+    if dps.m == nil {
+        dps.m = map[roachpb.RangeID]delayedPreemptiveSnap{}
+    }
+    dps.m[rangeID] = snap
+}
+
+func (dps *delayedPreemptiveSnaps) getAndRemove(
+    rangeID roachpb.RangeID,
+) (_ delayedPreemptiveSnap, found bool) {
+    dps.mu.Lock()
+    defer dps.mu.Unlock()
+    snap, ok := dps.m[rangeID]
+    if ok {
+        delete(dps.m, rangeID)
+    }
+    return snap, ok
+}
+
+func (dps *delayedPreemptiveSnaps) gc(now time.Time) (bytesDeleted int, numDeleted int) {
+    dps.mu.Lock()
+    defer dps.mu.Unlock()
+
+    maxAge := 20 * time.Second
+    if dps.maxAge != 0 {
+        maxAge = dps.maxAge
+    }
+
+    maxSize := 128 * (1 << 20) // 128MB
+    if dps.maxSize > 0 {
+        maxSize = dps.maxSize
+    }
+
+    var totalSize int
+    for rangeID, snap := range dps.m {
+        var size int
+        for _, b := range snap.IncomingSnapshot.Batches {
+            size += len(b)
+        }
+        totalSize += size
+        // Delete snapshots that we see once more than maxSize memory is
+        // allocated. Also delete all snapshots that have been sitting around
+        // for longer than maxAge.
+        if totalSize > maxSize || now.Sub(snap.Received) > maxAge {
+            numDeleted++
+            bytesDeleted += size
+            delete(dps.m, rangeID)
+        }
+    }
+    return bytesDeleted, numDeleted
+}
+
+func (dps *delayedPreemptiveSnaps) bytes() int {
+    dps.mu.Lock()
+    defer dps.mu.Unlock()
+
+    var totalSize int
+    for _, snap := range dps.m {
+        for _, b := range snap.IncomingSnapshot.Batches {
+            totalSize += len(b)
+        }
+    }
+    return totalSize
+}
+
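To pin down the contract this little cache provides, here is a self-contained sketch of the same semantics with stand-in types (a plain sync.Mutex instead of syncutil, an int64 key instead of roachpb.RangeID; everything below is illustrative, not code from the tree): a snapshot is stashed under its range's key, is claimed at most once by getAndRemove, and is otherwise evicted by gc once it exceeds the age limit.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // Stand-ins for the real types; illustrative only.
    type rangeID int64

    type snap struct {
        size     int       // stands in for the summed len of IncomingSnapshot.Batches
        received time.Time // mirrors delayedPreemptiveSnap.Received
    }

    type delayedSnaps struct {
        mu     sync.Mutex
        m      map[rangeID]snap
        maxAge time.Duration
    }

    func (d *delayedSnaps) insert(id rangeID, s snap) {
        d.mu.Lock()
        defer d.mu.Unlock()
        if d.m == nil { // init on first use, as in the patch
            d.m = map[rangeID]snap{}
        }
        d.m[id] = s
    }

    // getAndRemove hands the snapshot out exactly once; a second call misses.
    func (d *delayedSnaps) getAndRemove(id rangeID) (snap, bool) {
        d.mu.Lock()
        defer d.mu.Unlock()
        s, ok := d.m[id]
        if ok {
            delete(d.m, id)
        }
        return s, ok
    }

    // gc drops snapshots that have been waiting longer than maxAge.
    func (d *delayedSnaps) gc(now time.Time) (deleted int) {
        d.mu.Lock()
        defer d.mu.Unlock()
        for id, s := range d.m {
            if now.Sub(s.received) > d.maxAge {
                delete(d.m, id)
                deleted++
            }
        }
        return deleted
    }

    func main() {
        d := &delayedSnaps{maxAge: 20 * time.Second}
        d.insert(1, snap{size: 1 << 20, received: time.Now().Add(-time.Minute)})
        d.insert(2, snap{size: 1 << 20, received: time.Now()})
        fmt.Println(d.gc(time.Now())) // 1: only the minute-old snapshot is evicted
        if _, ok := d.getAndRemove(2); ok {
            fmt.Println("claimed") // a second getAndRemove(2) would return false
        }
    }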
 // incomingSnapshotStream is the minimal interface on a GRPC stream required
 // to receive a snapshot over the network.
 type incomingSnapshotStream interface {
@@ -408,16 +487,8 @@ func (s *Store) reserveSnapshot(
 // The authoritative bool determines whether the check is carried out with the
 // intention of actually applying the snapshot (in which case an existing replica
 // must exist and have its raftMu locked) or as a preliminary check.
-func (s *Store) canApplySnapshot(
-    ctx context.Context, snapHeader *SnapshotRequest_Header, authoritative bool,
-) (*ReplicaPlaceholder, error) {
-    s.mu.Lock()
-    defer s.mu.Unlock()
-    return s.canApplySnapshotLocked(ctx, snapHeader, authoritative)
-}
-
 func (s *Store) canApplySnapshotLocked(
-    ctx context.Context, snapHeader *SnapshotRequest_Header, authoritative bool,
+    ctx context.Context, preemptive bool, snapHeader *SnapshotRequest_Header, authoritative bool,
 ) (*ReplicaPlaceholder, error) {
     desc := *snapHeader.State.Desc
 
@@ -457,7 +528,7 @@ func (s *Store) canApplySnapshotLocked(
     existingRepl.mu.RUnlock()
 
     if existingIsInitialized {
-        if !snapHeader.IsPreemptive() {
+        if !preemptive {
             // Regular Raft snapshots can't be refused at this point,
             // even if they widen the existing replica. See the comments
             // in Replica.maybeAcquireSnapshotMergeLock for how this is
@@ -502,7 +573,7 @@ func (s *Store) canApplySnapshotLocked(
         // sure enough that this couldn't happen by accident to GC the
         // replica ourselves - the replica GC queue will perform the proper
         // check).
-    } else if snapHeader.IsPreemptive() {
+    } else if preemptive {
         // Morally, the existing replica now has a nonzero replica ID
         // because we already know that it is not initialized (i.e. has no
        // data). Interestingly, the case in which it has a zero replica ID
@@ -618,15 +689,27 @@ func (s *Store) receiveSnapshot(
     }
     defer cleanup()
 
+    // At this stage, we detect preemptive snapshots by the fact that they are
+    // addressed to replicaID zero. Note that this won't be true when the snapshot
+    // is applied as a delayed preemptive snapshot: at that point it will have a
+    // replicaID (but we'll instead use IncomingSnapshot.IsPreemptive()).
+    preemptive := header.RaftMessageRequest.ToReplica.ReplicaID == 0
+
     // Check to see if the snapshot can be applied but don't attempt to add
     // a placeholder here, because we're not holding the replica's raftMu.
     // We'll perform this check again later after receiving the rest of the
     // snapshot data - this is purely an optimization to prevent downloading
     // a snapshot that we know we won't be able to apply.
-    if _, err := s.canApplySnapshot(ctx, header, false /* authoritative */); err != nil {
-        return sendSnapshotError(stream,
-            errors.Wrapf(err, "%s,r%d: cannot apply snapshot", s, header.State.Desc.RangeID),
-        )
+    {
+        s.mu.Lock()
+        _, err := s.canApplySnapshotLocked(ctx, preemptive, header, false /* authoritative */)
+        s.mu.Unlock()
+
+        if err != nil {
+            return sendSnapshotError(stream,
+                errors.Wrapf(err, "%s,r%d: cannot apply snapshot", s, header.State.Desc.RangeID),
+            )
+        }
     }
 
     // Determine which snapshot strategy the sender is using to send this
@@ -656,8 +739,22 @@ func (s *Store) receiveSnapshot(
         if err != nil {
             return err
         }
-        if err := s.processRaftSnapshotRequest(ctx, header, inSnap); err != nil {
-            return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot"))
+
+        if preemptive {
+            now := timeutil.Now()
+            // Store the preemptive snapshot. It will be applied as a Raft snapshot
+            // right when the replica is created (with a replicaID).
+            var snap delayedPreemptiveSnap
+            snap.Header = *header
+            snap.IncomingSnapshot = inSnap
+            snap.Received = now
+
+            s.delayedPreemptiveSnaps.gc(now)
+            s.delayedPreemptiveSnaps.insert(header.State.Desc.RangeID, snap)
+        } else {
+            if err := s.processRaftSnapshotRequest(ctx, header, inSnap); err != nil {
+                return sendSnapshotError(stream, errors.Wrap(err.GoError(), "failed to apply snapshot"))
+            }
         }
 
         return stream.Send(&SnapshotResponse{Status: SnapshotResponse_APPLIED})
diff --git a/pkg/storage/store_snapshot_test.go b/pkg/storage/store_snapshot_test.go
index a78d00e3c41f..3804251a3398 100644
--- a/pkg/storage/store_snapshot_test.go
+++ b/pkg/storage/store_snapshot_test.go
@@ -135,13 +135,16 @@ func TestSnapshotPreemptiveOnUninitializedReplica(t *testing.T) {
 
     header := &SnapshotRequest_Header{}
     header.State.Desc = &desc
+    inSnap := IncomingSnapshot{snapType: snapTypeLocalDelayedPreemptive}
 
-    if !header.IsPreemptive() {
+    if !inSnap.IsPreemptive() {
         t.Fatal("mock snapshot isn't preemptive")
     }
 
-    if _, err := store.canApplySnapshot(
-        ctx, header, true, /* authoritative */
+    store.mu.Lock()
+    defer store.mu.Unlock()
+    if _, err := store.canApplySnapshotLocked(
+        ctx, inSnap.IsPreemptive(), header, true, /* authoritative */
     ); !testutils.IsError(err, "intersects existing range") {
         t.Fatal(err)
     }
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index b3adac9c03c5..6ccabe9d2888 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -2798,6 +2798,7 @@ func TestStoreRangePlaceholders(t *testing.T) {
 
 // Test that we remove snapshot placeholders on error conditions.
 func TestStoreRemovePlaceholderOnError(t *testing.T) {
     defer leaktest.AfterTest(t)()
+
     tc := testContext{}
     stopper := stop.NewStopper()
     defer stopper.Stop(context.TODO())
@@ -2835,7 +2836,7 @@ func TestStoreRemovePlaceholderOnError(t *testing.T) {
             ToReplica: roachpb.ReplicaDescriptor{
                 NodeID:    1,
                 StoreID:   1,
-                ReplicaID: 0,
+                ReplicaID: 17, // Raft snapshot
             },
             FromReplica: roachpb.ReplicaDescriptor{
                 NodeID:    2,
@@ -2850,7 +2851,7 @@ func TestStoreRemovePlaceholderOnError(t *testing.T) {
             },
         },
     }
-    const expected = "preemptive snapshot from term 0 received"
+    const expected = "snapshot from term 0 received"
     if err := s.processRaftSnapshotRequest(ctx, snapHeader,
         IncomingSnapshot{
             SnapUUID: uuid.MakeV4(),
@@ -2931,6 +2932,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
         },
         Message: raftpb.Message{
             Type: raftpb.MsgSnap,
+            Term: 1,
             Snapshot: raftpb.Snapshot{
                 Data: data,
                 Metadata: raftpb.SnapshotMetadata{
diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go
index a43934f6d2ec..65074804a914 100644
--- a/pkg/storage/txnwait/txnqueue.go
+++ b/pkg/storage/txnwait/txnqueue.go
@@ -643,7 +643,7 @@ func (q *Queue) MaybeWaitForQuery(
     q.mu.Lock()
 
     // If the txn wait queue is not enabled or if the request is not
-    // contained within the replica, do nothing. The request can fallp
+    // contained within the replica, do nothing. The request can fall
     // outside of the replica after a split or merge. Note that the
     // ContainsKey check is done under the txn wait queue's lock to
     // ensure that it's not cleared before an incorrect insertion happens.
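For orientation, the end-to-end flow after this patch, condensed into comments; the three steps reference only functions introduced or modified above, and nothing here is new API:

    // Condensed control flow after this patch (names as in the diff above);
    // this is a reading aid, not code from the tree.
    //
    // 1. receiveSnapshot: a snapshot addressed to replicaID zero is preemptive.
    //    Rather than applying it through a throwaway Raft group, stash it:
    //        s.delayedPreemptiveSnaps.insert(header.State.Desc.RangeID, snap)
    //
    // 2. withReplicaForRequest: the first real Raft message for the range creates
    //    the Replica (now with a proper replicaID); a stashed snapshot, if any, is
    //    replayed immediately as if it were a Raft snapshot:
    //        snap, ok := s.delayedPreemptiveSnaps.getAndRemove(req.RangeID)
    //        if ok { _ = s.processRaftSnapshotRequestWithReplicaRaftMuLocked(...) }
    //
    // 3. updateReplicationGauges: snapshots that are never claimed are dropped by
    //    delayedPreemptiveSnaps.gc on age/size limits, and the bytes still held
    //    are exported via the new range.snapshots.preemptive-delayed-bytes gauge.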