storage: use lease start time to determine follower read safety
This change recognizes that lease start times provide write invariants
equivalent to those provided by the closed timestamp and MLAI (min lease
applied index), and thus that the maximum closed timestamp for a replica can
be viewed as the maximum of the current replica's lease start time and the
timestamp provided by the closed timestamp provider. Utilizing this fact makes
follower reads as well as rangefeeds more robust to lease transfers. Before
this change, a lease transfer to a different node could cause closed timestamp
information for a range to become unavailable, leaving the range unable to
serve a read at a timestamp which was previously determined to be safe.
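
As a rough Go sketch of the invariant described above (the names here are
illustrative, not identifiers from this patch), the timestamp below which a
replica can safely serve follower reads is simply the later of the two values:

package sketch

import "github.com/cockroachdb/cockroach/pkg/util/hlc"

// maxClosedTimestamp illustrates the idea: follower reads are safe at any
// timestamp no greater than the maximum of the current lease's start time
// and the closed timestamp reported by the provider. Hypothetical helper,
// not code from this commit.
func maxClosedTimestamp(leaseStart, providerClosed hlc.Timestamp) hlc.Timestamp {
	maxClosed := providerClosed
	if maxClosed.Less(leaseStart) {
		// The lease start time carries an equivalent write invariant, so the
		// closed timestamp may be advanced to it.
		maxClosed = leaseStart
	}
	return maxClosed
}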

Fixes #35129.

Release note: None
ajwerner committed Mar 12, 2019
1 parent 895659a commit 7037b54
Showing 7 changed files with 297 additions and 184 deletions.
307 changes: 221 additions & 86 deletions pkg/storage/closed_timestamp_test.go
@@ -16,7 +16,9 @@ package storage_test

import (
"context"
gosql "database/sql"
"fmt"
"math/rand"
"testing"
"time"

@@ -28,9 +30,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func TestClosedTimestampCanServe(t *testing.T) {
@@ -44,17 +48,184 @@ func TestClosedTimestampCanServe(t *testing.T) {
}

ctx := context.Background()
const numNodes = 3
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}

ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
})

// We just served a follower read. As a sanity check, make sure that we can't write at
// that same timestamp.
{
var baWrite roachpb.BatchRequest
r := &roachpb.DeleteRequest{}
r.Key = desc.StartKey.AsRawKey()
txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100)
baWrite.Txn = &txn
baWrite.Add(r)
baWrite.RangeID = repls[0].RangeID
if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil {
t.Fatal(err)
}

tc := serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{})
var found bool
for _, repl := range repls {
resp, pErr := repl.Send(ctx, baWrite)
if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok {
continue
} else if pErr != nil {
t.Fatal(pErr)
}
found = true
if !ts.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp {
t.Fatal("timestamp did not get bumped")
}
break
}
if !found {
t.Fatal("unable to send to any replica")
}
}
}

func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()

if util.RaceEnabled {
// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
t.Skip("skipping under race")
}
ctx := context.Background()
tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
defer tc.Stopper().Stop(ctx)

db0 := tc.ServerConn(0)
// Every 0.1s=100ms, try to close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
targetDuration := 300 * time.Millisecond
closeFraction := 0.3
if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
})

// Once we know that we can read safely at this timestamp, we want to ensure
// that we can always read from this timestamp from all replicas even while
// lease transfers are ongoing. The test launches a goroutine that randomly
// triggers lease transfers, waiting up to maxTransferWait between transfers,
// until the deadline is reached, while reader goroutines verify that the
// read keeps succeeding on every replica.
const testTime = 500 * time.Millisecond
const maxTransferWait = 50 * time.Millisecond
deadline := timeutil.Now().Add(testTime)
g, gCtx := errgroup.WithContext(ctx)
getCurrentLeaseholder := func() (lh roachpb.ReplicationTarget) {
testutils.SucceedsSoon(t, func() error {
var err error
lh, err = tc.FindRangeLeaseHolder(desc, nil)
return err
})
return lh
}
pickRandomTarget := func(lh roachpb.ReplicationTarget) (t roachpb.ReplicationTarget) {
for {
if t = tc.Target(rand.Intn(len(repls))); t != lh {
return t
}
}
}
transferLeasesRandomlyUntilDeadline := func() error {
for timeutil.Now().Before(deadline) {
lh := getCurrentLeaseholder()
target := pickRandomTarget(lh)
if err := tc.TransferRangeLease(desc, target); err != nil {
return err
}
time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
}
return nil
}
g.Go(transferLeasesRandomlyUntilDeadline)

// Attempt to send read requests to a replica in a tight loop until deadline
// is reached. If an error is seen on any replica then it is returned to the
// errgroup.
baRead := makeReadBatchRequestForDesc(desc, ts)
ensureCanReadFromReplicaUntilDeadline := func(r *storage.Replica) {
g.Go(func() error {
for timeutil.Now().Before(deadline) {
resp, pErr := r.Send(gCtx, baRead)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "on %s", r)
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != 1 {
return fmt.Errorf("expected one row, but got %d", len(rows))
}
}
return nil
})
}
for _, r := range repls {
ensureCanReadFromReplicaUntilDeadline(r)
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

// Every 0.1s=100ms, try to close out a timestamp ~300ms in the past.
// We don't want to be more aggressive than that since it's also
// a limit on how long transactions can run.
const targetDuration = 300 * time.Millisecond
const closeFraction = 0.333
const numNodes = 3

func replsForRange(
ctx context.Context,
t *testing.T,
tc serverutils.TestClusterInterface,
desc roachpb.RangeDescriptor,
) (repls []*storage.Replica) {
testutils.SucceedsSoon(t, func() error {
repls = nil
for i := 0; i < numNodes; i++ {
repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID)
if err != nil {
return err
}
if repl != nil {
repls = append(repls, repl)
}
}
return nil
})
return repls
}

// This gnarly helper function creates a test cluster that is prepared to
// exercise follower reads. The returned test cluster has follower reads enabled
// using the above targetDuration and closeFraction. In addition to the newly
// minted test cluster, this function returns a db handle to node 0, a range
// descriptor for the range used by the table `cttest.kv` and the replica
// objects corresponding to the replicas for the range. It is the caller's
// responsibility to Stop the Stopper on the returned test cluster when done.
func setupTestClusterForClosedTimestampTesting(
ctx context.Context, t *testing.T,
) (
tc serverutils.TestClusterInterface,
db0 *gosql.DB,
kvTableDesc roachpb.RangeDescriptor,
repls []*storage.Replica,
) {

tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{})
db0 = tc.ServerConn(0)

if _, err := db0.Exec(fmt.Sprintf(`
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s';
@@ -106,24 +277,8 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
break
}
}

var repls []*storage.Replica
testutils.SucceedsSoon(t, func() error {
repls = nil
for i := 0; i < numNodes; i++ {
repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID)
if err != nil {
return err
}
if repl != nil {
repls = append(repls, repl)
}
}
return nil
})

repls = replsForRange(ctx, t, tc, desc)
require.Equal(t, numReplicas, len(repls))

// Wait until we see an epoch based lease on our chosen range. This should
// happen fairly quickly since we just transferred a lease (as a means to make
// it epoch based). If the lease transfer fails, we'll be sitting out the lease
@@ -138,74 +293,54 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING);
}
}
}
return tc, db0, desc, repls
}

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
func verifyCanReadFromAllRepls(
ctx context.Context,
t *testing.T,
desc roachpb.RangeDescriptor,
ts hlc.Timestamp,
repls []*storage.Replica,
expectedRows int,
) error {
t.Helper()
baRead := makeReadBatchRequestForDesc(desc, ts)
// The read should succeed once enough time (~300ms, but it's difficult to
// assert on that) has passed - on all replicas!

for _, repl := range repls {
resp, pErr := repl.Send(ctx, baRead)
if pErr != nil {
switch tErr := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError:
log.Infof(ctx, "got not lease holder error, here's the lease: %v %v %v %v", *tErr.Lease, tErr.Lease.Start.GoTime(), ts.GoTime(), ts.GoTime().Sub(tErr.Lease.Start.GoTime()))
return tErr
case *roachpb.RangeNotFoundError:
// Can happen during upreplication.
return tErr
default:
t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl))
}
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != expectedRows {
t.Fatalf("expected %d rows, but got %d", expectedRows, len(rows))
}
}
return nil
}

func makeReadBatchRequestForDesc(
desc roachpb.RangeDescriptor, ts hlc.Timestamp,
) roachpb.BatchRequest {
var baRead roachpb.BatchRequest
baRead.Header.RangeID = desc.RangeID
r := &roachpb.ScanRequest{}
r.Key = desc.StartKey.AsRawKey()
r.EndKey = desc.EndKey.AsRawKey()
baRead.Add(r)
baRead.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

// The read should succeed once enough time (~300ms, but it's difficult to
// assert on that) has passed - on all replicas!
testutils.SucceedsSoon(t, func() error {
for _, repl := range repls {
resp, pErr := repl.Send(ctx, baRead)
if pErr != nil {
switch tErr := pErr.GetDetail().(type) {
case *roachpb.NotLeaseHolderError:
return tErr
case *roachpb.RangeNotFoundError:
// Can happen during upreplication.
return tErr
default:
t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl))
}
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != 1 {
t.Fatalf("expected one row, but got %d", len(rows))
}
}
return nil
})

// We just served a follower read. As a sanity check, make sure that we can't write at
// that same timestamp.
{
var baWrite roachpb.BatchRequest
r := &roachpb.DeleteRequest{}
r.Key = desc.StartKey.AsRawKey()
txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, baRead.Timestamp, 100)
baWrite.Txn = &txn
baWrite.Add(r)
baWrite.RangeID = repls[0].RangeID
if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil {
t.Fatal(err)
}

var found bool
for _, repl := range repls {
resp, pErr := repl.Send(ctx, baWrite)
if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok {
continue
} else if pErr != nil {
t.Fatal(pErr)
}
found = true
if !baRead.Timestamp.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp {
t.Fatal("timestamp did not get bumped")
}
break
}
if !found {
t.Fatal("unable to send to any replica")
}
}
baRead.Timestamp = ts
return baRead
}
1 change: 0 additions & 1 deletion pkg/storage/closedts/closedts.go
@@ -137,7 +137,6 @@ type Provider interface {
Producer
Notifyee
Start()
CanServe(roachpb.NodeID, hlc.Timestamp, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) bool
MaxClosed(roachpb.NodeID, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) hlc.Timestamp
}
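
With CanServe removed, deciding whether a follower read is safe becomes the
caller's job: take the provider's MaxClosed result, forward it to the lease
start time as described in the commit message, and compare against the read
timestamp. The sketch below is a hypothetical caller, not code from this
patch; the function name, parameters, and field accesses are assumptions.

package sketch

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage/closedts"
	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// canServeFollowerRead sketches how a replica-side caller might combine the
// provider's MaxClosed value with its current lease start time.
func canServeFollowerRead(
	p closedts.Provider,
	lease roachpb.Lease,
	rangeID roachpb.RangeID,
	lai ctpb.LAI,
	readTS hlc.Timestamp,
) bool {
	maxClosed := p.MaxClosed(lease.Replica.NodeID, rangeID, ctpb.Epoch(lease.Epoch), lai)
	if maxClosed.Less(lease.Start) {
		// The lease start time provides an equivalent write invariant.
		maxClosed = lease.Start
	}
	return !maxClosed.Less(readTS)
}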
