Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: delay application of preemptive snapshots #35786

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,8 +1422,8 @@ func TestStoreRangeUpReplicate(t *testing.T) {
if normalApplied != 0 {
t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
}
if generated != preemptiveApplied {
t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
if preemptiveApplied == 0 {
t.Fatalf("expected nonzero preemptive snapshots, but found none")
}
}

Expand Down Expand Up @@ -1472,11 +1472,6 @@ func TestUnreplicateFirstRange(t *testing.T) {

// TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
// another change has been made to the RangeDescriptor since it was initiated.
//
// TODO(tschottdorf): If this test is flaky because the snapshot count does not
// increase, it's likely because with proposer-evaluated KV, less gets proposed
// and so sometimes Raft discards the preemptive snapshot (though we count that
// case in stats already) or doesn't produce a Ready.
func TestChangeReplicasDescriptorInvariant(t *testing.T) {
defer leaktest.AfterTest(t)()
mtc := &multiTestContext{
Expand Down Expand Up @@ -1531,16 +1526,15 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
t.Fatalf("got unexpected error: %v", err)
}

testutils.SucceedsSoon(t, func() error {
after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
// The failed ChangeReplicas call should have applied a preemptive snapshot.
if after != before+1 {
return errors.Errorf(
"ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
before, after)
}
return nil
})
after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
// The failed ChangeReplicas call should not have applied a preemptive
// snapshot. It should've sent it to the target replica, but since we're
// delaying preemptive snapshots, it would never have been applied.
if after != before {
t.Fatalf(
"ChangeReplicas call should not have applied a preemptive snapshot, before %d after %d",
before, after)
}

before = mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
// Add to third store with fresh descriptor.
Expand All @@ -1550,7 +1544,6 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
// The failed ChangeReplicas call should have applied a preemptive snapshot.
if after != before+1 {
return errors.Errorf(
"ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
Expand Down
42 changes: 25 additions & 17 deletions pkg/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,16 @@ var (
}
metaRangeSnapshotsPreemptiveApplied = metric.Metadata{
Name: "range.snapshots.preemptive-applied",
Help: "Number of applied pre-emptive snapshots",
Help: "Number of applied (delayed or undelayed) pre-emptive snapshots",
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotsPreemptiveDelayedBytes = metric.Metadata{
Name: "range.snapshots.preemptive-delayed-bytes",
Help: "Bytes held in-memory in delayed preemptive snapshots",
Measurement: "Memory",
Unit: metric.Unit_COUNT,
}
metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Help: "Number of raft leader transfers",
Expand Down Expand Up @@ -1040,14 +1046,15 @@ type StoreMetrics struct {
// accordingly.

// Range event metrics.
RangeSplits *metric.Counter
RangeMerges *metric.Counter
RangeAdds *metric.Counter
RangeRemoves *metric.Counter
RangeSnapshotsGenerated *metric.Counter
RangeSnapshotsNormalApplied *metric.Counter
RangeSnapshotsPreemptiveApplied *metric.Counter
RangeRaftLeaderTransfers *metric.Counter
RangeSplits *metric.Counter
RangeMerges *metric.Counter
RangeAdds *metric.Counter
RangeRemoves *metric.Counter
RangeSnapshotsGenerated *metric.Counter
RangeSnapshotsNormalApplied *metric.Counter
RangeSnapshotsPreemptiveApplied *metric.Counter
RangeSnapshotsPreemptiveDelayedBytes *metric.Gauge
RangeRaftLeaderTransfers *metric.Counter

// Raft processing metrics.
RaftTicks *metric.Counter
Expand Down Expand Up @@ -1249,14 +1256,15 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RdbNumSSTables: metric.NewGauge(metaRdbNumSSTables),

// Range event metrics.
RangeSplits: metric.NewCounter(metaRangeSplits),
RangeMerges: metric.NewCounter(metaRangeMerges),
RangeAdds: metric.NewCounter(metaRangeAdds),
RangeRemoves: metric.NewCounter(metaRangeRemoves),
RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated),
RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied),
RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeSplits: metric.NewCounter(metaRangeSplits),
RangeMerges: metric.NewCounter(metaRangeMerges),
RangeAdds: metric.NewCounter(metaRangeAdds),
RangeRemoves: metric.NewCounter(metaRangeRemoves),
RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated),
RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied),
RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
RangeSnapshotsPreemptiveDelayedBytes: metric.NewGauge(metaRangeSnapshotsPreemptiveDelayedBytes),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),

// Raft processing metrics.
RaftTicks: metric.NewCounter(metaRaftTicks),
Expand Down
8 changes: 0 additions & 8 deletions pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,6 @@ func raftEntryFormatter(data []byte) string {
return fmt.Sprintf("[%x] [%d]", commandID, len(data))
}

// IsPreemptive returns whether this is a preemptive snapshot or a Raft
// snapshot.
func (h *SnapshotRequest_Header) IsPreemptive() bool {
// Preemptive snapshots are addressed to replica ID 0. No other requests to
// replica ID 0 are allowed.
return h.RaftMessageRequest.ToReplica.ReplicaID == 0
}

// traceEntries records the provided event for all proposals corresponding
// to the entries contained in ents. The vmodule level for raft must be at
// least 1.
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,9 +934,12 @@ func (r *Replica) sendSnapshot(
FromReplica: fromRepDesc,
ToReplica: repDesc,
Message: raftpb.Message{
Type: raftpb.MsgSnap,
To: uint64(repDesc.ReplicaID),
From: uint64(fromRepDesc.ReplicaID),
Type: raftpb.MsgSnap,
To: uint64(repDesc.ReplicaID),
From: uint64(fromRepDesc.ReplicaID),
// TODO(tbg): is it kosher to pick a random Term from a RaftStatus
// and to stick that in a message? Should this term match that of
// the HardState in the snapshot from which the data was taken?
Term: status.Term,
Snapshot: snap.RaftSnap,
},
Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,9 +2122,20 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
// left-hand neighbor has no pending merges to apply. And that RHS replica
// could not have been further split or merged, as it never processes another
// command after the merge commits.
//
// Note that this reasoning does not extend to preemptive snapshots. If a range
// starts out as [a,b), then merges [b,d), creates a preemptive snapshot S,
// splits into [a,b)', [b,c), and [b,d), then a node not involved in any of
// the previous activity could receive an up-to-date replica of [b,c), and
// then observe the preemptive snapshot S which must not be applied, for [b,c)
// is "more recent" and must not be destroyed. If the node has picked
// up a replica of `[a,b)'` by that time, the snapshot will be discarded by
// Raft. If it has a replica of `[a,b)` (i.e. before the merge), the snapshot
// again must be refused, for

endKey := r.Desc().EndKey
if endKey == nil {
// The existing replica is unitialized, in which case we've already
// The existing replica is uninitialized, in which case we've already
// installed a placeholder for snapshot's keyspace. No merge lock needed.
return nil, func() {}
}
Expand Down
22 changes: 18 additions & 4 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ type IncomingSnapshot struct {
snapType string
}

// IsPreemptive returns whether this is a delayed or undelayed preemptive snapshot.
func (is *IncomingSnapshot) IsPreemptive() bool {
return is.snapType != snapTypeRaft
}

// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
// given range. Note that snapshot() is called without Replica.raftMu held.
func snapshot(
Expand Down Expand Up @@ -667,8 +672,9 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
}

const (
snapTypeRaft = "Raft"
snapTypePreemptive = "preemptive"
snapTypeRaft = "Raft"
snapTypePreemptive = "preemptive"
snapTypeLocalDelayedPreemptive = "delayed preemptive"
)

func clearRangeData(
Expand Down Expand Up @@ -767,10 +773,18 @@ func (r *Replica) applySnapshot(
snapType := inSnap.snapType
defer func() {
if err == nil {
if snapType == snapTypeRaft {
switch snapType {
case snapTypeRaft:
r.store.metrics.RangeSnapshotsNormalApplied.Inc(1)
} else {
case snapTypePreemptive:
log.Fatalf(ctx, "unexpectedly asked to apply an undelayed preemptive snapshot")
case snapTypeLocalDelayedPreemptive:
// We treat application of a delayed preemptive snapshot as an
// application of a preemptive snapshot as far as stats are
// concerned.
r.store.metrics.RangeSnapshotsPreemptiveApplied.Inc(1)
default:
log.Fatalf(ctx, "unknown snapshot type %s", snapType)
}
}
}()
Expand Down
Loading