From 22f80e79615d6e880467d2d9a77f524bd2046885 Mon Sep 17 00:00:00 2001
From: Spencer Kimball
Date: Sun, 7 Jan 2018 12:22:40 -0500
Subject: [PATCH] storage: use resume span to set ts cache

Previously, we were updating the timestamp cache based on the rows
returned during scans and reverse scans. However, this was not only
inconsistent with how we gather intent spans for resolution at end
transaction, it was also out of step with the results actually read,
meaning the timestamp cache did not properly reflect the keys that
were read and returned.

Consider the following scenario:

- Scan from "a" to "d"; the max results limit stops the scan at "b".
- The current MVCCScan will seek past "b", find the next key (say "c"),
  and return the resume span "c" - "d".
- However, we were updating the timestamp cache for the span
  "a" - "b".Next().
- That leaves all of the keys from "b".Next() - "c" unprotected by the
  timestamp cache, so history can be rewritten under that span on
  successive resumes at the same timestamp.

Now, we correctly use the resume span to update the timestamp cache,
which is both consistent with other uses of the resume span and
correctly updates the timestamp cache based on the results actually
read.

Release note: None
---
 pkg/kv/dist_sender_server_test.go |  2 +-
 pkg/storage/replica.go            | 29 +++++------
 pkg/storage/replica_test.go       |  9 ++++
 pkg/storage/store_test.go         | 81 +++++++++++++++++++++++++++++++
 4 files changed, 102 insertions(+), 19 deletions(-)

diff --git a/pkg/kv/dist_sender_server_test.go b/pkg/kv/dist_sender_server_test.go
index 9b5d4ec4e6eb..7afcad1307a7 100644
--- a/pkg/kv/dist_sender_server_test.go
+++ b/pkg/kv/dist_sender_server_test.go
@@ -1287,7 +1287,7 @@ func TestStopAtRangeBoundary(t *testing.T) {
 			},
 			satisfied: []int{0, 2},
 		},
-		// scanning [,inf)
+		// scanning [,inf)
 		{
 			expResults: map[int][]string{
 				1: {"b1"},
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index 6cafe0c588d4..1afc046ce5a8 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -2063,28 +2063,21 @@ func (r *Replica) updateTimestampCache(ba *roachpb.BatchRequest, br *roachpb.Bat
 			tc.Add(key, nil, ts, txnID, false /* readCache */)
 		case *roachpb.ScanRequest:
 			resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
-			if ba.Header.MaxSpanRequestKeys != 0 &&
-				ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
-				// If the scan requested a limited number of results and we hit the
-				// limit, truncate the span of keys to add to the timestamp cache
-				// to those that were returned.
-				//
-				// NB: We need to include any key read by the operation, even if
-				// not returned, in the span added to the timestamp cache in order
-				// to prevent phantom read anomalies. That means we can only
-				// perform this truncate if the scan requested a limited number of
-				// results and we hit that limit.
-				end = resp.Rows[len(resp.Rows)-1].Key.Next()
+			if resp.ResumeSpan != nil {
+				// Note that for forward scans, the resume span will start at
+				// the (last key read).Next(), which is actually the correct
+				// end key for the span to update the timestamp cache.
+				end = resp.ResumeSpan.Key
 			}
 			tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
 		case *roachpb.ReverseScanRequest:
 			resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
-			if ba.Header.MaxSpanRequestKeys != 0 &&
-				ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
-				// See comment in the ScanRequest case. For revert scans, results
-				// are returned in reverse order and we truncate the start key of
-				// the span.
-				start = resp.Rows[len(resp.Rows)-1].Key
+			if resp.ResumeSpan != nil {
+				// Note that for reverse scans, the resume span's end key is
+				// exclusive, meaning it was read as part of this op and won't
+				// be read on resume. It is the correct start key for the span
+				// to update the timestamp cache.
+				start = resp.ResumeSpan.EndKey
 			}
 			tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
 		default:
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index cb4bf5dc7bcb..28d4231ed7f7 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -1465,6 +1465,15 @@ func scanArgs(start, end []byte) roachpb.ScanRequest {
 	}
 }
 
+func reverseScanArgs(start, end []byte) roachpb.ReverseScanRequest {
+	return roachpb.ReverseScanRequest{
+		Span: roachpb.Span{
+			Key:    start,
+			EndKey: end,
+		},
+	}
+}
+
 func beginTxnArgs(
 	key []byte, txn *roachpb.Transaction,
 ) (roachpb.BeginTransactionRequest, roachpb.Header) {
diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go
index 1b5510bfa907..173a9f1a034f 100644
--- a/pkg/storage/store_test.go
+++ b/pkg/storage/store_test.go
@@ -1725,6 +1725,87 @@ func TestStoreReadInconsistent(t *testing.T) {
 	}
 }
 
+// TestStoreScanResumeTSCache verifies that the timestamp cache is
+// properly updated when scans and reverse scans return partial
+// results and a resume span.
+func TestStoreScanResumeTSCache(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	store, manualClock := createTestStore(t, stopper)
+
+	// Write three keys at time t0.
+	t0 := 1 * time.Second
+	manualClock.Set(t0.Nanoseconds())
+	h := roachpb.Header{Timestamp: makeTS(t0.Nanoseconds(), 0)}
+	for _, keyStr := range []string{"a", "b", "c"} {
+		key := roachpb.Key(keyStr)
+		putArgs := putArgs(key, []byte("value"))
+		if _, pErr := client.SendWrappedWith(context.Background(), store.testSender(), h, &putArgs); pErr != nil {
+			t.Fatal(pErr)
+		}
+	}
+
+	// Scan the span at t1 with max keys and verify the expected resume span.
+	span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}
+	sArgs := scanArgs(span.Key, span.EndKey)
+	t1 := 2 * time.Second
+	manualClock.Set(t1.Nanoseconds())
+	h.Timestamp = makeTS(t1.Nanoseconds(), 0)
+	h.MaxSpanRequestKeys = 2
+	reply, pErr := client.SendWrappedWith(context.Background(), store.testSender(), h, &sArgs)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	sReply := reply.(*roachpb.ScanResponse)
+	if a, e := len(sReply.Rows), 2; a != e {
+		t.Errorf("expected %d rows; got %d", e, a)
+	}
+	expResumeSpan := &roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}
+	if a, e := sReply.ResumeSpan, expResumeSpan; !reflect.DeepEqual(a, e) {
+		t.Errorf("expected resume span %s; got %s", e, a)
+	}
+
+	// Verify the timestamp cache has been set for "b".Next(), but not for "c".
+	rTS, _ := store.tsCache.GetMaxRead(roachpb.Key("b").Next(), nil)
+	if a, e := rTS, makeTS(t1.Nanoseconds(), 0); a != e {
+		t.Errorf("expected timestamp cache for \"b\".Next() set to %s; got %s", e, a)
+	}
+	rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("c"), nil)
+	if a, lt := rTS, makeTS(t1.Nanoseconds(), 0); !a.Less(lt) {
+		t.Errorf("expected timestamp cache for \"c\" set less than %s; got %s", lt, a)
+	}
+
+	// Reverse scan the span at t2 with max keys and verify the expected resume span.
+	t2 := 3 * time.Second
+	manualClock.Set(t2.Nanoseconds())
+	h.Timestamp = makeTS(t2.Nanoseconds(), 0)
+	rsArgs := reverseScanArgs(span.Key, span.EndKey)
+	reply, pErr = client.SendWrappedWith(context.Background(), store.testSender(), h, &rsArgs)
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	rsReply := reply.(*roachpb.ReverseScanResponse)
+	if a, e := len(rsReply.Rows), 2; a != e {
+		t.Errorf("expected %d rows; got %d", e, a)
+	}
+	expResumeSpan = &roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("a").Next()}
+	if a, e := rsReply.ResumeSpan, expResumeSpan; !reflect.DeepEqual(a, e) {
+		t.Errorf("expected resume span %s; got %s", e, a)
+	}
+
+	// Verify the timestamp cache has been set for "a".Next(), but not for "a".
+	rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("a").Next(), nil)
+	if a, e := rTS, makeTS(t2.Nanoseconds(), 0); a != e {
+		t.Errorf("expected timestamp cache for \"a\".Next() set to %s; got %s", e, a)
+	}
+	rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("a"), nil)
+	if a, lt := rTS, makeTS(t2.Nanoseconds(), 0); !a.Less(lt) {
+		t.Errorf("expected timestamp cache for \"a\" set less than %s; got %s", lt, a)
+	}
+}
+
 // TestStoreScanIntents verifies that a scan across 10 intents resolves
 // them in one fell swoop using both consistent and inconsistent reads.
 func TestStoreScanIntents(t *testing.T) {
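
For readers tracing the updateTimestampCache change above, the following is a
minimal, standalone sketch of the interval arithmetic the new code relies on.
The names here (span, next, tsCacheSpan) are made up for illustration and do
not exist in the roachpb or storage packages; the sketch only shows how the
span recorded in the timestamp cache can be derived from a scan's resume span
for forward and reverse scans.

package main

import "fmt"

// span is a simplified [Key, EndKey) interval over byte-string keys,
// standing in for roachpb.Span.
type span struct {
	key, endKey []byte
}

// next returns the immediate successor of a key, analogous to
// roachpb.Key.Next().
func next(k []byte) []byte {
	return append(append([]byte(nil), k...), 0)
}

// tsCacheSpan returns the interval of keys a scan actually covered, i.e. the
// span that should be recorded in the timestamp cache. req is the requested
// span; resume is the resume span returned with a partial result (nil when
// the scan completed).
func tsCacheSpan(req span, resume *span, reverse bool) span {
	if resume == nil {
		// No resume span: the entire requested span was read.
		return req
	}
	if reverse {
		// Reverse scan: everything from resume.endKey (inclusive) up to the
		// requested end key was covered, so resume.endKey is the start key.
		return span{key: resume.endKey, endKey: req.endKey}
	}
	// Forward scan: everything from the requested start up to (but excluding)
	// resume.key was covered, so resume.key is the end key.
	return span{key: req.key, endKey: resume.key}
}

func main() {
	req := span{key: []byte("a"), endKey: []byte("d")}

	// Forward scan of [a, d) that hit its limit at "b" and skipped ahead to
	// "c": the resume span is [c, d), so [a, c) is recorded, not just
	// [a, b.Next()).
	fwd := tsCacheSpan(req, &span{key: []byte("c"), endKey: []byte("d")}, false)
	fmt.Printf("forward: [%q, %q)\n", fwd.key, fwd.endKey)

	// Reverse scan of [a, d) that hit its limit at "b" and skipped back to
	// "a": the resume span is [a, a.Next()), so [a.Next(), d) is recorded.
	rev := tsCacheSpan(req, &span{key: []byte("a"), endKey: next([]byte("a"))}, true)
	fmt.Printf("reverse: [%q, %q)\n", rev.key, rev.endKey)
}

Because MVCCScan seeks past the limit before building the resume span, the
resume span's boundary already accounts for keys that were read but not
returned, which is why it, and not the returned Rows, yields the interval the
timestamp cache needs to cover.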