Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: handle preemptive snapshots that span merges #28683

Merged
merged 3 commits into from
Aug 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

func adminMergeArgs(key roachpb.Key) *roachpb.AdminMergeRequest {
Expand Down Expand Up @@ -1454,6 +1455,160 @@ func TestStoreRangeMergeDeadFollower(t *testing.T) {
}
}

func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)

// Create two ranges on all nodes.
mtc.replicateRange(roachpb.RangeID(1), 1, 2)
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
if err != nil {
t.Fatal(err)
}

// Wait for store2 to hear about the split.
var lhsRepl2, rhsRepl2 *storage.Replica
testutils.SucceedsSoon(t, func() error {
lhsRepl2, err = store2.GetReplica(lhsDesc.RangeID)
if err != nil {
return err
}
rhsRepl2, err = store2.GetReplica(rhsDesc.RangeID)
return err
})

// Abandon the two ranges on store2, but do not GC them.
mtc.unreplicateRange(lhsDesc.RangeID, 2)
mtc.unreplicateRange(rhsDesc.RangeID, 2)

// Merge the two ranges together.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
if pErr != nil {
t.Fatal(pErr)
}

// Attempt to re-add the merged range to store2. The operation should fail
// because store2's LHS and RHS replicas intersect the merged range.
lhsRepl0, err := store0.GetReplica(lhsDesc.RangeID)
if err != nil {
t.Fatal(err)
}

addLHSRepl2 := func() error {
for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
err := lhsRepl0.ChangeReplicas(ctx, roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
NodeID: store2.Ident.NodeID,
StoreID: store2.Ident.StoreID,
}, lhsRepl0.Desc(), storage.ReasonUnknown, t.Name())
if !testutils.IsError(err, "store busy applying snapshots") {
return err
}
}
t.Fatal("unreachable")
return nil
}

err = addLHSRepl2()
if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
t.Fatalf("expected %q error, but got %v", exp, err)
}

// GC the replica of the LHS on store2.
if err := store2.ManualReplicaGC(lhsRepl2); err != nil {
t.Fatal(err)
}
if _, err := store2.GetReplica(lhsDesc.RangeID); err == nil {
t.Fatal("lhs replica not destroyed on store2")
}

// Attempt to re-add the merged range to store2. The operation should fail
// again because store2's RHS still intersects the merged range.
err = addLHSRepl2()
if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
t.Fatalf("expected %q error, but got %v", exp, err)
}

// GC the replica of the RHS on store2.
if err := store2.ManualReplicaGC(rhsRepl2); err != nil {
t.Fatal(err)
}
if _, err := store2.GetReplica(rhsDesc.RangeID); err == nil {
t.Fatal("rhs replica not destroyed on store2")
}

// Attempt to re-add the merged range to store2 one last time. This time the
// operation should succeed because there are no remaining intersecting
// replicas.
if err := addLHSRepl2(); err != nil {
t.Fatal(err)
}

// Give store2 the lease to force all commands to be applied, including the
// ChangeReplicas.
mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
}

func TestStoreRangeMergeReadoptedLHSFollower(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)

// Create two ranges on store0 and store1.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
if err != nil {
t.Fatal(err)
}
mtc.replicateRange(lhsDesc.RangeID, 1)
mtc.replicateRange(rhsDesc.RangeID, 1)

// Abandon a replica of the LHS on store2.
mtc.replicateRange(lhsDesc.RangeID, 2)
var lhsRepl2 *storage.Replica
testutils.SucceedsSoon(t, func() error {
lhsRepl2, err = store2.GetReplica(lhsDesc.RangeID)
return err
})
mtc.unreplicateRange(lhsDesc.RangeID, 2)

// Merge the two ranges together.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
if pErr != nil {
t.Fatal(pErr)
}

// Attempt to re-add the merged range to store2. This should succeed
// immediately because there are no overlapping replicas that would interfere
// with the widening of the existing LHS replica.
mtc.replicateRange(lhsDesc.RangeID, 2)

if newLHSRepl2, err := store2.GetReplica(lhsDesc.RangeID); err != nil {
t.Fatal(err)
} else if lhsRepl2 != newLHSRepl2 {
t.Fatalf("store2 created new lhs repl to receive preemptive snapshot post merge")
}

// Give store2 the lease to force all commands to be applied, including the
// ChangeReplicas.
mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
}

// TestStoreRangeMergeDuringShutdown verifies that a shutdown of a store
// containing the RHS of a merge can occur cleanly. This previously triggered
// a fatal error (#27552).
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,14 @@ func (m *multiTestContext) Stop() {
panic("timed out during shutdown")
}
}

m.mu.RLock()
defer m.mu.RUnlock()
for _, s := range m.stores {
if s != nil {
s.AssertInvariants()
}
}
}

// gossipStores forces each store to gossip its store descriptor and then
Expand Down
27 changes: 27 additions & 0 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math/rand"
"testing"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/pkg/errors"
Expand Down Expand Up @@ -234,6 +235,32 @@ func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
}

// AssertInvariants verifies that the store's bookkeping is self-consistent. It
// is only valid to call this method when there is no in-flight traffic to the
// store (e.g., after the store is shut down).
func (s *Store) AssertInvariants() {
s.mu.RLock()
defer s.mu.RUnlock()
s.mu.replicas.Range(func(_ int64, p unsafe.Pointer) bool {
ctx := s.cfg.AmbientCtx.AnnotateCtx(context.Background())
repl := (*Replica)(p)
// We would normally need to hold repl.raftMu. Otherwise we can observe an
// initialized replica that is not in s.replicasByKey, e.g., if we race with
// a goroutine that is currently initializing repl. The lock ordering makes
// acquiring repl.raftMu challenging; instead we require that this method is
// called only when there is no in-flight traffic to the store, at which
// point acquiring repl.raftMu is unnecessary.
if repl.IsInitialized() {
if ex := s.mu.replicasByKey.Get(repl); ex != repl {
log.Fatalf(ctx, "%v misplaced in replicasByKey; found %v instead", repl, ex)
}
} else if _, ok := s.mu.uninitReplicas[repl.RangeID]; !ok {
log.Fatalf(ctx, "%v missing from unitReplicas", repl)
}
return true // keep iterating
})
}

func NewTestStorePool(cfg StoreConfig) *StorePool {
TimeUntilStoreDead.Override(&cfg.Settings.SV, TestTimeUntilStoreDeadOff)
return NewStorePool(
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,11 @@ var _ security.RequestWithUser = &RaftMessageRequest{}
func (*RaftMessageRequest) GetUser() string {
return security.NodeUser
}

// IsPreemptive returns whether this is a preemptive snapshot or a Raft
// snapshot.
func (h *SnapshotRequest_Header) IsPreemptive() bool {
// Preemptive snapshots are addressed to replica ID 0. No other requests to
// replica ID 0 are allowed.
return h.RaftMessageRequest.ToReplica.ReplicaID == 0
}
97 changes: 44 additions & 53 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3351,13 +3351,13 @@ func (s *Store) processRaftRequestWithReplica(
// TODO(benesch): handle snapshots that widen EndKey. These can occur if this
// replica was behind when the range committed a merge.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, req *RaftMessageRequest, inSnap IncomingSnapshot,
ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
) *roachpb.Error {
return s.withReplicaForRequest(ctx, req, func(
return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
ctx context.Context, r *Replica,
) (pErr *roachpb.Error) {
if req.Message.Type != raftpb.MsgSnap {
log.Fatalf(ctx, "expected snapshot: %+v", req)
if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap {
log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest)
}

// Check to see if a snapshot can be applied. Snapshots can always be applied
Expand All @@ -3366,57 +3366,48 @@ func (s *Store) processRaftSnapshotRequest(
// raft-ready processing of uninitialized replicas.
var addedPlaceholder bool
var removePlaceholder bool
if !r.IsInitialized() {
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(ctx, inSnap.State.Desc)
if err != nil {
// If the storage cannot accept the snapshot, return an
// error before passing it to RawNode.Step, since our
// error handling options past that point are limited.
log.Infof(ctx, "cannot apply snapshot: %s", err)
return err
}
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(ctx, snapHeader)
if err != nil {
// If the storage cannot accept the snapshot, return an
// error before passing it to RawNode.Step, since our
// error handling options past that point are limited.
log.Infof(ctx, "cannot apply snapshot: %s", err)
return err
}

if placeholder != nil {
// NB: The placeholder added here is either removed below after a
// preemptive snapshot is applied or after the next call to
// Replica.handleRaftReady. Note that we can only get here if the
// replica doesn't exist or is uninitialized.
if err := s.addPlaceholderLocked(placeholder); err != nil {
log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
}
addedPlaceholder = true
if placeholder != nil {
// NB: The placeholder added here is either removed below after a
// preemptive snapshot is applied or after the next call to
// Replica.handleRaftReady. Note that we can only get here if the
// replica doesn't exist or is uninitialized.
if err := s.addPlaceholderLocked(placeholder); err != nil {
log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
}
return nil
}(); err != nil {
return roachpb.NewError(err)
addedPlaceholder = true
}
return nil
}(); err != nil {
return roachpb.NewError(err)
}

if addedPlaceholder {
// If we added a placeholder remove it before we return unless some other
// part of the code takes ownership of the removal (indicated by setting
// removePlaceholder to false).
removePlaceholder = true
defer func() {
if removePlaceholder {
if s.removePlaceholder(ctx, req.RangeID) {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
if addedPlaceholder {
// If we added a placeholder remove it before we return unless some other
// part of the code takes ownership of the removal (indicated by setting
// removePlaceholder to false).
removePlaceholder = true
defer func() {
if removePlaceholder {
if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
}()
}
}
}()
}

// Snapshots addressed to replica ID 0 are permitted; this is the
// mechanism by which preemptive snapshots work. No other requests to
// replica ID 0 are allowed.
//
// Note that just because the ToReplica's ID is 0 it does not necessarily
// mean that the replica's current ID is 0. We allow for preemptive snaphots
// to be applied to initialized replicas as of #8613.
if req.ToReplica.ReplicaID == 0 {
if snapHeader.IsPreemptive() {
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -3425,7 +3416,7 @@ func (s *Store) processRaftSnapshotRequest(
// applied successfully or not.
if addedPlaceholder {
// Clear the replica placeholder; we are about to swap it with a real replica.
if !s.removePlaceholderLocked(ctx, req.RangeID) {
if !s.removePlaceholderLocked(ctx, snapHeader.RaftMessageRequest.RangeID) {
log.Fatalf(ctx, "could not remove placeholder after preemptive snapshot")
}
if pErr == nil {
Expand All @@ -3448,10 +3439,10 @@ func (s *Store) processRaftSnapshotRequest(
// get all of Raft's internal safety checks (it confuses messages
// at term zero for internal messages). The sending side uses the
// term from the snapshot itself, but we'll just check nonzero.
if req.Message.Term == 0 {
if snapHeader.RaftMessageRequest.Message.Term == 0 {
return roachpb.NewErrorf(
"preemptive snapshot from term %d received with zero term",
req.Message.Snapshot.Metadata.Term,
snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term,
)
}
// TODO(tschottdorf): A lot of locking of the individual Replica
Expand Down Expand Up @@ -3517,7 +3508,7 @@ func (s *Store) processRaftSnapshotRequest(
return roachpb.NewError(err)
}
// We have a Raft group; feed it the message.
if err := raftGroup.Step(req.Message); err != nil {
if err := raftGroup.Step(snapHeader.RaftMessageRequest.Message); err != nil {
return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
}
// In the normal case, the group should ask us to apply a snapshot.
Expand Down Expand Up @@ -3557,7 +3548,7 @@ func (s *Store) processRaftSnapshotRequest(
return nil
}

if err := r.stepRaftGroup(req); err != nil {
if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
return roachpb.NewError(err)
}

Expand Down
Loading