storage: delay application of preemptive snapshots
Currently, in-memory Replica objects can end up with a replicaID of zero.
Roughly speaking, this is always the case when a Replica's range
descriptor does not contain the Replica's store, though sometimes we do
have a replicaID taken from incoming Raft messages (which then won't
survive a restart).

We end up in this unnatural state mostly due to preemptive snapshots,
which are a snapshot of the Range state before adding a certain replica,
sent to the store that will house that replica once the configuration
change to add it has completed. The range descriptor in the snapshot
cannot yet assign the Replica a proper replicaID because none has been
allocated yet (and this allocation has to be done in the replica change
transaction, which hasn't started yet).

Even when the configuration change completes and the leader starts
"catching up" the preemptive snapshot and informs it of the replicaID,
it will take a few moments until the Replica catches up to the log entry
that actually updates the descriptor. If the node reboots before that
log entry is durably applied, the replicaID will "restart" at zero until
the leader contacts the Replica again.

This suggests that preemptive snapshots introduce fundamental complexity
which we'd like to avoid: as long as we use preemptive snapshots, we
cannot rule out Replicas with replicaID zero.

This PR introduces a mechanism which delays the application of
preemptive snapshots so that we apply them only when the first request
*after* the completed configuration change comes in (at which point a
replicaID is present).
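
A minimal sketch of the delaying idea, not the code in this PR: every name
below (`delayedSnapshots`, `receive`, `onRequest`, `heldBytes`) is
hypothetical. It assumes a store keeps received preemptive snapshots in
memory, keyed by range, and applies one only when a request for that range
arrives with a nonzero replicaID.

```go
package main

import "fmt"

// All types and names here are hypothetical illustrations; they are not
// taken from the CockroachDB code base.
type rangeID int64
type replicaID int32

// snapshot stands in for a received preemptive snapshot.
type snapshot struct {
	data []byte
}

// delayedSnapshots holds preemptive snapshots until a replicaID is known.
type delayedSnapshots struct {
	pending   map[rangeID]snapshot
	heldBytes int64 // bytes currently held in memory, in the spirit of a gauge
	applied   int64 // number of delayed snapshots applied so far
}

func newDelayedSnapshots() *delayedSnapshots {
	return &delayedSnapshots{pending: make(map[rangeID]snapshot)}
}

// receive stores a preemptive snapshot instead of applying it right away.
// A newer snapshot for the same range replaces the older one.
func (d *delayedSnapshots) receive(id rangeID, s snapshot) {
	if old, ok := d.pending[id]; ok {
		d.heldBytes -= int64(len(old.data))
	}
	d.pending[id] = s
	d.heldBytes += int64(len(s.data))
}

// onRequest is called for each incoming request addressed to a range. It
// applies a held snapshot only if the request carries a nonzero replicaID,
// and reports whether a snapshot was applied.
func (d *delayedSnapshots) onRequest(id rangeID, to replicaID) bool {
	if to == 0 {
		return false // still no replicaID; keep waiting
	}
	s, ok := d.pending[id]
	if !ok {
		return false
	}
	delete(d.pending, id)
	d.heldBytes -= int64(len(s.data))
	d.applied++ // a real implementation would apply s here
	return true
}

func main() {
	d := newDelayedSnapshots()
	d.receive(1, snapshot{data: []byte("state")})
	fmt.Println(d.onRequest(1, 0), d.heldBytes) // false 5
	fmt.Println(d.onRequest(1, 3), d.heldBytes) // true 0
}
```

In this sketch a snapshot that never sees a follow-up request simply stays
in the map, which is why tracking the held bytes seems useful.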

Superficially, this seems to solve the above problem (since the Replica
will only be instantiated the moment a replicaID is known), though it
doesn't do so across restarts.

However, if we synchronously persisted (not done in this PR) the
replicaID from incoming Raft messages whenever it changed, it seems that
we should always be able to assign a replicaID when creating a Replica,
even when dealing with descriptors that don't contain the replica itself
(since there would've been a Raft message with a replicaID at some
point, and we persist that). This roughly corresponds to persisting
`Replica.lastToReplica`.
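
As a rough illustration of that idea (explicitly not implemented in this
PR), the sketch below records the replicaID addressed by incoming messages
whenever it changes, and consults the record when a Replica is created.
The `store` type and its methods are hypothetical, and the map is only a
stand-in for durable storage.

```go
package main

import "fmt"

// Hypothetical names throughout; the map stands in for a durable,
// synchronously written key-value store.
type rangeID int64
type replicaID int32

type store struct {
	persisted map[rangeID]replicaID
	writes    int // number of (simulated) synchronous writes
}

func newStore() *store {
	return &store{persisted: make(map[rangeID]replicaID)}
}

// observe is called with the replicaID an incoming Raft message is
// addressed to. It writes only when the ID is nonzero and has changed.
func (s *store) observe(id rangeID, to replicaID) {
	if to == 0 || s.persisted[id] == to {
		return
	}
	// A real implementation would sync this write before proceeding.
	s.persisted[id] = to
	s.writes++
}

// replicaIDOnStartup returns the last persisted replicaID, if any, so a
// Replica can be created with a nonzero ID even when the descriptor does
// not contain it.
func (s *store) replicaIDOnStartup(id rangeID) (replicaID, bool) {
	rid, ok := s.persisted[id]
	return rid, ok
}

func main() {
	s := newStore()
	s.observe(1, 4)
	s.observe(1, 4) // unchanged, so no second write
	rid, ok := s.replicaIDOnStartup(1)
	fmt.Println(rid, ok, s.writes) // 4 true 1
}
```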

We ultimately want to switch to learner replicas instead of preemptive
snapshots. Learner replicas have the advantage that they are always
represented in the replica descriptor, and so the snapshot that
initializes them will be a proper Raft snapshot containing a descriptor
containing the learner Replica itself. However, it's clear that we need
to continue supporting preemptive snapshots in 19.2 due to the need to
support mixed 19.1/19.2 clusters.

This PR in conjunction with persisting the replicaID (and auxiliary
work, for example on the split lock which currently also creates a
replica with replicaID zero and which we know [has bugs]) should allow
us to remove replicaID zero from the code base without waiting out the
19.1 release cycle.

[has bugs]: #21146

Release note: None
tbg committed Apr 9, 2019
1 parent 57274fc commit b8fc30a
Showing 11 changed files with 359 additions and 254 deletions.
29 changes: 11 additions & 18 deletions pkg/storage/client_raft_test.go
@@ -1422,8 +1422,8 @@ func TestStoreRangeUpReplicate(t *testing.T) {
 if normalApplied != 0 {
 t.Fatalf("expected 0 normal snapshots, but found %d", normalApplied)
 }
-if generated != preemptiveApplied {
-t.Fatalf("expected %d preemptive snapshots, but found %d", generated, preemptiveApplied)
+if preemptiveApplied == 0 {
+t.Fatalf("expected nonzero preemptive snapshots, but found none")
 }
 }

@@ -1472,11 +1472,6 @@ func TestUnreplicateFirstRange(t *testing.T) {

 // TestChangeReplicasDescriptorInvariant tests that a replica change aborts if
 // another change has been made to the RangeDescriptor since it was initiated.
-//
-// TODO(tschottdorf): If this test is flaky because the snapshot count does not
-// increase, it's likely because with proposer-evaluated KV, less gets proposed
-// and so sometimes Raft discards the preemptive snapshot (though we count that
-// case in stats already) or doesn't produce a Ready.
 func TestChangeReplicasDescriptorInvariant(t *testing.T) {
 defer leaktest.AfterTest(t)()
 mtc := &multiTestContext{
@@ -1531,16 +1526,15 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {
 t.Fatalf("got unexpected error: %v", err)
 }

-testutils.SucceedsSoon(t, func() error {
-after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
-// The failed ChangeReplicas call should have applied a preemptive snapshot.
-if after != before+1 {
-return errors.Errorf(
-"ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
-before, after)
-}
-return nil
-})
+after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
+// The failed ChangeReplicas call should not have applied a preemptive
+// snapshot. It should've sent it to the target replica, but since we're
+// delaying preemptive snapshots, it would never have been applied.
+if after != before {
+t.Fatalf(
+"ChangeReplicas call should not have applied a preemptive snapshot, before %d after %d",
+before, after)
+}

 before = mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
 // Add to third store with fresh descriptor.
@@ -1550,7 +1544,6 @@ func TestChangeReplicasDescriptorInvariant(t *testing.T) {

 testutils.SucceedsSoon(t, func() error {
 after := mtc.stores[2].Metrics().RangeSnapshotsPreemptiveApplied.Count()
-// The failed ChangeReplicas call should have applied a preemptive snapshot.
 if after != before+1 {
 return errors.Errorf(
 "ChangeReplicas call should have applied a preemptive snapshot, before %d after %d",
42 changes: 25 additions & 17 deletions pkg/storage/metrics.go
@@ -407,10 +407,16 @@ var (
 }
 metaRangeSnapshotsPreemptiveApplied = metric.Metadata{
 Name: "range.snapshots.preemptive-applied",
-Help: "Number of applied pre-emptive snapshots",
+Help: "Number of applied (delayed or undelayed) pre-emptive snapshots",
 Measurement: "Snapshots",
 Unit: metric.Unit_COUNT,
 }
+metaRangeSnapshotsPreemptiveDelayedBytes = metric.Metadata{
+Name: "range.snapshots.preemptive-delayed-bytes",
+Help: "Bytes held in-memory in delayed preemptive snapshots",
+Measurement: "Memory",
+Unit: metric.Unit_COUNT,
+}
 metaRangeRaftLeaderTransfers = metric.Metadata{
 Name: "range.raftleadertransfers",
 Help: "Number of raft leader transfers",
@@ -1040,14 +1046,15 @@ type StoreMetrics struct {
 // accordingly.

 // Range event metrics.
-RangeSplits *metric.Counter
-RangeMerges *metric.Counter
-RangeAdds *metric.Counter
-RangeRemoves *metric.Counter
-RangeSnapshotsGenerated *metric.Counter
-RangeSnapshotsNormalApplied *metric.Counter
-RangeSnapshotsPreemptiveApplied *metric.Counter
-RangeRaftLeaderTransfers *metric.Counter
+RangeSplits *metric.Counter
+RangeMerges *metric.Counter
+RangeAdds *metric.Counter
+RangeRemoves *metric.Counter
+RangeSnapshotsGenerated *metric.Counter
+RangeSnapshotsNormalApplied *metric.Counter
+RangeSnapshotsPreemptiveApplied *metric.Counter
+RangeSnapshotsPreemptiveDelayedBytes *metric.Gauge
+RangeRaftLeaderTransfers *metric.Counter

 // Raft processing metrics.
 RaftTicks *metric.Counter
@@ -1249,14 +1256,15 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 RdbNumSSTables: metric.NewGauge(metaRdbNumSSTables),

 // Range event metrics.
-RangeSplits: metric.NewCounter(metaRangeSplits),
-RangeMerges: metric.NewCounter(metaRangeMerges),
-RangeAdds: metric.NewCounter(metaRangeAdds),
-RangeRemoves: metric.NewCounter(metaRangeRemoves),
-RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated),
-RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied),
-RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
-RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
+RangeSplits: metric.NewCounter(metaRangeSplits),
+RangeMerges: metric.NewCounter(metaRangeMerges),
+RangeAdds: metric.NewCounter(metaRangeAdds),
+RangeRemoves: metric.NewCounter(metaRangeRemoves),
+RangeSnapshotsGenerated: metric.NewCounter(metaRangeSnapshotsGenerated),
+RangeSnapshotsNormalApplied: metric.NewCounter(metaRangeSnapshotsNormalApplied),
+RangeSnapshotsPreemptiveApplied: metric.NewCounter(metaRangeSnapshotsPreemptiveApplied),
+RangeSnapshotsPreemptiveDelayedBytes: metric.NewGauge(metaRangeSnapshotsPreemptiveDelayedBytes),
+RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),

 // Raft processing metrics.
 RaftTicks: metric.NewCounter(metaRaftTicks),
8 changes: 0 additions & 8 deletions pkg/storage/raft.go
@@ -179,14 +179,6 @@ func raftEntryFormatter(data []byte) string {
 return fmt.Sprintf("[%x] [%d]", commandID, len(data))
 }

-// IsPreemptive returns whether this is a preemptive snapshot or a Raft
-// snapshot.
-func (h *SnapshotRequest_Header) IsPreemptive() bool {
-// Preemptive snapshots are addressed to replica ID 0. No other requests to
-// replica ID 0 are allowed.
-return h.RaftMessageRequest.ToReplica.ReplicaID == 0
-}
-
 // traceEntries records the provided event for all proposals corresponding
 // to the entries contained in ents. The vmodule level for raft must be at
 // least 1.
9 changes: 6 additions & 3 deletions pkg/storage/replica_command.go
@@ -934,9 +934,12 @@ func (r *Replica) sendSnapshot(
 FromReplica: fromRepDesc,
 ToReplica: repDesc,
 Message: raftpb.Message{
-Type: raftpb.MsgSnap,
-To: uint64(repDesc.ReplicaID),
-From: uint64(fromRepDesc.ReplicaID),
+Type: raftpb.MsgSnap,
+To: uint64(repDesc.ReplicaID),
+From: uint64(fromRepDesc.ReplicaID),
+// TODO(tbg): is it kosher to pick a random Term from a RaftStatus
+// and to stick that in a message? Should this term match that of
+// the HardState in the snapshot from which the data was taken?
 Term: status.Term,
 Snapshot: snap.RaftSnap,
 },
13 changes: 12 additions & 1 deletion pkg/storage/replica_raft.go
@@ -2122,9 +2122,20 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
 // left-hand neighbor has no pending merges to apply. And that RHS replica
 // could not have been further split or merged, as it never processes another
 // command after the merge commits.
+//
+// Note that this reasoning does not extend to preemptive snapshots. If a range
+// starts out as [a,b), then merges [b,d), creates a preemptive snapshot S,
+// splits into [a,b)', [b,c), and [b,d), then a node not involved in any of
+// the previous activity could receive an up-to-date replica of [b,c), and
+// then observe the preemptive snapshot S which must not be applied, for [b,c)
+// is "more recent" and must not be destroyed. If the node has picked
+// up a replica of `[a,b)'` by that time, the snapshot will be discarded by
+// Raft. If it has a replica of `[a,b)` (i.e. before the merge), the snapshot
+// again must be refused, for
+
 endKey := r.Desc().EndKey
 if endKey == nil {
-// The existing replica is unitialized, in which case we've already
+// The existing replica is uninitialized, in which case we've already
 // installed a placeholder for snapshot's keyspace. No merge lock needed.
 return nil, func() {}
 }
22 changes: 18 additions & 4 deletions pkg/storage/replica_raftstorage.go
@@ -501,6 +501,11 @@ type IncomingSnapshot struct {
 snapType string
 }

+// IsPreemptive returns whether this is a delayed or undelayed preemptive snapshot.
+func (is *IncomingSnapshot) IsPreemptive() bool {
+return is.snapType != snapTypeRaft
+}
+
 // snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
 // given range. Note that snapshot() is called without Replica.raftMu held.
 func snapshot(
@@ -667,8 +672,9 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error {
 }

 const (
-snapTypeRaft = "Raft"
-snapTypePreemptive = "preemptive"
+snapTypeRaft = "Raft"
+snapTypePreemptive = "preemptive"
+snapTypeLocalDelayedPreemptive = "delayed preemptive"
 )

 func clearRangeData(
@@ -767,10 +773,18 @@ func (r *Replica) applySnapshot(
 snapType := inSnap.snapType
 defer func() {
 if err == nil {
-if snapType == snapTypeRaft {
+switch snapType {
+case snapTypeRaft:
 r.store.metrics.RangeSnapshotsNormalApplied.Inc(1)
-} else {
+case snapTypePreemptive:
+log.Fatalf(ctx, "unexpectedly asked to apply an undelayed preemptive snapshot")
+case snapTypeLocalDelayedPreemptive:
+// We treat application of a delayed preemptive snapshot as an
+// application of a preemptive snapshot as far as stats are
+// concerned.
 r.store.metrics.RangeSnapshotsPreemptiveApplied.Inc(1)
+default:
+log.Fatalf(ctx, "unknown snapshot type %s", snapType)
 }
 }
 }()