Merge #28594 #28683
28594: ui: add disk ops per second graph to hardware dashboard r=vilterp a=vilterp

The middle two graphs are new:
![image](https://user-images.githubusercontent.com/7341/44124100-7a365648-9ff9-11e8-96b8-c9253ba4e05d.png)

Fixes #28552
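
The new graphs plot rates derived from cumulative disk I/O counters reported by each node. As a rough, hedged sketch of where such a number comes from (this is not the admin UI or CockroachDB's metrics code; the helper below and its use of gopsutil's `disk.IOCounters` are illustrative assumptions only), an ops-per-second value can be computed by sampling a cumulative operation counter twice and taking the rate:

```go
// A minimal, hypothetical sketch of turning cumulative disk counters into an
// ops-per-second rate, which is the kind of value such a graph plots.
package main

import (
	"fmt"
	"time"

	"github.com/shirou/gopsutil/disk"
)

// totalDiskOps sums cumulative read and write operations across all devices.
func totalDiskOps() (uint64, error) {
	counters, err := disk.IOCounters()
	if err != nil {
		return 0, err
	}
	var ops uint64
	for _, c := range counters {
		ops += c.ReadCount + c.WriteCount
	}
	return ops, nil
}

func main() {
	const interval = time.Second
	before, err := totalDiskOps()
	if err != nil {
		panic(err)
	}
	time.Sleep(interval)
	after, err := totalDiskOps()
	if err != nil {
		panic(err)
	}
	// The graphed value is the non-negative rate of change of the counter.
	fmt.Printf("disk ops/sec: %.1f\n", float64(after-before)/interval.Seconds())
}
```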

28683: storage: handle preemptive snapshots that span merges r=bdarnell,tschottdorf a=benesch

Preemptive snapshots can indirectly "widen" an existing replica.
Consider a replica that is removed from a range immediately before a
merge, then re-added to the range immediately after the merge. If the
replica is not GC'd before it is re-added to the range, the existing
replica will receive a preemptive snapshot with a larger end key.

This commit teaches the snapshot reception code to handle this case
properly. If widening the existing replica would cause it to overlap other
replicas, the preemptive snapshot is rejected. Otherwise, a placeholder
for the new keyspace is installed and the snapshot is applied.
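
As a hedged illustration of that decision, the toy sketch below mimics the check with invented `span` and `store` types (it is not the actual `canApplySnapshotLocked` logic): the widening is rejected only when the snapshot's wider keyspace would intersect some other replica on the store.

```go
// A minimal sketch of the decision described above, using simplified,
// hypothetical types rather than the real Store code.
package main

import "fmt"

// span is a simplified [start, end) key range.
type span struct{ start, end string }

func (a span) overlaps(b span) bool { return a.start < b.end && b.start < a.end }

// store tracks the spans of the replicas it currently holds.
type store struct{ replicas map[int]span }

// maybeWiden decides what to do with a preemptive snapshot for rangeID whose
// descriptor covers snapSpan, which may be wider than the existing replica.
func (s *store) maybeWiden(rangeID int, snapSpan span) error {
	if existing, ok := s.replicas[rangeID]; ok && existing == snapSpan {
		return nil // no widening needed; the snapshot can be applied directly
	}
	// Reject if the widened keyspace would overlap any *other* replica.
	for id, sp := range s.replicas {
		if id != rangeID && sp.overlaps(snapSpan) {
			return fmt.Errorf("cannot apply snapshot: snapshot intersects existing range r%d", id)
		}
	}
	// Otherwise, reserve the new keyspace (a placeholder in the real code)
	// and apply the snapshot.
	s.replicas[rangeID] = snapSpan
	return nil
}

func main() {
	s := &store{replicas: map[int]span{
		1: {"a", "c"}, // existing LHS replica
		2: {"c", "e"}, // stale RHS replica whose range was merged away
	}}
	// The post-merge snapshot for range 1 spans [a, e); it overlaps range 2.
	fmt.Println(s.maybeWiden(1, span{"a", "e"}))
	// After the stale RHS is GC'd, the widening succeeds.
	delete(s.replicas, 2)
	fmt.Println(s.maybeWiden(1, span{"a", "e"}))
}
```

The tests added below exercise exactly this behavior: the re-addition is refused until the stale replicas on store2 are garbage collected.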

Release note: None

Fixes #28369.

Co-authored-by: Pete Vilter <vilterp@cockroachlabs.com>
Co-authored-by: Nikhil Benesch <nikhil.benesch@gmail.com>
3 people committed Aug 16, 2018
3 parents e63832f + 7eac041 + fa8b2bb commit 852396d
Showing 8 changed files with 355 additions and 114 deletions.
155 changes: 155 additions & 0 deletions pkg/storage/client_merge_test.go
@@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
)

func adminMergeArgs(key roachpb.Key) *roachpb.AdminMergeRequest {
@@ -1454,6 +1455,160 @@ func TestStoreRangeMergeDeadFollower(t *testing.T) {
}
}

func TestStoreRangeMergeReadoptedBothFollowers(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)

// Create two ranges on all nodes.
mtc.replicateRange(roachpb.RangeID(1), 1, 2)
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
if err != nil {
t.Fatal(err)
}

// Wait for store2 to hear about the split.
var lhsRepl2, rhsRepl2 *storage.Replica
testutils.SucceedsSoon(t, func() error {
lhsRepl2, err = store2.GetReplica(lhsDesc.RangeID)
if err != nil {
return err
}
rhsRepl2, err = store2.GetReplica(rhsDesc.RangeID)
return err
})

// Abandon the two ranges on store2, but do not GC them.
mtc.unreplicateRange(lhsDesc.RangeID, 2)
mtc.unreplicateRange(rhsDesc.RangeID, 2)

// Merge the two ranges together.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
if pErr != nil {
t.Fatal(pErr)
}

// Attempt to re-add the merged range to store2. The operation should fail
// because store2's LHS and RHS replicas intersect the merged range.
lhsRepl0, err := store0.GetReplica(lhsDesc.RangeID)
if err != nil {
t.Fatal(err)
}

addLHSRepl2 := func() error {
for r := retry.StartWithCtx(ctx, retry.Options{}); r.Next(); {
err := lhsRepl0.ChangeReplicas(ctx, roachpb.ADD_REPLICA, roachpb.ReplicationTarget{
NodeID: store2.Ident.NodeID,
StoreID: store2.Ident.StoreID,
}, lhsRepl0.Desc(), storage.ReasonUnknown, t.Name())
if !testutils.IsError(err, "store busy applying snapshots") {
return err
}
}
t.Fatal("unreachable")
return nil
}

err = addLHSRepl2()
if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
t.Fatalf("expected %q error, but got %v", exp, err)
}

// GC the replica of the LHS on store2.
if err := store2.ManualReplicaGC(lhsRepl2); err != nil {
t.Fatal(err)
}
if _, err := store2.GetReplica(lhsDesc.RangeID); err == nil {
t.Fatal("lhs replica not destroyed on store2")
}

// Attempt to re-add the merged range to store2. The operation should fail
// again because store2's RHS still intersects the merged range.
err = addLHSRepl2()
if exp := "cannot apply snapshot: snapshot intersects existing range"; !testutils.IsError(err, exp) {
t.Fatalf("expected %q error, but got %v", exp, err)
}

// GC the replica of the RHS on store2.
if err := store2.ManualReplicaGC(rhsRepl2); err != nil {
t.Fatal(err)
}
if _, err := store2.GetReplica(rhsDesc.RangeID); err == nil {
t.Fatal("rhs replica not destroyed on store2")
}

// Attempt to re-add the merged range to store2 one last time. This time the
// operation should succeed because there are no remaining intersecting
// replicas.
if err := addLHSRepl2(); err != nil {
t.Fatal(err)
}

// Give store2 the lease to force all commands to be applied, including the
// ChangeReplicas.
mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
}

func TestStoreRangeMergeReadoptedLHSFollower(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)

// Create two ranges on store0 and store1.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store0)
if err != nil {
t.Fatal(err)
}
mtc.replicateRange(lhsDesc.RangeID, 1)
mtc.replicateRange(rhsDesc.RangeID, 1)

// Abandon a replica of the LHS on store2.
mtc.replicateRange(lhsDesc.RangeID, 2)
var lhsRepl2 *storage.Replica
testutils.SucceedsSoon(t, func() error {
lhsRepl2, err = store2.GetReplica(lhsDesc.RangeID)
return err
})
mtc.unreplicateRange(lhsDesc.RangeID, 2)

// Merge the two ranges together.
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := client.SendWrapped(ctx, store0.TestSender(), args)
if pErr != nil {
t.Fatal(pErr)
}

// Attempt to re-add the merged range to store2. This should succeed
// immediately because there are no overlapping replicas that would interfere
// with the widening of the existing LHS replica.
mtc.replicateRange(lhsDesc.RangeID, 2)

if newLHSRepl2, err := store2.GetReplica(lhsDesc.RangeID); err != nil {
t.Fatal(err)
} else if lhsRepl2 != newLHSRepl2 {
t.Fatalf("store2 created new lhs repl to receive preemptive snapshot post merge")
}

// Give store2 the lease to force all commands to be applied, including the
// ChangeReplicas.
mtc.transferLease(ctx, lhsDesc.RangeID, 0, 2)
}

// TestStoreRangeMergeDuringShutdown verifies that a shutdown of a store
// containing the RHS of a merge can occur cleanly. This previously triggered
// a fatal error (#27552).
8 changes: 8 additions & 0 deletions pkg/storage/client_test.go
@@ -363,6 +363,14 @@ func (m *multiTestContext) Stop() {
panic("timed out during shutdown")
}
}

m.mu.RLock()
defer m.mu.RUnlock()
for _, s := range m.stores {
if s != nil {
s.AssertInvariants()
}
}
}

// gossipStores forces each store to gossip its store descriptor and then
27 changes: 27 additions & 0 deletions pkg/storage/helpers_test.go
@@ -25,6 +25,7 @@ import (
"math/rand"
"testing"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/storage/rditer"
"github.com/pkg/errors"
@@ -234,6 +235,32 @@ func (s *Store) ReservationCount() int {
return len(s.snapshotApplySem)
}

// AssertInvariants verifies that the store's bookkeeping is self-consistent. It
// is only valid to call this method when there is no in-flight traffic to the
// store (e.g., after the store is shut down).
func (s *Store) AssertInvariants() {
s.mu.RLock()
defer s.mu.RUnlock()
s.mu.replicas.Range(func(_ int64, p unsafe.Pointer) bool {
ctx := s.cfg.AmbientCtx.AnnotateCtx(context.Background())
repl := (*Replica)(p)
// We would normally need to hold repl.raftMu. Otherwise we can observe an
// initialized replica that is not in s.replicasByKey, e.g., if we race with
// a goroutine that is currently initializing repl. The lock ordering makes
// acquiring repl.raftMu challenging; instead we require that this method is
// called only when there is no in-flight traffic to the store, at which
// point acquiring repl.raftMu is unnecessary.
if repl.IsInitialized() {
if ex := s.mu.replicasByKey.Get(repl); ex != repl {
log.Fatalf(ctx, "%v misplaced in replicasByKey; found %v instead", repl, ex)
}
} else if _, ok := s.mu.uninitReplicas[repl.RangeID]; !ok {
log.Fatalf(ctx, "%v missing from unitReplicas", repl)
}
return true // keep iterating
})
}

func NewTestStorePool(cfg StoreConfig) *StorePool {
TimeUntilStoreDead.Override(&cfg.Settings.SV, TestTimeUntilStoreDeadOff)
return NewStorePool(
8 changes: 8 additions & 0 deletions pkg/storage/raft.go
@@ -183,3 +183,11 @@ var _ security.RequestWithUser = &RaftMessageRequest{}
func (*RaftMessageRequest) GetUser() string {
return security.NodeUser
}

// IsPreemptive returns whether this is a preemptive snapshot or a Raft
// snapshot.
func (h *SnapshotRequest_Header) IsPreemptive() bool {
// Preemptive snapshots are addressed to replica ID 0. No other requests to
// replica ID 0 are allowed.
return h.RaftMessageRequest.ToReplica.ReplicaID == 0
}
97 changes: 44 additions & 53 deletions pkg/storage/store.go
@@ -3397,13 +3397,13 @@ func (s *Store) processRaftRequestWithReplica(
// TODO(benesch): handle snapshots that widen EndKey. These can occur if this
// replica was behind when the range committed a merge.
func (s *Store) processRaftSnapshotRequest(
ctx context.Context, req *RaftMessageRequest, inSnap IncomingSnapshot,
ctx context.Context, snapHeader *SnapshotRequest_Header, inSnap IncomingSnapshot,
) *roachpb.Error {
return s.withReplicaForRequest(ctx, req, func(
return s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, func(
ctx context.Context, r *Replica,
) (pErr *roachpb.Error) {
if req.Message.Type != raftpb.MsgSnap {
log.Fatalf(ctx, "expected snapshot: %+v", req)
if snapHeader.RaftMessageRequest.Message.Type != raftpb.MsgSnap {
log.Fatalf(ctx, "expected snapshot: %+v", snapHeader.RaftMessageRequest)
}

// Check to see if a snapshot can be applied. Snapshots can always be applied
@@ -3412,57 +3412,48 @@ func (s *Store) processRaftSnapshotRequest(
// raft-ready processing of uninitialized replicas.
var addedPlaceholder bool
var removePlaceholder bool
if !r.IsInitialized() {
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(ctx, inSnap.State.Desc)
if err != nil {
// If the storage cannot accept the snapshot, return an
// error before passing it to RawNode.Step, since our
// error handling options past that point are limited.
log.Infof(ctx, "cannot apply snapshot: %s", err)
return err
}
if err := func() error {
s.mu.Lock()
defer s.mu.Unlock()
placeholder, err := s.canApplySnapshotLocked(ctx, snapHeader)
if err != nil {
// If the storage cannot accept the snapshot, return an
// error before passing it to RawNode.Step, since our
// error handling options past that point are limited.
log.Infof(ctx, "cannot apply snapshot: %s", err)
return err
}

if placeholder != nil {
// NB: The placeholder added here is either removed below after a
// preemptive snapshot is applied or after the next call to
// Replica.handleRaftReady. Note that we can only get here if the
// replica doesn't exist or is uninitialized.
if err := s.addPlaceholderLocked(placeholder); err != nil {
log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
}
addedPlaceholder = true
if placeholder != nil {
// NB: The placeholder added here is either removed below after a
// preemptive snapshot is applied or after the next call to
// Replica.handleRaftReady. Note that we can only get here if the
// replica doesn't exist or is uninitialized.
if err := s.addPlaceholderLocked(placeholder); err != nil {
log.Fatalf(ctx, "could not add vetted placeholder %s: %s", placeholder, err)
}
return nil
}(); err != nil {
return roachpb.NewError(err)
addedPlaceholder = true
}
return nil
}(); err != nil {
return roachpb.NewError(err)
}

if addedPlaceholder {
// If we added a placeholder remove it before we return unless some other
// part of the code takes ownership of the removal (indicated by setting
// removePlaceholder to false).
removePlaceholder = true
defer func() {
if removePlaceholder {
if s.removePlaceholder(ctx, req.RangeID) {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
if addedPlaceholder {
// If we added a placeholder remove it before we return unless some other
// part of the code takes ownership of the removal (indicated by setting
// removePlaceholder to false).
removePlaceholder = true
defer func() {
if removePlaceholder {
if s.removePlaceholder(ctx, snapHeader.RaftMessageRequest.RangeID) {
atomic.AddInt32(&s.counts.removedPlaceholders, 1)
}
}()
}
}
}()
}

// Snapshots addressed to replica ID 0 are permitted; this is the
// mechanism by which preemptive snapshots work. No other requests to
// replica ID 0 are allowed.
//
// Note that just because the ToReplica's ID is 0 it does not necessarily
// mean that the replica's current ID is 0. We allow for preemptive snaphots
// to be applied to initialized replicas as of #8613.
if req.ToReplica.ReplicaID == 0 {
if snapHeader.IsPreemptive() {
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -3471,7 +3462,7 @@ func (s *Store) processRaftSnapshotRequest(
// applied successfully or not.
if addedPlaceholder {
// Clear the replica placeholder; we are about to swap it with a real replica.
if !s.removePlaceholderLocked(ctx, req.RangeID) {
if !s.removePlaceholderLocked(ctx, snapHeader.RaftMessageRequest.RangeID) {
log.Fatalf(ctx, "could not remove placeholder after preemptive snapshot")
}
if pErr == nil {
@@ -3494,10 +3485,10 @@ func (s *Store) processRaftSnapshotRequest(
// get all of Raft's internal safety checks (it confuses messages
// at term zero for internal messages). The sending side uses the
// term from the snapshot itself, but we'll just check nonzero.
if req.Message.Term == 0 {
if snapHeader.RaftMessageRequest.Message.Term == 0 {
return roachpb.NewErrorf(
"preemptive snapshot from term %d received with zero term",
req.Message.Snapshot.Metadata.Term,
snapHeader.RaftMessageRequest.Message.Snapshot.Metadata.Term,
)
}
// TODO(tschottdorf): A lot of locking of the individual Replica
@@ -3563,7 +3554,7 @@ func (s *Store) processRaftSnapshotRequest(
return roachpb.NewError(err)
}
// We have a Raft group; feed it the message.
if err := raftGroup.Step(req.Message); err != nil {
if err := raftGroup.Step(snapHeader.RaftMessageRequest.Message); err != nil {
return roachpb.NewError(errors.Wrap(err, "unable to process preemptive snapshot"))
}
// In the normal case, the group should ask us to apply a snapshot.
@@ -3603,7 +3594,7 @@ func (s *Store) processRaftSnapshotRequest(
return nil
}

if err := r.stepRaftGroup(req); err != nil {
if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil {
return roachpb.NewError(err)
}

(Diffs for the remaining 3 changed files were not loaded on this page.)
