storage: apply lease change side-effects on snapshot recipients
Fixes #34025.

It is rare but possible for a replica to become a leaseholder but not
learn about this until it applies a snapshot. Immediately upon the
snapshot application's `ReplicaState` update, the replica will begin
operating as a standard leaseholder.

Before this change, leases acquired in this way would not have their
in-memory side-effects applied. This could result in a regression
in the new leaseholder's timestamp cache compared to the previous
leaseholder, allowing write-skew like we saw in #34025. This could
presumably result in other anomalies as well, because all of the
steps in `leasePostApply` were skipped.

This PR fixes this bug by detecting lease updates when applying
snapshots and making sure to react correctly to them. It also likely
fixes the referenced issue. The new test demonstrates that, without
this fix, the serializable violation speculated about in the issue
was possible.

Release note (bug fix): Fix a bug where lease transfers applied through
snapshots could fail to update in-memory state on the new leaseholder,
allowing write-skew between read-modify-write operations.
nvanbenschoten committed Feb 5, 2019
1 parent 317226b commit b74eb26
Showing 4 changed files with 148 additions and 9 deletions.
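As a minimal, standalone illustration of the invariant at stake (this is sketch code, not the CockroachDB implementation; the lease, replica, and tsCacheLowWater names are invented for the example): whenever a replica learns that it holds a new lease, it must ratchet its timestamp-cache low-water mark up to at least the lease start time, so it can never accept a write below a read that the previous leaseholder may already have served. The buggy path replaced the snapshot recipient's ReplicaState without running any equivalent of this side-effect, leaving that mark, and the rest of the lease side-effects, stale.

package main

import "fmt"

// lease is a toy stand-in for roachpb.Lease: a start timestamp and a sequence.
type lease struct {
	start int64
	seq   int64
}

// replica models only what matters here: the current lease and a single
// low-water mark standing in for the timestamp cache.
type replica struct {
	lease           lease
	tsCacheLowWater int64
}

// applyLease is the side-effect that must run whenever this replica learns it
// holds a new lease, whether via normal Raft log application or, as of this
// commit, via a snapshot.
func (r *replica) applyLease(next lease) {
	if next.seq < r.lease.seq {
		panic("lease sequence regression")
	}
	if next.start > r.tsCacheLowWater {
		// Assume the previous leaseholder served reads up to the lease start.
		r.tsCacheLowWater = next.start
	}
	r.lease = next
}

// canWriteAt reports whether a write at ts would rewrite history under a read
// already covered by the low-water mark.
func (r *replica) canWriteAt(ts int64) bool {
	return ts > r.tsCacheLowWater
}

func main() {
	r := &replica{}
	r.applyLease(lease{start: 150, seq: 2})
	fmt.Println(r.canWriteAt(100)) // false: would write under a served read
	fmt.Println(r.canWriteAt(200)) // true
}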
129 changes: 129 additions & 0 deletions pkg/storage/client_replica_test.go
@@ -1822,3 +1822,132 @@ func TestClearRange(t *testing.T) {
verifyKeysWithPrefix(keys.LocalStoreSuggestedCompactionsMin,
[]roachpb.Key{keys.StoreSuggestedCompactionKey(lg1, lg3)})
}

// TestLeaseTransferInSnapshotUpdatesTimestampCache prevents a regression of
// #34025. A Replica is targeted as a lease transfer target while it needs a
// Raft snapshot to catch up. Normally we try to prevent this case, but it is
// possible and hard to prevent entirely. The Replica will only learn that it is
// the new leaseholder when it applies the snapshot. When doing so, it should
// make sure to apply the lease-related side-effects to its in-memory state.
func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
sc := storage.TestStoreConfig(nil)
// We'll control replication by hand.
sc.TestingKnobs.DisableReplicateQueue = true
// Avoid fighting with the merge queue while trying to reproduce this race.
sc.TestingKnobs.DisableMergeQueue = true
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)

keyA := roachpb.Key("a")
keyB := roachpb.Key("b")
keyC := roachpb.Key("c")

// First, do a couple of writes; we'll use these to determine when
// the dust has settled.
incArgs := incrementArgs(keyA, 1)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
t.Fatal(pErr)
}
incArgs = incrementArgs(keyC, 2)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
t.Fatal(pErr)
}

// Split the system range from the rest of the keyspace.
splitArgs := adminSplitArgs(keys.SystemMax)
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), splitArgs); pErr != nil {
t.Fatal(pErr)
}

// Get the range's ID.
repl0 := mtc.stores[0].LookupReplica(roachpb.RKey(keyA))
rangeID := repl0.RangeID

// Replicate the range onto nodes 1 and 2.
// Wait for all replicas to be caught up.
mtc.replicateRange(rangeID, 1, 2)
mtc.waitForValues(keyA, []int64{1, 1, 1})
mtc.waitForValues(keyC, []int64{2, 2, 2})

// Create a transaction that will try to write "under" a served read.
// The read will have been served by the original leaseholder (node 0)
// and the write will be attempted on the new leaseholder (node 2).
// It should not succeed because it should run into the timestamp cache.
db := mtc.dbs[0]
txnOld := client.NewTxn(ctx, db, 0 /* gatewayNodeID */, client.RootTxn)

// Perform a write with txnOld so that its timestamp gets set and so
// that it writes its transaction record.
if err := txnOld.EagerRecord(); err != nil {
t.Fatal(err)
}
if _, err := txnOld.Inc(ctx, keyB, 3); err != nil {
t.Fatal(err)
}

// Read keyC with txnOld. Because keyC is written again below, this read
// prevents the transaction from refreshing its timestamp when it hits the
// serializable error.
if _, err := txnOld.Get(ctx, keyC); err != nil {
t.Fatal(err)
}

// Another client comes along at a higher timestamp and reads. We should
// never be able to write under this time or we would be rewriting history.
if _, err := db.Get(ctx, keyA); err != nil {
t.Fatal(err)
}

// Partition node 2 from the rest of its range. Once partitioned, perform
// another write and truncate the Raft log on the two connected nodes. This
ensures that when node 2 comes back up it will require a snapshot
// from Raft.
mtc.transport.GetCircuitBreaker(mtc.idents[2].NodeID).Break()
if _, pErr := client.SendWrapped(ctx, mtc.stores[0].TestSender(), incArgs); pErr != nil {
t.Fatal(pErr)
}
mtc.waitForValues(keyC, []int64{4, 4, 2})

// Truncate the log at index+1 (log entries below index+1 are removed, so this
// includes the increment). This necessitates a snapshot when the
// partitioned replica rejoins the rest of the range.
index, err := repl0.GetLastIndex()
if err != nil {
t.Fatal(err)
}
truncArgs := truncateLogArgs(index+1, rangeID)
truncArgs.Key = keyA
if _, err := client.SendWrapped(ctx, mtc.stores[0].TestSender(), truncArgs); err != nil {
t.Fatal(err)
}

// Finally, transfer the lease to node 2 while it is still unavailable and
// behind. We try to avoid this case when picking new leaseholders in practice,
// but we're never 100% successful.
if err := repl0.AdminTransferLease(ctx, mtc.idents[2].StoreID); err != nil {
t.Fatal(err)
}

// Remove the partition. A snapshot to node 2 should follow. This snapshot
// will inform node 2 that it is the new leaseholder for the range. Node 2
// should act accordingly and update its internal state to reflect this.
mtc.transport.GetCircuitBreaker(mtc.idents[2].NodeID).Reset()
mtc.waitForValues(keyC, []int64{4, 4, 4})

// Perform a write on the new leaseholder underneath the previously served
// read. This write should hit the timestamp cache and flag the txn for a
// restart when we try to commit it below. With the bug in #34025, the new
// leaseholder, which learned about the lease transfer from a snapshot, had an
// empty timestamp cache and would simply let us write under the previous
// read.
if _, err := txnOld.Inc(ctx, keyA, 4); err != nil {
t.Fatal(err)
}
const exp = `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE\)`
if err := txnOld.Commit(ctx); !testutils.IsError(err, exp) {
t.Fatalf("expected retry error, got: %v; did we write under a read?", err)
}
}
17 changes: 10 additions & 7 deletions pkg/storage/replica_proposal.go
@@ -190,9 +190,14 @@ func (r *Replica) computeChecksumPostApply(ctx context.Context, cc storagepb.Com
}
}

-// leasePostApply is called when a RequestLease or TransferLease
-// request is executed for a range.
-func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
+// leasePostApply updates the Replica's internal state to reflect the
+// application of a new Range lease. The method is idempotent, so it can be
+// called repeatedly for the same lease safely. However, the method will panic
+// if passed a lease with a lower sequence number than the current lease. By
+// default, the method will also panic if passed a lease that indicates a
+// forward sequence number jump (i.e. a skipped lease). This behavior can
+// be disabled by passing permitJump as true.
+func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, permitJump bool) {
r.mu.Lock()
replicaID := r.mu.replicaID
prevLease := *r.mu.state.Lease
@@ -267,9 +272,7 @@ }
}
case s2 == s1+1:
// Lease sequence incremented by 1. Expected case.
-case s2 > s1+1:
-// Snapshots will never call leasePostApply, so we always expect
-// leases to increment one at a time here.
+case s2 > s1+1 && !permitJump:
log.Fatalf(ctx, "lease sequence jump, prevLease=%s, newLease=%s",
log.Safe(prevLease), log.Safe(newLease))
}
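Condensing the sequence handling from the hunk above into a standalone sketch may help when skimming. Note that the real leasePostApply covers additional cases (for example, equal sequence numbers for equivalent leases) and uses log.Fatalf rather than returning errors; checkLeaseSequence is a name invented for this sketch.

package main

import "fmt"

// checkLeaseSequence mirrors the shape of the checks above: regressions are
// always rejected, a single-step increment is the expected case, and a larger
// forward jump is rejected unless permitJump is set, since a snapshot may
// account for several lease changes at once.
func checkLeaseSequence(prevSeq, newSeq int64, permitJump bool) error {
	switch {
	case newSeq < prevSeq:
		return fmt.Errorf("lease sequence regression: %d -> %d", prevSeq, newSeq)
	case newSeq == prevSeq:
		return nil // reapplying the same lease; leasePostApply is idempotent
	case newSeq == prevSeq+1:
		return nil // expected case
	case !permitJump:
		return fmt.Errorf("lease sequence jump: %d -> %d", prevSeq, newSeq)
	default:
		return nil // jump permitted, e.g. during snapshot application
	}
}

func main() {
	fmt.Println(checkLeaseSequence(3, 4, false)) // <nil>
	fmt.Println(checkLeaseSequence(3, 7, false)) // lease sequence jump
	fmt.Println(checkLeaseSequence(3, 7, true))  // <nil>
}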
@@ -634,7 +637,7 @@ }
}

if newLease := rResult.State.Lease; newLease != nil {
-r.leasePostApply(ctx, *newLease)
+r.leasePostApply(ctx, *newLease, false /* permitJump */)
rResult.State.Lease = nil
}

9 changes: 8 additions & 1 deletion pkg/storage/replica_raftstorage.go
@@ -952,6 +952,12 @@ func (r *Replica) applySnapshot(
}
r.store.mu.Unlock()

// Invoke the leasePostApply method to ensure we properly initialize the
// replica according to whether it holds the lease. We allow jumps in the
// lease sequence because there may be multiple lease changes accounted for
// in the snapshot.
r.leasePostApply(ctx, *s.Lease, true /* permitJump */)

r.mu.Lock()
// We set the persisted last index to the last applied index. This is
// not a correctness issue, but means that we may have just transferred
@@ -967,7 +973,8 @@
r.store.metrics.subtractMVCCStats(*r.mu.state.Stats)
r.store.metrics.addMVCCStats(*s.Stats)
// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
-// managed by r.setDesc, but we called that above, so now it's safe to
+// managed by r.setDesc and changes to r.mu.state.Lease must be handled
+// by r.leasePostApply, but we called those above, so now it's safe to
// wholesale replace r.mu.state.
r.mu.state = s
r.assertStateLocked(ctx, r.store.Engine())
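The placement of the new call also matters: as the updated comment in this hunk notes, fields with dedicated handlers (the descriptor via r.setDesc, and now the lease via r.leasePostApply) must be processed before r.mu.state is replaced wholesale. A self-contained toy of that ordering follows; toyReplica, applyDesc, and applyLease are names invented for the sketch, not CockroachDB APIs.

package main

import "fmt"

// incomingState is a toy stand-in for the ReplicaState carried by a snapshot.
type incomingState struct {
	descEndKey string
	leaseSeq   int64
	appliedIdx uint64
}

// toyReplica tracks just enough to show the ordering: fields with dedicated
// side-effect handlers versus the rest of the state.
type toyReplica struct {
	descEndKey string
	leaseSeq   int64
	state      incomingState
}

// applyDesc and applyLease stand in for r.setDesc and r.leasePostApply.
func (r *toyReplica) applyDesc(endKey string) { r.descEndKey = endKey }

func (r *toyReplica) applyLease(seq int64) {
	if seq < r.leaseSeq {
		panic("lease sequence regression")
	}
	r.leaseSeq = seq // forward jumps are fine: a snapshot may cover many leases
}

// applySnapshotState mirrors the ordering above: run the dedicated handlers
// first, then replace the remaining in-memory state wholesale.
func (r *toyReplica) applySnapshotState(s incomingState) {
	r.applyDesc(s.descEndKey)
	r.applyLease(s.leaseSeq)
	r.state = s
}

func main() {
	r := &toyReplica{}
	r.applySnapshotState(incomingState{descEndKey: "z", leaseSeq: 3, appliedIdx: 42})
	fmt.Printf("%+v\n", *r)
}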
2 changes: 1 addition & 1 deletion pkg/storage/store.go
@@ -2312,7 +2312,7 @@ func splitPostApply(
// Invoke the leasePostApply method to ensure we properly initialize
// the replica according to whether it holds the lease. This enables
// the txnWaitQueue.
-rightRng.leasePostApply(ctx, rightLease)
+rightRng.leasePostApply(ctx, rightLease, false /* permitJump */)

// Add the RHS replica to the store. This step atomically updates
// the EndKey of the LHS replica and also adds the RHS replica