storage: be more resilient to learner snap conflicts
The replica addition code first adds the new replica as a raft learner, then
hands it a snapshot, then promotes it to a voter. For various unfortunate
reasons described in the code, we have to allow the raft snapshot queue to
_also_ send snapshots to learners. A recent etcd change made this race
dramatically more likely, exposing that this code has always been brittle to
the raft snapshot queue winning the race and starting its snapshot first.

After this commit, learner replica addition grabs a (best-effort) lock before
the conf change txn that adds the learner is started. This prevents the race
whenever the raft leader (and thus the raft snapshot queue for that range) is
on the same node as the one performing the addition.

Closes #40207

Release note: None
danhhz committed Sep 4, 2019
1 parent 92230f0 commit 20ba7bc
Showing 13 changed files with 122 additions and 60 deletions.
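
Before the per-file diffs, here is a minimal, self-contained Go sketch of the coordination scheme the commit message describes: a node-local, best-effort registry of pending learner snapshots, keyed by recipient store and protected by a timeout. The names here (learnerSnapLock, acquire, inFlight, StoreID) are illustrative stand-ins and do not appear in the actual patch, which instead reuses the existing snapshot log truncation constraints for this purpose.

```go
package snaplock

import (
	"sync"
	"time"
)

// StoreID is an illustrative stand-in for roachpb.StoreID.
type StoreID int32

// learnerSnapLock is a node-local, best-effort registry of pending learner
// snapshots, keyed by recipient store. Entries expire after a grace period so
// a stalled sender cannot block the raft snapshot queue forever.
type learnerSnapLock struct {
	mu      sync.Mutex
	pending map[StoreID]time.Time // recipient store -> expiry deadline
}

func newLearnerSnapLock() *learnerSnapLock {
	return &learnerSnapLock{pending: make(map[StoreID]time.Time)}
}

// acquire registers the intent to send learner snapshots to the given stores
// and returns an idempotent release func. Best effort: entries time out.
func (l *learnerSnapLock) acquire(stores []StoreID, ttl time.Duration) (release func()) {
	l.mu.Lock()
	defer l.mu.Unlock()
	deadline := time.Now().Add(ttl)
	for _, s := range stores {
		l.pending[s] = deadline
	}
	return func() {
		l.mu.Lock()
		defer l.mu.Unlock()
		for _, s := range stores {
			delete(l.pending, s)
		}
	}
}

// inFlight reports whether a learner snapshot to the given store is believed
// to be pending; the raft snapshot queue would bail and retry later if so.
func (l *learnerSnapLock) inFlight(s StoreID) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	deadline, ok := l.pending[s]
	if !ok {
		return false
	}
	if time.Now().After(deadline) {
		delete(l.pending, s) // the best-effort lock expired; garbage collect it
		return false
	}
	return true
}
```

In the real change, the replica-addition path takes this "lock" before running the conf change txn and releases it once the learner addition is done, while the raft snapshot queue consults it and defers to an in-flight learner snapshot for the same recipient store.
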
4 changes: 2 additions & 2 deletions Gopkg.lock

(Generated file; diff not rendered by default.)

11 changes: 5 additions & 6 deletions Gopkg.toml
@@ -38,13 +38,12 @@ ignored = [

[[constraint]]
name = "go.etcd.io/etcd"
# We're stopping just shy of 4a2b4c8f7e0a3754fdd5a3341a27c2431c2a5385
# which picks up a fix to an inefficiency that at the time of writing
# triggers a bug:
# https://github.com/cockroachdb/cockroach/issues/40207
#
# The last time this was bumped forward, it was for a targeted fix of #40207,
# which temporarily prevented us from being compatible with etcd's HEAD. The
# PR fixing 40207 bumped it forward exactly one commit to minimize unrelated
# fallout. Feel free to move this back to `branch = "master"` at any time.
# branch = "master"
revision = "9b29151d3072511f574e7272a5348504086013fa"
revision = "4a2b4c8f7e0a3754fdd5a3341a27c2431c2a5385"

# Used for the API client; we want the latest.
[[constraint]]
16 changes: 8 additions & 8 deletions pkg/roachpb/metadata_replicas_test.go
@@ -143,45 +143,45 @@ func TestReplicaDescriptorsConfState(t *testing.T) {
}{
{
[]ReplicaDescriptor{rd(v, 1)},
"Voters:[1] VotersOutgoing:[] Learners:[] LearnersNext:[]",
"Voters:[1] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false",
},
// Make sure nil is treated like VoterFull.
{
[]ReplicaDescriptor{rd(vn, 1)},
"Voters:[1] VotersOutgoing:[] Learners:[] LearnersNext:[]",
"Voters:[1] VotersOutgoing:[] Learners:[] LearnersNext:[] AutoLeave:false",
},
{
[]ReplicaDescriptor{rd(l, 1), rd(vn, 2)},
"Voters:[2] VotersOutgoing:[] Learners:[1] LearnersNext:[]",
"Voters:[2] VotersOutgoing:[] Learners:[1] LearnersNext:[] AutoLeave:false",
},
// First joint case. We're adding n3 (via atomic replication changes), so the outgoing
// config we have to get rid of consists only of n2 (even though n2 remains a voter).
// Note that we could simplify this config so that it's not joint, but raft expects
// the config exactly as described by the descriptor so we don't try.
{
[]ReplicaDescriptor{rd(l, 1), rd(v, 2), rd(vi, 3)},
"Voters:[2 3] VotersOutgoing:[2] Learners:[1] LearnersNext:[]",
"Voters:[2 3] VotersOutgoing:[2] Learners:[1] LearnersNext:[] AutoLeave:false",
},
// More complex joint change: a replica swap, switching out n4 for n3 from the initial
// set of voters n2, n4 (plus learner n1 before and after).
{
[]ReplicaDescriptor{rd(l, 1), rd(v, 2), rd(vi, 3), rd(vo, 4)},
"Voters:[2 3] VotersOutgoing:[2 4] Learners:[1] LearnersNext:[]",
"Voters:[2 3] VotersOutgoing:[2 4] Learners:[1] LearnersNext:[] AutoLeave:false",
},
// Upreplicating from n1,n2 to n1,n2,n3,n4.
{
[]ReplicaDescriptor{rd(v, 1), rd(v, 2), rd(vi, 3), rd(vi, 4)},
"Voters:[1 2 3 4] VotersOutgoing:[1 2] Learners:[] LearnersNext:[]",
"Voters:[1 2 3 4] VotersOutgoing:[1 2] Learners:[] LearnersNext:[] AutoLeave:false",
},
// Downreplicating from n1,n2,n3,n4 to n1,n2.
{
[]ReplicaDescriptor{rd(v, 1), rd(v, 2), rd(vo, 3), rd(vo, 4)},
"Voters:[1 2] VotersOutgoing:[1 2 3 4] Learners:[] LearnersNext:[]",
"Voters:[1 2] VotersOutgoing:[1 2 3 4] Learners:[] LearnersNext:[] AutoLeave:false",
},
// Completely switching to a new set of replicas: n1,n2 to n4,n5. Throw a learner in for fun.
{
[]ReplicaDescriptor{rd(vo, 1), rd(vo, 2), rd(vi, 3), rd(vi, 4), rd(l, 5)},
"Voters:[3 4] VotersOutgoing:[1 2] Learners:[5] LearnersNext:[]",
"Voters:[3 4] VotersOutgoing:[1 2] Learners:[5] LearnersNext:[] AutoLeave:false",
},
}

15 changes: 5 additions & 10 deletions pkg/storage/client_raft_test.go
@@ -1568,16 +1568,11 @@ func TestStoreRangeUpReplicate(t *testing.T) {
replicaCount++
return true
})
// It's hard to make generalizations about exactly how many snapshots happen
// of each type. Almost all of them are learner snaps, but there is a race
// where the raft snapshot queue sometimes starts the snapshot first. Further,
// if the raft snapshot is at a higher index, we may even reject the learner
// snap. We definitely get at least one snapshot per replica and the race is
// rare enough that the majority of them should be learner snaps.
if expected := 2 * replicaCount; expected < learnerApplied+raftApplied {
t.Fatalf("expected at least %d snapshots, but found %d learner snaps and %d raft snaps",
expected, learnerApplied, raftApplied)
}
// We upreplicate each range (once each for n2 and n3), so there should be
// exactly 2 * replica learner snaps, one per upreplication.
require.Equal(t, 2*replicaCount, learnerApplied)
// Ideally there would be zero raft snaps, but etcd/raft is picky about
// getting a snapshot at exactly the index it asked for.
if raftApplied > learnerApplied {
t.Fatalf("expected more learner snaps %d than raft snaps %d", learnerApplied, raftApplied)
}
3 changes: 2 additions & 1 deletion pkg/storage/raft_log_queue.go
@@ -174,7 +174,8 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
raftStatus := r.raftStatusRLocked()

firstIndex, err := r.raftFirstIndexLocked()
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
const anyRecipientStore roachpb.StoreID = 0
pendingSnapshotIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore)
lastIndex := r.mu.lastIndex
logSizeTrusted := r.mu.raftLogSizeTrusted
r.mu.Unlock()
10 changes: 6 additions & 4 deletions pkg/storage/raft_log_queue_test.go
@@ -660,14 +660,15 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {

ctx := context.Background()
r := &Replica{}
var storeID roachpb.StoreID
id1, id2 := uuid.MakeV4(), uuid.MakeV4()
const (
index1 = 50
index2 = 60
)

// Add first constraint.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1)
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index1, storeID)
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}

// Make sure it registered.
@@ -676,14 +677,15 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
// Add another constraint with the same id. Extremely unlikely in practice
// but we want to make sure it doesn't blow anything up. Collisions are
// handled by ignoring the colliding update.
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2)
r.addSnapshotLogTruncationConstraintLocked(ctx, id1, index2, storeID)
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)

// Helper that grabs the min constraint index (which can trigger GC as a
// byproduct) and asserts.
assertMin := func(exp uint64, now time.Time) {
t.Helper()
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now); maxIndex != exp {
const anyRecipientStore roachpb.StoreID = 0
if maxIndex := r.getAndGCSnapshotLogTruncationConstraintsLocked(now, anyRecipientStore); maxIndex != exp {
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
}
}
Expand All @@ -694,7 +696,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {

// Add another, higher, index. We're not going to notice it's around
// until the lower one disappears.
r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2)
r.addSnapshotLogTruncationConstraintLocked(ctx, id2, index2, storeID)

now := timeutil.Now()
// The colliding snapshot comes back. Or the original, we can't tell.
6 changes: 4 additions & 2 deletions pkg/storage/raft_snapshot_queue.go
@@ -106,13 +106,15 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
if !ok {
return errors.Errorf("%s: replica %d not present in %v", repl, id, desc.Replicas())
}
snapType := SnapshotRequest_RAFT

// A learner replica is either getting a snapshot of type LEARNER by the node
// that's adding it or it's been orphaned and it's about to be cleaned up by
// the replicate queue. Either way, no point in also sending it a snapshot of
// type RAFT.
if repDesc.GetType() == roachpb.LEARNER {
if index := repl.getAndGCSnapshotLogTruncationConstraints(timeutil.Now()); index > 0 {
snapType = SnapshotRequest_LEARNER
if index := repl.getAndGCSnapshotLogTruncationConstraints(timeutil.Now(), repDesc.StoreID); index > 0 {
// There is a snapshot being transferred. It's probably a LEARNER snap, so
// bail for now and try again later.
err := errors.Errorf(
@@ -135,7 +137,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
}
}

err := repl.sendSnapshot(ctx, repDesc, SnapshotRequest_RAFT, SnapshotRequest_RECOVERY)
err := repl.sendSnapshot(ctx, repDesc, snapType, SnapshotRequest_RECOVERY)

// NB: if the snapshot fails because of an overlapping replica on the
// recipient which is also waiting for a snapshot, the "smart" thing is to
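
To summarize the raft snapshot queue change above, here is a hedged sketch of its decision flow. SnapshotType, SnapRaft, SnapLearner, and chooseSnapType are invented names used only for illustration; they stand in for SnapshotRequest_Type and the inline logic in processRaftSnapshot.

```go
// SnapshotType is an illustrative stand-in for SnapshotRequest_Type.
type SnapshotType int

const (
	SnapRaft SnapshotType = iota
	SnapLearner
)

// chooseSnapType mirrors the hunk above: voters always get a RAFT snap; a
// learner gets a LEARNER snap, and only if no other snapshot to its store
// appears to be in flight (otherwise bail and let the queue retry later).
func chooseSnapType(isLearner, snapInFlightToRecipientStore bool) (send bool, typ SnapshotType) {
	if !isLearner {
		return true, SnapRaft
	}
	if snapInFlightToRecipientStore {
		// Probably a LEARNER snap from the replica-addition path; don't race it.
		return false, SnapLearner
	}
	return true, SnapLearner
}
```
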
60 changes: 49 additions & 11 deletions pkg/storage/replica_command.go
@@ -35,6 +35,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
crdberrors "github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/gogo/protobuf/proto"
@@ -950,12 +952,26 @@ func (r *Replica) changeReplicasImpl(
}

if adds := chgs.Additions(); len(adds) > 0 {
// Lock learner snapshots even before we run the ConfChange txn to add them
// to prevent a race with the raft snapshot queue trying to send it first.
// Note that this lock needs to cover sending the snapshots which happens in
_ = r.atomicReplicationChange
// which also has some more details on what's going on here.
//
// Also note that the lock only prevents the raft snapshot queue from
// sending snapshots to learner replicas, it will still send them to voters.
// There are more details about this locking in
_ = (*raftSnapshotQueue)(nil).processRaftSnapshot
// as well as a TODO about fixing all this to be less subtle and brittle.
releaseSnapshotLockFn := r.lockLearnerSnapshot(ctx, adds)
defer releaseSnapshotLockFn()

// For all newly added nodes, first add raft learner replicas. They accept raft traffic
// (so they can catch up) but don't get to vote (so they don't affect quorum and thus
// don't introduce fragility into the system). For details see:
_ = roachpb.ReplicaDescriptors.Learners
var err error
desc, err = addLearnerReplicas(ctx, r.store, desc, reason, details, chgs.Additions())
desc, err = addLearnerReplicas(ctx, r.store, desc, reason, details, adds)
if err != nil {
return nil, err
}
@@ -1092,6 +1108,32 @@ func addLearnerReplicas(
return desc, nil
}

// lockLearnerSnapshot stops the raft snapshot queue from sending snapshots to
// the soon-to-be added learner replicas to prevent duplicate snapshots from
// being sent. This lock is best effort because it times out and it is a node
// local lock while the raft snapshot queue might be running on a different
// node. An idempotent unlock function is returned.
func (r *Replica) lockLearnerSnapshot(
ctx context.Context, additions []roachpb.ReplicationTarget,
) (unlock func()) {
// TODO(dan): The way this works is hacky, but it was added at the last minute
// in 19.2 to work around a commit in etcd/raft that made this race more
// likely. It'd be nice if all learner snapshots could be sent from a single
// place.
var lockUUIDs []uuid.UUID
for _, addition := range additions {
lockUUID := uuid.MakeV4()
lockUUIDs = append(lockUUIDs, lockUUID)
r.addSnapshotLogTruncationConstraint(ctx, lockUUID, 1, addition.StoreID)
}
return func() {
now := timeutil.Now()
for _, lockUUID := range lockUUIDs {
r.completeSnapshotLogTruncationConstraint(ctx, lockUUID, now)
}
}
}

// atomicReplicationChange carries out the atomic membership change that
// finalizes the addition and/or removal of replicas. Any voters in the process
// of being added (as reflected by the replication changes) must have been added
@@ -1132,15 +1174,11 @@ func (r *Replica) atomicReplicationChange(

// Note that raft snapshot queue will refuse to send a snapshot to a learner
// replica if its store is already sending a snapshot to that replica. That
// races with this snapshot. Most of the time, this side will win the race,
// which avoids needlessly sending the snapshot twice. If the raft snapshot
// queue wins, it's wasteful, but doesn't impact correctness.
//
// Replicas are added to the raft snapshot queue by the raft leader. This
// code can be run anywhere (though it's usually run on the leaseholder,
// which is usually co-located with the raft leader). This means that
// they're usually on the same node, but not always, so that's about as good
// a guarantee as we can offer, anyway.
// would race with this snapshot, except that we've put a (best effort) lock
// on it before the conf change txn was run. This is best effort because the
// lock can time out and the lock is local to this node, while the raft
// leader could be on another node entirely (they're usually co-located but
// this is not guaranteed).
//
// We originally tried always refusing to send snapshots from the raft
// snapshot queue to learner replicas, but this turned out to be brittle.
@@ -1628,7 +1666,7 @@ func (r *Replica) sendSnapshot(
}
}()

snap, err := r.GetSnapshot(ctx, snapType)
snap, err := r.GetSnapshot(ctx, snapType, recipient.StoreID)
if err != nil {
return errors.Wrapf(err, "%s: failed to generate %s snapshot", r, snapType)
}
2 changes: 1 addition & 1 deletion pkg/storage/replica_learner_test.go
@@ -616,7 +616,7 @@ func TestLearnerReplicateQueueRace(t *testing.T) {
// added.
<-blockUntilSnapshotCh

// Removes the learner on node 3 out from under the replicate queue. This
// Remove the learner on node 3 out from under the replicate queue. This
// simulates a second replicate queue running concurrently. The first thing
// this second replicate queue would do is remove any learners it sees,
// leaving the 2 voters.
41 changes: 32 additions & 9 deletions pkg/storage/replica_raft.go
@@ -1204,12 +1204,21 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
}

type snapTruncationInfo struct {
index uint64
deadline time.Time
index uint64
recipientStore roachpb.StoreID
deadline time.Time
}

func (r *Replica) addSnapshotLogTruncationConstraint(
ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID,
) {
r.mu.Lock()
defer r.mu.Unlock()
r.addSnapshotLogTruncationConstraintLocked(ctx, snapUUID, index, recipientStore)
}

func (r *Replica) addSnapshotLogTruncationConstraintLocked(
ctx context.Context, snapUUID uuid.UUID, index uint64,
ctx context.Context, snapUUID uuid.UUID, index uint64, recipientStore roachpb.StoreID,
) {
if r.mu.snapshotLogTruncationConstraints == nil {
r.mu.snapshotLogTruncationConstraints = make(map[uuid.UUID]snapTruncationInfo)
@@ -1224,35 +1233,46 @@ func (r *Replica) addSnapshotLogTruncationConstraintLocked(
return
}

r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{index: index}
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
index: index,
recipientStore: recipientStore,
}
}

// completeSnapshotLogTruncationConstraint marks the given snapshot as finished,
// releasing the lock on raft log truncation after a grace period.
func (r *Replica) completeSnapshotLogTruncationConstraint(
ctx context.Context, snapUUID uuid.UUID, now time.Time,
) {
deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod)

r.mu.Lock()
defer r.mu.Unlock()

item, ok := r.mu.snapshotLogTruncationConstraints[snapUUID]
if !ok {
// UUID collision while adding the snapshot in originally. Nothing
// else to do.
return
}

deadline := now.Add(raftLogQueuePendingSnapshotGracePeriod)
item.deadline = deadline
r.mu.snapshotLogTruncationConstraints[snapUUID] = item
}

func (r *Replica) getAndGCSnapshotLogTruncationConstraints(now time.Time) (minSnapIndex uint64) {
// getAndGCSnapshotLogTruncationConstraints returns the minimum index of any
// currently outstanding snapshot being sent from this replica to the specified
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
// recipient.
func (r *Replica) getAndGCSnapshotLogTruncationConstraints(
now time.Time, recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
r.mu.Lock()
defer r.mu.Unlock()
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now)
return r.getAndGCSnapshotLogTruncationConstraintsLocked(now, recipientStore)
}

func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
now time.Time,
now time.Time, recipientStore roachpb.StoreID,
) (minSnapIndex uint64) {
for snapUUID, item := range r.mu.snapshotLogTruncationConstraints {
if item.deadline != (time.Time{}) && item.deadline.Before(now) {
Expand All @@ -1261,6 +1281,9 @@ func (r *Replica) getAndGCSnapshotLogTruncationConstraintsLocked(
delete(r.mu.snapshotLogTruncationConstraints, snapUUID)
continue
}
if recipientStore != 0 && item.recipientStore != recipientStore {
continue
}
if minSnapIndex == 0 || minSnapIndex > item.index {
minSnapIndex = item.index
}
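
To make the new recipientStore parameter concrete, here is a rough usage sketch. The helper exampleConstraintScoping is hypothetical, assumes package-internal access to Replica, and only exercises the functions added or changed in the hunk above: a constraint registered for store 2 is returned for a store-2 query and for the "any store" query (recipientStore == 0), but not for a query scoped to store 3.

```go
// exampleConstraintScoping is illustrative only.
func exampleConstraintScoping(r *Replica) {
	ctx := context.Background()
	snapUUID := uuid.MakeV4()
	const index = 50
	r.addSnapshotLogTruncationConstraint(ctx, snapUUID, index, roachpb.StoreID(2))

	now := timeutil.Now()
	_ = r.getAndGCSnapshotLogTruncationConstraints(now, roachpb.StoreID(2)) // == 50
	_ = r.getAndGCSnapshotLogTruncationConstraints(now, roachpb.StoreID(3)) // == 0: nothing pending for store 3
	_ = r.getAndGCSnapshotLogTruncationConstraints(now, 0)                  // == 50: 0 means "any recipient"

	// Marking the snapshot complete keeps the constraint around for a grace
	// period before it is garbage collected.
	r.completeSnapshotLogTruncationConstraint(ctx, snapUUID, now)
}
```
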