From 7037b544db927a2693fd94c2effc007a8d0dc2bf Mon Sep 17 00:00:00 2001
From: Andrew Werner
Date: Wed, 20 Feb 2019 18:25:26 -0500
Subject: [PATCH] storage: use lease start time to determine follower read safety

This change recognizes that lease start times provide write invariants
equivalent to those provided by the closed timestamp and MLAI, and thus the
maximum closed timestamp for a replica can be viewed as the maximum of the
current replica's lease start time and the timestamp provided by the closed
timestamp provider. Utilizing this fact makes follower reads as well as
rangefeeds more robust to lease transfers. Before this change, lease transfers
to different nodes might cause closed timestamp information for a range to
become unavailable, leaving the range unable to serve a read at a timestamp
which was previously determined to be safe.

Fixes #35129.

Release note: None
---
 pkg/storage/closed_timestamp_test.go          | 307 +++++++++++++-----
 pkg/storage/closedts/closedts.go              |   1 -
 .../closedts/container/container_test.go      |  96 +++---
 pkg/storage/closedts/container/noop.go        |   5 -
 pkg/storage/closedts/provider/provider.go     |  25 +-
 pkg/storage/replica_follower_read.go          |  37 ++-
 pkg/storage/replica_rangefeed.go              |  10 +-
 7 files changed, 297 insertions(+), 184 deletions(-)

diff --git a/pkg/storage/closed_timestamp_test.go b/pkg/storage/closed_timestamp_test.go
index 6497dae66483..d6231cfd3191 100644
--- a/pkg/storage/closed_timestamp_test.go
+++ b/pkg/storage/closed_timestamp_test.go
@@ -16,7 +16,9 @@ package storage_test
 import (
 	"context"
+	gosql "database/sql"
 	"fmt"
+	"math/rand"
 	"testing"
 	"time"
 
@@ -28,9 +30,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestClosedTimestampCanServe(t *testing.T) {
@@ -44,17 +48,184 @@ func TestClosedTimestampCanServe(t *testing.T) {
 	}
 
 	ctx := context.Background()
-	const numNodes = 3
+	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
+	defer tc.Stopper().Stop(ctx)
+
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+
+	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	testutils.SucceedsSoon(t, func() error {
+		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
+	})
+
+	// We just served a follower read. As a sanity check, make sure that we can't write at
+	// that same timestamp.
+	{
+		var baWrite roachpb.BatchRequest
+		r := &roachpb.DeleteRequest{}
+		r.Key = desc.StartKey.AsRawKey()
+		txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, ts, 100)
+		baWrite.Txn = &txn
+		baWrite.Add(r)
+		baWrite.RangeID = repls[0].RangeID
+		if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil {
+			t.Fatal(err)
+		}
 
-	tc := serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{})
+		var found bool
+		for _, repl := range repls {
+			resp, pErr := repl.Send(ctx, baWrite)
+			if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok {
+				continue
+			} else if pErr != nil {
+				t.Fatal(pErr)
+			}
+			found = true
+			if !ts.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp {
+				t.Fatal("timestamp did not get bumped")
+			}
+			break
+		}
+		if !found {
+			t.Fatal("unable to send to any replica")
+		}
+	}
+}
+
+func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	if util.RaceEnabled {
+		// Limiting how long transactions can run does not work
+		// well with race unless we're extremely lenient, which
+		// drives up the test duration.
+		t.Skip("skipping under race")
+	}
+	ctx := context.Background()
+	tc, db0, desc, repls := setupTestClusterForClosedTimestampTesting(ctx, t)
 	defer tc.Stopper().Stop(ctx)
-	db0 := tc.ServerConn(0)
 
-	// Every 0.1s=100ms, try close out a timestamp ~300ms in the past.
-	// We don't want to be more aggressive than that since it's also
-	// a limit on how long transactions can run.
-	targetDuration := 300 * time.Millisecond
-	closeFraction := 0.3
+	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
+		t.Fatal(err)
+	}
+	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	testutils.SucceedsSoon(t, func() error {
+		return verifyCanReadFromAllRepls(ctx, t, desc, ts, repls, 1)
+	})
+
+	// Once we know that we can read safely at this timestamp, we want to ensure
+	// that we can always read from this timestamp from all replicas even while
+	// lease transfers are ongoing. The test launches a goroutine to randomly
+	// trigger transfers at random intervals of up to maxTransferWait until the
+	// deadline (testTime from now) is reached.
+	const testTime = 500 * time.Millisecond
+	const maxTransferWait = 50 * time.Millisecond
+	deadline := timeutil.Now().Add(testTime)
+	g, gCtx := errgroup.WithContext(ctx)
+	getCurrentLeaseholder := func() (lh roachpb.ReplicationTarget) {
+		testutils.SucceedsSoon(t, func() error {
+			var err error
+			lh, err = tc.FindRangeLeaseHolder(desc, nil)
+			return err
+		})
+		return lh
+	}
+	pickRandomTarget := func(lh roachpb.ReplicationTarget) (t roachpb.ReplicationTarget) {
+		for {
+			if t = tc.Target(rand.Intn(len(repls))); t != lh {
+				return t
+			}
+		}
+	}
+	transferLeasesRandomlyUntilDeadline := func() error {
+		for timeutil.Now().Before(deadline) {
+			lh := getCurrentLeaseholder()
+			target := pickRandomTarget(lh)
+			if err := tc.TransferRangeLease(desc, target); err != nil {
+				return err
+			}
+			time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
+		}
+		return nil
+	}
+	g.Go(transferLeasesRandomlyUntilDeadline)
+
+	// Attempt to send read requests to a replica in a tight loop until the
+	// deadline is reached. If an error is seen on any replica then it is
+	// returned to the errgroup.
+ baRead := makeReadBatchRequestForDesc(desc, ts) + ensureCanReadFromReplicaUntilDeadline := func(r *storage.Replica) { + g.Go(func() error { + for timeutil.Now().Before(deadline) { + resp, pErr := r.Send(gCtx, baRead) + if pErr != nil { + return errors.Wrapf(pErr.GoError(), "on %s", r) + } + rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows + // Should see the write. + if len(rows) != 1 { + return fmt.Errorf("expected one row, but got %d", len(rows)) + } + } + return nil + }) + } + for _, r := range repls { + ensureCanReadFromReplicaUntilDeadline(r) + } + if err := g.Wait(); err != nil { + t.Fatal(err) + } +} + +// Every 0.1s=100ms, try close out a timestamp ~300ms in the past. +// We don't want to be more aggressive than that since it's also +// a limit on how long transactions can run. +const targetDuration = 300 * time.Millisecond +const closeFraction = 0.333 +const numNodes = 3 + +func replsForRange( + ctx context.Context, + t *testing.T, + tc serverutils.TestClusterInterface, + desc roachpb.RangeDescriptor, +) (repls []*storage.Replica) { + testutils.SucceedsSoon(t, func() error { + repls = nil + for i := 0; i < numNodes; i++ { + repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID) + if err != nil { + return err + } + if repl != nil { + repls = append(repls, repl) + } + } + return nil + }) + return repls +} + +// This gnarly helper function creates a test cluster that is prepared to +// exercise follower reads. The returned test cluster has follower reads enabled +// using the above targetDuration and closeFraction. In addition to the newly +// minted test cluster, this function returns a db handle to node 0, a range +// descriptor for the range used by the table `cttest.kv` and the replica +// objects corresponding to the replicas for the range. It is the caller's +// responsibility to Stop the Stopper on the returned test cluster when done. +func setupTestClusterForClosedTimestampTesting( + ctx context.Context, t *testing.T, +) ( + tc serverutils.TestClusterInterface, + db0 *gosql.DB, + kvTableDesc roachpb.RangeDescriptor, + repls []*storage.Replica, +) { + + tc = serverutils.StartTestCluster(t, numNodes, base.TestClusterArgs{}) + db0 = tc.ServerConn(0) if _, err := db0.Exec(fmt.Sprintf(` SET CLUSTER SETTING kv.closed_timestamp.target_duration = '%s'; @@ -106,24 +277,8 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING); break } } - - var repls []*storage.Replica - testutils.SucceedsSoon(t, func() error { - repls = nil - for i := 0; i < numNodes; i++ { - repl, err := tc.Server(i).GetStores().(*storage.Stores).GetReplicaForRangeID(desc.RangeID) - if err != nil { - return err - } - if repl != nil { - repls = append(repls, repl) - } - } - return nil - }) - + repls = replsForRange(ctx, t, tc, desc) require.Equal(t, numReplicas, len(repls)) - // Wait until we see an epoch based lease on our chosen range. This should // happen fairly quickly since we just transferred a lease (as a means to make // it epoch based). 
If the lease transfer fails, we'll be sitting out the lease @@ -138,74 +293,54 @@ CREATE TABLE cttest.kv (id INT PRIMARY KEY, value STRING); } } } + return tc, db0, desc, repls +} - if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil { - t.Fatal(err) +func verifyCanReadFromAllRepls( + ctx context.Context, + t *testing.T, + desc roachpb.RangeDescriptor, + ts hlc.Timestamp, + repls []*storage.Replica, + expectedRows int, +) error { + t.Helper() + baRead := makeReadBatchRequestForDesc(desc, ts) + // The read should succeed once enough time (~300ms, but it's difficult to + // assert on that) has passed - on all replicas! + + for _, repl := range repls { + resp, pErr := repl.Send(ctx, baRead) + if pErr != nil { + switch tErr := pErr.GetDetail().(type) { + case *roachpb.NotLeaseHolderError: + log.Infof(ctx, "got not lease holder error, here's the lease: %v %v %v %v", *tErr.Lease, tErr.Lease.Start.GoTime(), ts.GoTime(), ts.GoTime().Sub(tErr.Lease.Start.GoTime())) + return tErr + case *roachpb.RangeNotFoundError: + // Can happen during upreplication. + return tErr + default: + t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl)) + } + } + rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows + // Should see the write. + if len(rows) != expectedRows { + t.Fatalf("expected %d rows, but got %d", expectedRows, len(rows)) + } } + return nil +} +func makeReadBatchRequestForDesc( + desc roachpb.RangeDescriptor, ts hlc.Timestamp, +) roachpb.BatchRequest { var baRead roachpb.BatchRequest baRead.Header.RangeID = desc.RangeID r := &roachpb.ScanRequest{} r.Key = desc.StartKey.AsRawKey() r.EndKey = desc.EndKey.AsRawKey() baRead.Add(r) - baRead.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - - // The read should succeed once enough time (~300ms, but it's difficult to - // assert on that) has passed - on all replicas! - testutils.SucceedsSoon(t, func() error { - for _, repl := range repls { - resp, pErr := repl.Send(ctx, baRead) - if pErr != nil { - switch tErr := pErr.GetDetail().(type) { - case *roachpb.NotLeaseHolderError: - return tErr - case *roachpb.RangeNotFoundError: - // Can happen during upreplication. - return tErr - default: - t.Fatal(errors.Wrapf(pErr.GoError(), "on %s", repl)) - } - } - rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows - // Should see the write. - if len(rows) != 1 { - t.Fatalf("expected one row, but got %d", len(rows)) - } - } - return nil - }) - - // We just served a follower read. As a sanity check, make sure that we can't write at - // that same timestamp. 
- { - var baWrite roachpb.BatchRequest - r := &roachpb.DeleteRequest{} - r.Key = desc.StartKey.AsRawKey() - txn := roachpb.MakeTransaction("testwrite", r.Key, roachpb.NormalUserPriority, baRead.Timestamp, 100) - baWrite.Txn = &txn - baWrite.Add(r) - baWrite.RangeID = repls[0].RangeID - if err := baWrite.SetActiveTimestamp(tc.Server(0).Clock().Now); err != nil { - t.Fatal(err) - } - - var found bool - for _, repl := range repls { - resp, pErr := repl.Send(ctx, baWrite) - if _, ok := pErr.GoError().(*roachpb.NotLeaseHolderError); ok { - continue - } else if pErr != nil { - t.Fatal(pErr) - } - found = true - if !baRead.Timestamp.Less(resp.Txn.Timestamp) || resp.Txn.OrigTimestamp == resp.Txn.Timestamp { - t.Fatal("timestamp did not get bumped") - } - break - } - if !found { - t.Fatal("unable to send to any replica") - } - } + baRead.Timestamp = ts + return baRead } diff --git a/pkg/storage/closedts/closedts.go b/pkg/storage/closedts/closedts.go index 5ff655e43989..073ce7af1d56 100644 --- a/pkg/storage/closedts/closedts.go +++ b/pkg/storage/closedts/closedts.go @@ -137,7 +137,6 @@ type Provider interface { Producer Notifyee Start() - CanServe(roachpb.NodeID, hlc.Timestamp, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) bool MaxClosed(roachpb.NodeID, roachpb.RangeID, ctpb.Epoch, ctpb.LAI) hlc.Timestamp } diff --git a/pkg/storage/closedts/container/container_test.go b/pkg/storage/closedts/container/container_test.go index e8eb837a869b..f2c19928ae3d 100644 --- a/pkg/storage/closedts/container/container_test.go +++ b/pkg/storage/closedts/container/container_test.go @@ -143,11 +143,11 @@ func TestTwoNodes(t *testing.T) { }() // Initially, can't serve random things for either n1 or n2. - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{}, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)), + require.True(t, c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)).IsEmpty(), ) - require.False(t, c1.Container.Provider.CanServe( - c2.NodeID, hlc.Timestamp{}, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)), + require.True(t, c1.Container.Provider.MaxClosed( + c2.NodeID, roachpb.RangeID(5), ctpb.Epoch(0), ctpb.LAI(0)).IsEmpty(), ) // Track and release a command. @@ -173,8 +173,8 @@ func TestTwoNodes(t *testing.T) { // 0.1 - this is because it has no information about any ranges at that timestamp. // (Note that the Tracker may not have processed the closing yet, so if there were // a bug here, this test would fail flakily - that's ok). - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{Logical: 1}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12)), + require.True(t, c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12)).IsEmpty(), ) // Two more commands come in. @@ -191,23 +191,23 @@ func TestTwoNodes(t *testing.T) { c1.TestClock.Tick(hlc.Timestamp{WallTime: 3E9}, ctpb.Epoch(1), nil) testutils.SucceedsSoon(t, func() error { - if !c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - ) { + if c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9}) { return errors.New("still can't serve") } return nil }) // Shouldn't be able to serve the same thing if we haven't caught up yet. 
- require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(11), - )) + require.False(t, !c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(11), + ).Less(hlc.Timestamp{WallTime: 1E9})) // Shouldn't be able to serve at a higher timestamp. - require.False(t, c1.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9, Logical: 1}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - )) + require.False(t, !c1.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9, Logical: 1})) // Now things get a little more interesting. Tell node2 to get a stream of // information from node1. We do this via Request, which as a side effect lets @@ -226,9 +226,9 @@ func TestTwoNodes(t *testing.T) { // And n2 should soon also be able to serve follower reads for a range lead by // n1 when it has caught up. testutils.SucceedsSoon(t, func() error { - if !c2.Container.Provider.CanServe( - c1.NodeID, hlc.Timestamp{WallTime: 1E9}, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), - ) { + if c2.Container.Provider.MaxClosed( + c1.NodeID, roachpb.RangeID(17), ctpb.Epoch(1), ctpb.LAI(12), + ).Less(hlc.Timestamp{WallTime: 1E9}) { return errors.New("n2 still can't serve") } return nil @@ -249,28 +249,28 @@ func TestTwoNodes(t *testing.T) { {8, 88}, } { testutils.SucceedsSoon(t, func() error { - if !c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI, - ) { + if c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI, + ).Less(ts) { return errors.Errorf("n%d still can't serve (r%d,%d) @ %s", i+1, tuple.RangeID, tuple.LAI, ts) } return nil }) // Still can't serve when not caught up. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI-1, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI-1, + ).Less(ts)) // Can serve when more than caught up. - require.True(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(1), tuple.LAI+1, - )) + require.True(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(1), tuple.LAI+1, + ).Less(ts)) // Can't serve when in different epoch, no matter larger or smaller. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(0), tuple.LAI, - )) - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, tuple.RangeID, ctpb.Epoch(2), tuple.LAI, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(0), tuple.LAI, + ).Less(ts)) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, tuple.RangeID, ctpb.Epoch(2), tuple.LAI, + ).Less(ts)) } } } @@ -322,30 +322,30 @@ func TestTwoNodes(t *testing.T) { ts := hlc.Timestamp{WallTime: int64(container.StorageBucketScale) + 5E9} testutils.SucceedsSoon(t, func() error { - if !c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai, - ) { + if c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai, + ).Less(ts) { return errors.Errorf("n%d still can't serve (r%d,%d) @ %s", i+1, rangeID, lai, ts) } return nil }) // Still can't serve when not caught up. 
- require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai-1, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai-1, + ).Less(ts)) // Can serve when more than caught up. - require.True(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch, lai+1, - )) + require.True(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch, lai+1, + ).Less(ts)) // Can't serve when in different epoch, no matter larger or smaller. - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch-1, lai, - )) - require.False(t, c.Container.Provider.CanServe( - c1.NodeID, ts, rangeID, epoch+1, lai, - )) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch-1, lai, + ).Less(ts)) + require.False(t, !c.Container.Provider.MaxClosed( + c1.NodeID, rangeID, epoch+1, lai, + ).Less(ts)) } } diff --git a/pkg/storage/closedts/container/noop.go b/pkg/storage/closedts/container/noop.go index 49ebb81c4b3b..71fec28dd020 100644 --- a/pkg/storage/closedts/container/noop.go +++ b/pkg/storage/closedts/container/noop.go @@ -73,11 +73,6 @@ func (noopEverything) Notify(roachpb.NodeID) chan<- ctpb.Entry { } func (noopEverything) Subscribe(context.Context, chan<- ctpb.Entry) {} func (noopEverything) Start() {} -func (noopEverything) CanServe( - roachpb.NodeID, hlc.Timestamp, roachpb.RangeID, ctpb.Epoch, ctpb.LAI, -) bool { - return false -} func (noopEverything) MaxClosed( roachpb.NodeID, roachpb.RangeID, ctpb.Epoch, ctpb.LAI, ) hlc.Timestamp { diff --git a/pkg/storage/closedts/provider/provider.go b/pkg/storage/closedts/provider/provider.go index 4f4ebb4ee677..ea08ef523378 100644 --- a/pkg/storage/closedts/provider/provider.go +++ b/pkg/storage/closedts/provider/provider.go @@ -330,35 +330,12 @@ func (p *Provider) Subscribe(ctx context.Context, ch chan<- ctpb.Entry) { } } -// CanServe implements closedts.Provider. -func (p *Provider) CanServe( - nodeID roachpb.NodeID, ts hlc.Timestamp, rangeID roachpb.RangeID, epoch ctpb.Epoch, lai ctpb.LAI, -) bool { - var ok bool - p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) bool { - mlai, found := entry.MLAI[rangeID] - ctOK := !entry.ClosedTimestamp.Less(ts) - - ok = found && - ctOK && - entry.Epoch == epoch && - mlai <= lai - - // We're done either if we proved that the read is possible, or if we're - // already done looking at closed timestamps large enough to satisfy it. - done := ok || !ctOK - return done - }) - - return ok -} - // MaxClosed implements closedts.Provider. 
func (p *Provider) MaxClosed( nodeID roachpb.NodeID, rangeID roachpb.RangeID, epoch ctpb.Epoch, lai ctpb.LAI, ) hlc.Timestamp { var maxTS hlc.Timestamp - p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) bool { + p.cfg.Storage.VisitDescending(nodeID, func(entry ctpb.Entry) (done bool) { if mlai, found := entry.MLAI[rangeID]; found { if entry.Epoch == epoch && mlai <= lai { maxTS = entry.ClosedTimestamp diff --git a/pkg/storage/replica_follower_read.go b/pkg/storage/replica_follower_read.go index 64501e5a9cfe..d804ee122a48 100644 --- a/pkg/storage/replica_follower_read.go +++ b/pkg/storage/replica_follower_read.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" ctstorage "github.com/cockroachdb/cockroach/pkg/storage/closedts/storage" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -45,24 +46,18 @@ func (r *Replica) canServeFollowerRead( FollowerReadsEnabled.Get(&r.store.cfg.Settings.SV) && lErr.LeaseHolder != nil && lErr.Lease.Type() == roachpb.LeaseEpoch { - r.mu.RLock() - lai := r.mu.state.LeaseAppliedIndex - r.mu.RUnlock() - canServeFollowerRead = r.store.cfg.ClosedTimestamp.Provider.CanServe( - lErr.LeaseHolder.NodeID, ba.Timestamp, r.RangeID, ctpb.Epoch(lErr.Lease.Epoch), ctpb.LAI(lai), - ) - + canServeFollowerRead = !r.maxClosed(ctx).Less(ba.Timestamp) if !canServeFollowerRead { - // We can't actually serve the read. Signal the clients that we want - // an update so that future requests can succeed. + // We can't actually serve the read based on the closed timestamp. + // Signal the clients that we want an update so that future requests can succeed. r.store.cfg.ClosedTimestamp.Clients.Request(lErr.LeaseHolder.NodeID, r.RangeID) if false { // NB: this can't go behind V(x) because the log message created by the // storage might be gigantic in real clusters, and we don't want to trip it // using logspy. - log.Warningf(ctx, "can't serve follower read for %s at epo %d lai %d, storage is %s", - ba.Timestamp, lErr.Lease.Epoch, lai, + log.Warningf(ctx, "can't serve follower read for %s at epo %d, storage is %s", + ba.Timestamp, lErr.Lease.Epoch, r.store.cfg.ClosedTimestamp.Storage.(*ctstorage.MultiStorage).StringForNodes(lErr.LeaseHolder.NodeID), ) } @@ -81,3 +76,23 @@ func (r *Replica) canServeFollowerRead( log.Event(ctx, "serving via follower read") return nil } + +// maxClosed returns the maximum closed timestamp for this range. +// It is computed as the most recent of the known closed timestamp for the +// current lease holder for this range as tracked by the closed timestamp +// subsystem and the start time of the current lease. It is safe to use the +// start time of the current lease because leasePostApply bumps the timestamp +// cache forward to at least the new lease start time. Using this combination +// allows the closed timestamp mechanism to be robust to lease transfers. 
+func (r *Replica) maxClosed(ctx context.Context) hlc.Timestamp { + r.mu.RLock() + lai := r.mu.state.LeaseAppliedIndex + lease := *r.mu.state.Lease + r.mu.RUnlock() + maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed( + lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai)) + if maxClosed.IsEmpty() || maxClosed.Less(lease.Start) { + maxClosed = lease.Start + } + return maxClosed +} diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index f961920b958b..b425822f4459 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -24,7 +24,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" @@ -414,15 +413,8 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked() { return } - r.mu.RLock() - lai := r.mu.state.LeaseAppliedIndex - lease := *r.mu.state.Lease - r.mu.RUnlock() - // Determine what the maximum closed timestamp is for this replica. - closedTS := r.store.cfg.ClosedTimestamp.Provider.MaxClosed( - lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai), - ) + closedTS := r.maxClosed(context.Background()) // If the closed timestamp is not empty, inform the Processor. if closedTS.IsEmpty() {
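For reviewers, the rule this patch introduces can be distilled into a small standalone sketch. The snippet below is illustrative only and is not part of the patch; it uses a simplified, wall-time-only stand-in for hlc.Timestamp, and the names ts, maxClosed, and canServeFollowerRead are hypothetical mirrors of (not references to) the types and methods in the diff above.

package main

import "fmt"

// ts is a simplified, wall-time-only stand-in for hlc.Timestamp, carrying just
// enough behavior (emptiness and ordering) to illustrate the rule.
type ts struct{ wallTime int64 }

func (t ts) isEmpty() bool  { return t.wallTime == 0 }
func (t ts) less(o ts) bool { return t.wallTime < o.wallTime }

// maxClosed mirrors the logic of Replica.maxClosed in the patch: the effective
// closed timestamp is the later of the timestamp known to the closed timestamp
// provider and the start time of the current lease. The lease start time is
// safe to use because leasePostApply bumps the timestamp cache to at least
// that time.
func maxClosed(providerClosed, leaseStart ts) ts {
	if providerClosed.isEmpty() || providerClosed.less(leaseStart) {
		return leaseStart
	}
	return providerClosed
}

// canServeFollowerRead mirrors the check in canServeFollowerRead: a follower
// may serve a read at readTS iff readTS is at or below the maximum closed
// timestamp.
func canServeFollowerRead(readTS, providerClosed, leaseStart ts) bool {
	return !maxClosed(providerClosed, leaseStart).less(readTS)
}

func main() {
	leaseStart := ts{wallTime: 2e9}
	providerClosed := ts{} // no closed timestamp info yet, e.g. right after a lease transfer

	// A read at or below the lease start time stays servable even though the
	// provider has nothing for this range...
	fmt.Println(canServeFollowerRead(ts{wallTime: 1e9}, providerClosed, leaseStart)) // true
	// ...while a read above both values must be redirected to the leaseholder.
	fmt.Println(canServeFollowerRead(ts{wallTime: 3e9}, providerClosed, leaseStart)) // false
}

In particular, the first case shows why a follower can keep serving a previously safe read immediately after a lease transfer, before any closed timestamp updates for the new lease have arrived, which is the scenario the commit message describes.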