Skip to content

Commit

Permalink
Merge pull request cockroachdb#21304 from spencerkimball/update-resum…
Browse files Browse the repository at this point in the history
…e-span-calcs

storage: use resume span to set ts cache
  • Loading branch information
spencerkimball committed Jan 8, 2018
2 parents 1f5b9e9 + 22f80e7 commit 742b93b
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1287,7 +1287,7 @@ func TestStopAtRangeBoundary(t *testing.T) {
},
satisfied: []int{0, 2},
},
// scanning [<where we left of>,inf)
// scanning [<where we left off>,inf)
{
expResults: map[int][]string{
1: {"b1"},
Expand Down
29 changes: 11 additions & 18 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,28 +2063,21 @@ func (r *Replica) updateTimestampCache(ba *roachpb.BatchRequest, br *roachpb.Bat
tc.Add(key, nil, ts, txnID, false /* readCache */)
case *roachpb.ScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
// If the scan requested a limited number of results and we hit the
// limit, truncate the span of keys to add to the timestamp cache
// to those that were returned.
//
// NB: We need to include any key read by the operation, even if
// not returned, in the span added to the timestamp cache in order
// to prevent phantom read anomalies. That means we can only
// perform this truncate if the scan requested a limited number of
// results and we hit that limit.
end = resp.Rows[len(resp.Rows)-1].Key.Next()
if resp.ResumeSpan != nil {
// Note that for forward scan, the resume span will start at
// the (last key read).Next(), which is actually the correct
// end key for the span to update the timestamp cache.
end = resp.ResumeSpan.Key
}
tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
case *roachpb.ReverseScanRequest:
resp := br.Responses[i].GetInner().(*roachpb.ReverseScanResponse)
if ba.Header.MaxSpanRequestKeys != 0 &&
ba.Header.MaxSpanRequestKeys == int64(len(resp.Rows)) {
// See comment in the ScanRequest case. For revert scans, results
// are returned in reverse order and we truncate the start key of
// the span.
start = resp.Rows[len(resp.Rows)-1].Key
if resp.ResumeSpan != nil {
// Note that for reverse scans, the resume span's end key is
// an open interval. That means it was read as part of this op
// and won't be read on resume. It is the correct start key for
// the span to update the timestamp cache.
start = resp.ResumeSpan.EndKey
}
tc.Add(start, end, ts, txnID, readOnlyUseReadCache)
default:
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,6 +1465,15 @@ func scanArgs(start, end []byte) roachpb.ScanRequest {
}
}

func reverseScanArgs(start, end []byte) roachpb.ReverseScanRequest {
return roachpb.ReverseScanRequest{
Span: roachpb.Span{
Key: start,
EndKey: end,
},
}
}

func beginTxnArgs(
key []byte, txn *roachpb.Transaction,
) (roachpb.BeginTransactionRequest, roachpb.Header) {
Expand Down
81 changes: 81 additions & 0 deletions pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1725,6 +1725,87 @@ func TestStoreReadInconsistent(t *testing.T) {
}
}

// TestStoreScanResumeTSCache verifies that the timestamp cache is
// properly updated when scans and reverse scans return partial
// results and a resume span.
func TestStoreScanResumeTSCache(t *testing.T) {
defer leaktest.AfterTest(t)()

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
store, manualClock := createTestStore(t, stopper)

// Write three keys at time t0.
t0 := 1 * time.Second
manualClock.Set(t0.Nanoseconds())
h := roachpb.Header{Timestamp: makeTS(t0.Nanoseconds(), 0)}
for _, keyStr := range []string{"a", "b", "c"} {
key := roachpb.Key(keyStr)
putArgs := putArgs(key, []byte("value"))
if _, pErr := client.SendWrappedWith(context.Background(), store.testSender(), h, &putArgs); pErr != nil {
t.Fatal(pErr)
}
}

// Scan the span at t1 with max keys and verify the expected resume span.
span := roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("d")}
sArgs := scanArgs(span.Key, span.EndKey)
t1 := 2 * time.Second
manualClock.Set(t1.Nanoseconds())
h.Timestamp = makeTS(t1.Nanoseconds(), 0)
h.MaxSpanRequestKeys = 2
reply, pErr := client.SendWrappedWith(context.Background(), store.testSender(), h, &sArgs)
if pErr != nil {
t.Fatal(pErr)
}
sReply := reply.(*roachpb.ScanResponse)
if a, e := len(sReply.Rows), 2; a != e {
t.Errorf("expected %d rows; got %d", e, a)
}
expResumeSpan := &roachpb.Span{Key: roachpb.Key("c"), EndKey: roachpb.Key("d")}
if a, e := sReply.ResumeSpan, expResumeSpan; !reflect.DeepEqual(a, e) {
t.Errorf("expected resume span %s; got %s", e, a)
}

// Verify the timestamp cache has been set for "b".Next(), but not for "c".
rTS, _ := store.tsCache.GetMaxRead(roachpb.Key("b").Next(), nil)
if a, e := rTS, makeTS(t1.Nanoseconds(), 0); a != e {
t.Errorf("expected timestamp cache for \"b\".Next() set to %s; got %s", e, a)
}
rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("c"), nil)
if a, lt := rTS, makeTS(t1.Nanoseconds(), 0); !a.Less(lt) {
t.Errorf("expected timestamp cache for \"c\" set less than %s; got %s", lt, a)
}

// Reverse scan the span at t1 with max keys and verify the expected resume span.
t2 := 3 * time.Second
manualClock.Set(t2.Nanoseconds())
h.Timestamp = makeTS(t2.Nanoseconds(), 0)
rsArgs := reverseScanArgs(span.Key, span.EndKey)
reply, pErr = client.SendWrappedWith(context.Background(), store.testSender(), h, &rsArgs)
if pErr != nil {
t.Fatal(pErr)
}
rsReply := reply.(*roachpb.ReverseScanResponse)
if a, e := len(rsReply.Rows), 2; a != e {
t.Errorf("expected %d rows; got %d", e, a)
}
expResumeSpan = &roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("a").Next()}
if a, e := rsReply.ResumeSpan, expResumeSpan; !reflect.DeepEqual(a, e) {
t.Errorf("expected resume span %s; got %s", e, a)
}

// Verify the timestamp cache has been set for "a".Next(), but not for "a".
rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("a").Next(), nil)
if a, e := rTS, makeTS(t2.Nanoseconds(), 0); a != e {
t.Errorf("expected timestamp cache for \"a\".Next() set to %s; got %s", e, a)
}
rTS, _ = store.tsCache.GetMaxRead(roachpb.Key("a"), nil)
if a, lt := rTS, makeTS(t2.Nanoseconds(), 0); !a.Less(lt) {
t.Errorf("expected timestamp cache for \"a\" set less than %s; got %s", lt, a)
}
}

// TestStoreScanIntents verifies that a scan across 10 intents resolves
// them in one fell swoop using both consistent and inconsistent reads.
func TestStoreScanIntents(t *testing.T) {
Expand Down

0 comments on commit 742b93b

Please sign in to comment.