From 6dfd5ac6dee2afff19799756c3e0985cc80794e2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 21 Dec 2023 11:54:12 -0500 Subject: [PATCH 1/4] kv/tscache: inline sklPage.maxInRange in lookupTimestampRange This was the only caller and the separation of concerns did not seem meaningful. Release note: None --- pkg/kv/kvserver/tscache/interval_skl.go | 59 +++++++++++-------------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index a757920387a4..2af63a1b792d 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -573,6 +573,8 @@ func newSklPage(arena *arenaskl.Arena) *sklPage { return &sklPage{list: arenaskl.NewSkiplist(arena)} } +// lookupTimestampRange scans the range of keys between from and to and returns +// the maximum (initialized or uninitialized) value found. func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheValue { if to != nil { cmp := 0 @@ -601,7 +603,31 @@ func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheV it.Init(p.list) it.SeekForPrev(from) - return p.maxInRange(&it, from, to, opt) + // Determine the previous gap value. This will move the iterator to the + // first node >= from. + prevGapVal := p.incomingGapVal(&it, from) + + if !it.Valid() { + // No more nodes. + return prevGapVal + } else if bytes.Equal(it.Key(), from) { + // Found a node at from. + if (it.Meta() & initialized) != 0 { + // The node was initialized. Ignore the previous gap value. + prevGapVal = cacheValue{} + } + } else { + // No node at from. Remove excludeFrom option. + opt &^= excludeFrom + } + + // Scan the rest of the way. Notice that we provide the previous gap value. + // This is important for two reasons: + // 1. it will be counted towards the maxVal result. + // 2. it will be used to ratchet uninitialized nodes that the scan sees + // before any initialized nodes. + _, maxVal := p.scanTo(&it, to, opt, prevGapVal) + return maxVal } // addNode adds a new node at key with the provided value if one does not exist. @@ -979,37 +1005,6 @@ func (p *sklPage) ratchetValueSet( } } -// maxInRange scans the range of keys between from and to and returns the -// maximum (initialized or uninitialized) value found. When finished, the -// iterator will be positioned the same as if it.Seek(to) had been called. -func (p *sklPage) maxInRange(it *arenaskl.Iterator, from, to []byte, opt rangeOptions) cacheValue { - // Determine the previous gap value. This will move the iterator to the - // first node >= from. - prevGapVal := p.incomingGapVal(it, from) - - if !it.Valid() { - // No more nodes. - return prevGapVal - } else if bytes.Equal(it.Key(), from) { - // Found a node at from. - if (it.Meta() & initialized) != 0 { - // The node was initialized. Ignore the previous gap value. - prevGapVal = cacheValue{} - } - } else { - // No node at from. Remove excludeFrom option. - opt &^= excludeFrom - } - - // Scan the rest of the way. Notice that we provide the previous gap value. - // This is important for two reasons: - // 1. it will be counted towards the maxVal result. - // 2. it will be used to ratchet uninitialized nodes that the scan sees - // before any initialized nodes. 
- _, maxVal := p.scanTo(it, to, opt, prevGapVal) - return maxVal -} - // incomingGapVal determines the gap value active at the specified key by first // scanning backwards to the first initialized node and then scanning forwards // to the specified key. If there is already a node at key then the previous gap From 7edf746e5baeab19bbcdbf2070db2501ca3196dc Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 21 Dec 2023 12:48:21 -0500 Subject: [PATCH 2/4] kv/tscache: implement lookupTimestampRange using a visitor function This commit refactors sklPage.lookupTimestampRange to scan using a visitor function. This will be used in a subsequent commit to implement serialization of an sklPage. Release note: None --- pkg/kv/kvserver/tscache/interval_skl.go | 76 +++++++++++++++---------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go index 2af63a1b792d..4efd5cb46775 100644 --- a/pkg/kv/kvserver/tscache/interval_skl.go +++ b/pkg/kv/kvserver/tscache/interval_skl.go @@ -576,6 +576,16 @@ func newSklPage(arena *arenaskl.Arena) *sklPage { // lookupTimestampRange scans the range of keys between from and to and returns // the maximum (initialized or uninitialized) value found. func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheValue { + var maxVal cacheValue + p.visitRange(from, to, opt, func(val cacheValue) { + maxVal, _ = ratchetValue(maxVal, val) + }) + return maxVal +} + +// visitRange scans the range of keys between from and to and calls the visitor +// function for each (initialized or uninitialized) value found. +func (p *sklPage) visitRange(from, to []byte, opt rangeOptions, visit func(cacheValue)) { if to != nil { cmp := 0 if from != nil { @@ -584,13 +594,13 @@ func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheV if cmp > 0 { // Starting key is after ending key, so range is zero length. - return cacheValue{} + return } if cmp == 0 { // Starting key is same as ending key. if opt == (excludeFrom | excludeTo) { // Both from and to keys are excluded, so range is zero length. - return cacheValue{} + return } // Scan over a single key. @@ -608,26 +618,25 @@ func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheV prevGapVal := p.incomingGapVal(&it, from) if !it.Valid() { - // No more nodes. - return prevGapVal + // No more nodes. Visit the previous gap value and return. + visit(prevGapVal) + return } else if bytes.Equal(it.Key(), from) { - // Found a node at from. + // Found a node at from. No need to visit the gap value. if (it.Meta() & initialized) != 0 { // The node was initialized. Ignore the previous gap value. prevGapVal = cacheValue{} } } else { - // No node at from. Remove excludeFrom option. + // No node at from. Visit the gap value and remove the excludeFrom option. + visit(prevGapVal) opt &^= excludeFrom } // Scan the rest of the way. Notice that we provide the previous gap value. - // This is important for two reasons: - // 1. it will be counted towards the maxVal result. - // 2. it will be used to ratchet uninitialized nodes that the scan sees - // before any initialized nodes. - _, maxVal := p.scanTo(&it, to, opt, prevGapVal) - return maxVal + // This is important because it will be used to ratchet uninitialized nodes + // that the scan sees before any initialized nodes. + p.scanTo(&it, to, opt, prevGapVal, visit) } // addNode adds a new node at key with the provided value if one does not exist. 
@@ -1030,8 +1039,7 @@ func (p *sklPage) incomingGapVal(it *arenaskl.Iterator, key []byte) cacheValue {
 	prevInitNode(it)
 
 	// Iterate forwards to key, remembering the last gap value.
-	prevGapVal, _ := p.scanTo(it, key, 0, cacheValue{})
-	return prevGapVal
+	return p.scanTo(it, key, 0, cacheValue{}, nil)
 }
 
 // scanTo scans from the current iterator position until the key "to". While
@@ -1039,16 +1047,24 @@
 // which is essential to avoiding ratchet inversions (see the comment on
 // ensureInitialized).
 //
-// The function then returns the maximum value seen along with the gap value at
-// the end of the scan. If the iterator is positioned at a key > "to", the
-// function will return zero values. The function takes an optional initial gap
-// value argument, which is used to initialize the running maximum and gap
-// values. When finished, the iterator will be positioned the same as if
-// it.Seek(to) had been called.
+// The function calls the provided visitor function, if not nil, for each value
+// that it encounters. The visitor function is called for both key and gap
+// values.
+//
+// The function then returns the gap value at the end of the scan. If the
+// iterator is positioned at a key > "to", the function will return a zero
+// value.
+//
+// The function takes an optional initial gap value argument, which is used to
+// initialize the running gap value so that it can ratchet uninitialized nodes
+// that the scan sees before any initialized nodes.
+//
+// When finished, the iterator will be positioned the same as if it.Seek(to) had
+// been called.
 func (p *sklPage) scanTo(
-	it *arenaskl.Iterator, to []byte, opt rangeOptions, initGapVal cacheValue,
-) (prevGapVal, maxVal cacheValue) {
-	prevGapVal, maxVal = initGapVal, initGapVal
+	it *arenaskl.Iterator, to []byte, opt rangeOptions, initGapVal cacheValue, visit func(cacheValue),
+) (prevGapVal cacheValue) {
+	prevGapVal = initGapVal
 	first := true
 	for {
 		util.RacePreempt()
@@ -1087,19 +1103,21 @@
 			}
 		}
 
-		if !(first && (opt&excludeFrom) != 0) {
+		if !(first && (opt&excludeFrom) != 0) && visit != nil {
 			// As long as this isn't the first key and opt says to exclude the
-			// first key, we ratchet the maxVal.
-			maxVal, _ = ratchetValue(maxVal, keyVal)
+			// first key, we call the visitor.
+			visit(keyVal)
 		}
 
 		if toCmp == 0 {
-			// We're on the scan's end key, so return the max value seen.
+			// We're on the scan's end key, so return.
 			return
 		}
 
-		// Ratchet the maxVal by the current gapVal.
-		maxVal, _ = ratchetValue(maxVal, gapVal)
+		// Call the visitor with the current gapVal.
+		if visit != nil {
+			visit(gapVal)
+		}
 
 		// Haven't yet reached the scan's end key, so keep iterating.
 		prevGapVal = gapVal

From 1c86227f2ff02e5910d9f3885ed36de57c0a3b73 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Wed, 24 Jan 2024 13:55:06 -0500
Subject: [PATCH 3/4] kv/tscache: implement timestamp cache serialization

Informs #61986.

This commit adds a new `Serialize` function to `tscache.Cache`
implementations. This serialization uses the `readsummary/rspb.Segment`
representation added in 0950a1e0.

Serialization of the production `sklImpl` uses the Segment merging logic
added in e4fc6f1e in order to merge together partial serializations of
each individual `sklPage` in the data structure.

Timestamp cache serialization will be used to address #61986.
Release note: None --- pkg/kv/kvserver/tscache/BUILD.bazel | 3 + pkg/kv/kvserver/tscache/cache.go | 5 + pkg/kv/kvserver/tscache/cache_test.go | 303 +++++++++++++++++++ pkg/kv/kvserver/tscache/interval_skl.go | 159 +++++++++- pkg/kv/kvserver/tscache/interval_skl_test.go | 26 +- pkg/kv/kvserver/tscache/skl_impl.go | 9 + pkg/kv/kvserver/tscache/tree_impl.go | 39 +++ 7 files changed, 535 insertions(+), 9 deletions(-) diff --git a/pkg/kv/kvserver/tscache/BUILD.bazel b/pkg/kv/kvserver/tscache/BUILD.bazel index 27512728cb14..d12435b710fd 100644 --- a/pkg/kv/kvserver/tscache/BUILD.bazel +++ b/pkg/kv/kvserver/tscache/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/readsummary/rspb", "//pkg/roachpb", "//pkg/util", "//pkg/util/cache", @@ -43,12 +44,14 @@ go_test( "//conditions:default": {"Pool": "default"}, }), deps = [ + "//pkg/kv/kvserver/readsummary/rspb", "//pkg/roachpb", "//pkg/testutils", "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/timeutil", + "//pkg/util/uint128", "//pkg/util/uuid", "@com_github_andy_kimball_arenaskl//:arenaskl", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/kv/kvserver/tscache/cache.go b/pkg/kv/kvserver/tscache/cache.go index 6dd5de098b0e..22eae41ba97c 100644 --- a/pkg/kv/kvserver/tscache/cache.go +++ b/pkg/kv/kvserver/tscache/cache.go @@ -17,6 +17,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -67,6 +68,10 @@ type Cache interface { // returned for the read timestamps. GetMax(ctx context.Context, start, end roachpb.Key) (hlc.Timestamp, uuid.UUID) + // Serialize returns a serialized representation of the Cache over the + // interval spanning from start to end. + Serialize(ctx context.Context, start, end roachpb.Key) rspb.Segment + // Metrics returns the Cache's metrics struct. Metrics() Metrics diff --git a/pkg/kv/kvserver/tscache/cache_test.go b/pkg/kv/kvserver/tscache/cache_test.go index 4718763df018..be652e752689 100644 --- a/pkg/kv/kvserver/tscache/cache_test.go +++ b/pkg/kv/kvserver/tscache/cache_test.go @@ -19,14 +19,17 @@ import ( "runtime" "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uint128" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" ) @@ -651,6 +654,306 @@ func identicalAndRatcheted( return firstVal, nil } +func TestTimestampCacheSerialize(t *testing.T) { + defer leaktest.AfterTest(t)() + + forEachCacheImpl(t, func(t *testing.T, tc Cache, _ *hlc.Clock, _ *timeutil.ManualTime) { + lowWater := tc.getLowWater() + + // Verify no read spans before any reads. + seg := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax) + expSeg := rspb.Segment{LowWater: lowWater} + require.Equal(t, expSeg, seg) + + // Simulate a series of reads and verify the serialized spans. 
+ ts1 := lowWater.Add(50, 1) + ts2 := ts1.Add(10, 1) + txnID1, txnID2 := uuid.MakeV4(), uuid.MakeV4() + tc.Add(ctx, roachpb.Key("a"), nil, ts1, noTxnID) + tc.Add(ctx, roachpb.Key("b"), nil, ts2, noTxnID) + tc.Add(ctx, roachpb.Key("b"), roachpb.Key("d"), ts1, txnID1) + tc.Add(ctx, roachpb.Key("b\x00"), nil, ts2, txnID2) + tc.Add(ctx, roachpb.Key("e"), nil, ts2, txnID1) + tc.Add(ctx, roachpb.Key("f"), roachpb.Key("h"), ts2, txnID2) + + seg = tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax) + expSeg = rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("a"), EndKey: nil, Timestamp: ts1, TxnID: noTxnID}, + {Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID}, + {Key: roachpb.Key("b\x00"), EndKey: nil, Timestamp: ts2, TxnID: txnID2}, + {Key: roachpb.Key("b\x00\x00"), EndKey: roachpb.Key("d"), Timestamp: ts1, TxnID: txnID1}, + {Key: roachpb.Key("e"), EndKey: nil, Timestamp: ts2, TxnID: txnID1}, + {Key: roachpb.Key("f"), EndKey: roachpb.Key("h"), Timestamp: ts2, TxnID: txnID2}, + }, + } + require.Equal(t, expSeg, seg) + + // Test that the cache can be cleared, populated, and re-serialized again. + tc.clear(seg.LowWater) + for _, sp := range seg.ReadSpans { + tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID) + } + seg2 := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax) + require.Equal(t, expSeg, seg2) + + // Test serialization over various key ranges. + testCases := []struct { + name string + key, endKey roachpb.Key + expSeg rspb.Segment + }{ + { + name: "half overlap, before", + key: roachpb.KeyMin, + endKey: roachpb.Key("c"), + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("a"), EndKey: nil, Timestamp: ts1, TxnID: noTxnID}, + {Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID}, + {Key: roachpb.Key("b\x00"), EndKey: nil, Timestamp: ts2, TxnID: txnID2}, + {Key: roachpb.Key("b\x00\x00"), EndKey: roachpb.Key("c"), Timestamp: ts1, TxnID: txnID1}, + }, + }, + }, + { + name: "half overlap, after", + key: roachpb.Key("c"), + endKey: roachpb.Key("g"), + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("c"), EndKey: roachpb.Key("d"), Timestamp: ts1, TxnID: txnID1}, + {Key: roachpb.Key("e"), EndKey: nil, Timestamp: ts2, TxnID: txnID1}, + {Key: roachpb.Key("f"), EndKey: roachpb.Key("g"), Timestamp: ts2, TxnID: txnID2}, + }, + }, + }, + { + name: "point, non-nil end key", + key: roachpb.Key("b"), + endKey: roachpb.Key("b\x00"), + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID}, + }, + }, + }, + { + name: "point, nil end key", + key: roachpb.Key("b"), + endKey: nil, + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID}, + }, + }, + }, + { + name: "range, start point, non-nil end key", + key: roachpb.Key("f"), + endKey: roachpb.Key("f\x00"), + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("f"), EndKey: roachpb.Key("f\x00"), Timestamp: ts2, TxnID: txnID2}, + }, + }, + }, + { + name: "range, start point, nil end key", + key: roachpb.Key("f"), + endKey: nil, + expSeg: rspb.Segment{ + LowWater: lowWater, + ReadSpans: []rspb.ReadSpan{ + {Key: roachpb.Key("f"), EndKey: roachpb.Key("f\x00"), Timestamp: ts2, TxnID: txnID2}, + }, + }, + }, + { + name: "range, mid point, non-nil end key", + key: roachpb.Key("g"), + 
endKey: roachpb.Key("g\x00"),
+				expSeg: rspb.Segment{
+					LowWater: lowWater,
+					ReadSpans: []rspb.ReadSpan{
+						{Key: roachpb.Key("g"), EndKey: roachpb.Key("g\x00"), Timestamp: ts2, TxnID: txnID2},
+					},
+				},
+			},
+			{
+				name:   "range, mid point, nil end key",
+				key:    roachpb.Key("g"),
+				endKey: nil,
+				expSeg: rspb.Segment{
+					LowWater: lowWater,
+					ReadSpans: []rspb.ReadSpan{
+						{Key: roachpb.Key("g"), EndKey: roachpb.Key("g\x00"), Timestamp: ts2, TxnID: txnID2},
+					},
+				},
+			},
+			{
+				name:   "range, contained",
+				key:    roachpb.Key("c"),
+				endKey: roachpb.Key("c2"),
+				expSeg: rspb.Segment{
+					LowWater: lowWater,
+					ReadSpans: []rspb.ReadSpan{
+						{Key: roachpb.Key("c"), EndKey: roachpb.Key("c2"), Timestamp: ts1, TxnID: txnID1},
+					},
+				},
+			},
+			{
+				name:   "empty, before",
+				key:    roachpb.KeyMin,
+				endKey: roachpb.Key("a"),
+				expSeg: rspb.Segment{
+					LowWater:  lowWater,
+					ReadSpans: nil,
+				},
+			},
+			{
+				name:   "empty, after",
+				key:    roachpb.Key("h"),
+				endKey: roachpb.KeyMax,
+				expSeg: rspb.Segment{
+					LowWater:  lowWater,
+					ReadSpans: nil,
+				},
+			},
+		}
+		for _, testCase := range testCases {
+			t.Run(testCase.name, func(t *testing.T) {
+				seg := tc.Serialize(ctx, testCase.key, testCase.endKey)
+				require.Equal(t, testCase.expSeg, seg)
+			})
+		}
+	})
+}
+
+func TestTimestampCacheSerializeRoundTrip(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	rng := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
+	randTs := func() hlc.Timestamp {
+		return hlc.Timestamp{WallTime: int64(rng.Intn(1000))}
+	}
+	randTxnID := func() uuid.UUID {
+		return uuid.FromUint128(uint128.FromInts(0, uint64(rng.Intn(3))))
+	}
+
+	const iters = 10
+	for i := 0; i < iters; i++ {
+		const slots = 10000
+		const maxSpans = 1000
+		spans := make([]rspb.ReadSpan, rng.Intn(maxSpans))
+		padding := make([]byte, 8<<10)
+		for i := range spans {
+			from, _, to := randRange(rng, slots+1)
+			// Pad to create large keys, to test multiple intervalSkl pages.
+			from = append(padding, from...)
+			to = append(padding, to...)
+			// Randomly omit the end key to create point keys.
+			if rng.Intn(2) != 0 || bytes.Equal(from, to) {
+				to = nil
+			}
+			spans[i] = rspb.ReadSpan{
+				Key:       from,
+				EndKey:    to,
+				Timestamp: randTs(),
+				TxnID:     randTxnID(),
+			}
+		}
+		serFrom, _, serTo := randRange(rng, slots+1)
+		if bytes.Equal(serFrom, serTo) {
+			serTo = nil
+		}
+
+		// Preserve the serialized spans for each Cache implementation so we can
+		// compare them for equality.
+		type cacheSeg struct {
+			name string
+			seg  rspb.Segment
+		}
+		var segs []cacheSeg
+
+		forEachCacheImpl(t, func(t *testing.T, tc Cache, _ *hlc.Clock, _ *timeutil.ManualTime) {
+			// Insert spans into the cache.
+			for _, sp := range spans {
+				tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID)
+			}
+
+			// Serialize the cache.
+			seg := tc.Serialize(ctx, serFrom, serTo)
+			segs = append(segs, cacheSeg{
+				name: fmt.Sprintf("%T", tc),
+				seg:  seg.Clone(),
+			})
+
+			// Clear the cache and re-populate it from the serialized spans.
+			tc.clear(seg.LowWater)
+			for _, sp := range seg.ReadSpans {
+				tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID)
+			}
+
+			// Before comparing, normalize the end keys of point keys when testing
+			// the treeImpl. This is necessary because the treeImpl will normalize the
+			// end key of point keys that were in the cache, but not point keys that
+			// were created by truncating read spans to the serialization boundaries.
+			// This is an unimportant implementation detail, so paper over it in
+			// testing.
+			if _, ok := tc.(*treeImpl); ok {
+				for i, sp := range seg.ReadSpans {
+					if roachpb.Key(sp.Key).IsPrev(sp.EndKey) {
+						seg.ReadSpans[i].EndKey = nil
+					}
+				}
+			}
+
+			// Serialize the cache again and verify that the serialized spans are
+			// identical.
+			seg2 := tc.Serialize(ctx, serFrom, serTo)
+			require.Equal(t, seg, seg2)
+
+			// Do the same thing, but this time serialize the entire cache without
+			// bounds. This should produce the same result.
+			seg3 := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax)
+			require.Equal(t, seg, seg3)
+		})
+
+		// Verify that all Cache implementations produce the same serialized spans.
+		// Before doing so, normalize the segments to replace nil end keys and merge
+		// adjacent spans to eliminate any allowed implementation differences.
+		for i := range segs {
+			seg := &segs[i].seg
+			var res []rspb.ReadSpan
+			for _, next := range seg.ReadSpans {
+				if len(next.EndKey) == 0 {
+					next.EndKey = roachpb.Key(next.Key).Next()
+				}
+				if len(res) == 0 {
+					res = append(res, next)
+					continue
+				}
+				last := &res[len(res)-1]
+				if bytes.Equal(last.EndKey, next.Key) && last.Timestamp == next.Timestamp && last.TxnID == next.TxnID {
+					last.EndKey = next.EndKey
+				} else {
+					res = append(res, next)
+				}
+			}
+			seg.ReadSpans = res
+		}
+		for i := 1; i < len(segs); i++ {
+			require.Equal(t, segs[0].seg, segs[i].seg, "%s != %s", segs[0].name, segs[i].name)
+		}
+	}
+}
+
 func BenchmarkTimestampCacheInsertion(b *testing.B) {
 	clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 123)))
 	tc := New(clock)
diff --git a/pkg/kv/kvserver/tscache/interval_skl.go b/pkg/kv/kvserver/tscache/interval_skl.go
index 4efd5cb46775..6a2525941a13 100644
--- a/pkg/kv/kvserver/tscache/interval_skl.go
+++ b/pkg/kv/kvserver/tscache/interval_skl.go
@@ -20,6 +20,7 @@ import (
 	"unsafe"
 
 	"github.com/andy-kimball/arenaskl"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/container/list"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -553,6 +554,41 @@ func (s *intervalSkl) LookupTimestampRange(
 	return val
 }
 
+// Serialize returns a serialized representation of the intervalSkl over the
+// interval spanning from start to end.
+func (s *intervalSkl) Serialize(ctx context.Context, from, to []byte) rspb.Segment {
+	if from == nil && to == nil {
+		panic("from and to keys cannot both be nil")
+	}
+
+	// Serialize each page into a separate segment under the rotMutex and then
+	// merge them together outside the rotMutex.
+	var merged rspb.Segment
+	for _, seg := range s.serializePages(ctx, from, to) {
+		merged.Merge(seg)
+	}
+	return merged
+}
+
+// serializePages returns a serialized representation of each of the
+// intervalSkl's pages over the interval spanning from start to end.
+func (s *intervalSkl) serializePages(ctx context.Context, from, to []byte) []rspb.Segment {
+	// Acquire the rotation mutex read lock so that the pages will not be rotated
+	// while serialize operations are in progress.
+	s.rotMutex.TracedRLock(ctx)
+	defer s.rotMutex.RUnlock()
+
+	// Iterate over the pages, serializing each into its own segment. The
+	// segments are merged together by the caller, outside the rotMutex.
+	segs := make([]rspb.Segment, 0, s.pages.Len())
+	for e := s.pages.Front(); e != nil; e = e.Next() {
+		p := e.Value
+		seg := p.serialize(from, to)
+		seg.LowWater = s.floorTS
+		segs = append(segs, seg)
+	}
+	return segs
+}
+
 // FloorTS returns the receiver's floor timestamp.
func (s *intervalSkl) FloorTS() hlc.Timestamp { s.rotMutex.RLock() @@ -577,15 +613,121 @@ func newSklPage(arena *arenaskl.Arena) *sklPage { // the maximum (initialized or uninitialized) value found. func (p *sklPage) lookupTimestampRange(from, to []byte, opt rangeOptions) cacheValue { var maxVal cacheValue - p.visitRange(from, to, opt, func(val cacheValue) { + p.visitRange(from, to, opt, func(_ []byte, val cacheValue, _ nodeOptions) { maxVal, _ = ratchetValue(maxVal, val) }) return maxVal } +func (p *sklPage) serialize(from, to []byte) rspb.Segment { + var seg rspb.Segment + + // Serializing the page requires a scan over the skiplist while stitching read + // spans back together. This is necessary because the skiplist stores keys and + // gaps separately, but the serialized representation stores them together. To + // support this, we construct read spans across consecutive calls to the + // visitor function and flush them when we encounter their end key. + var lastSpan rspb.ReadSpan + var lastOpts nodeOptions + flushLastSpan := func(endKey []byte) { + lastSpan.EndKey = endKey + seg.AddReadSpan(lastSpan) + lastSpan = rspb.ReadSpan{} + lastOpts = 0 + } + visit := func(key []byte, val cacheValue, opt nodeOptions) { + // Maybe flush the previous read span. + if lastOpts&hasGap != 0 /* lastOpts == hasGap || lastOpts == (hasKey|hasGap) */ { + // Previous read span was a range which ends at this key. Flush it. + if bytes.Equal(lastSpan.Key, key) { + // If the previous read span has the same start key as this key, we're in + // one of two cases: + // 1. there's a bug and the skiplist has two nodes with the same key, or + // 2. we're in a rare case where the immediately preceding key was a gap + // value with no key value, so we hit the Key.Next() case below. In + // such cases, the gap value can be discarded, because it is filling + // an empty space which cannot be represented in a roachpb.Key. + // + // An example where case 2 is possible is: + // 1. Read(["a", "b"), ts100) + // 2. Read(["a", nil), ts200) + // 3. Read(["a\x00", nil), ts200) + // + // In this case, the skiplist will retain a gap value between "a" and + // "a\x00", even when such a gap is effectively empty because it cannot + // be represented in a roachpb.Key. + if lastOpts != hasGap { + panic("unexpected same key as previous read span") + } else { + lastSpan, lastOpts = rspb.ReadSpan{}, 0 + } + } else { + flushLastSpan(key /* endKey */) + } + } else if lastOpts == hasKey { + // Determine whether this is the gap portion of a previous read span, or + // whether this is a new read span. + sameSpan := bytes.Equal(lastSpan.Key, key) && lastSpan.Timestamp == val.ts && lastSpan.TxnID == val.txnID + if sameSpan { + if opt != hasGap { + panic("expected gap value after key value for same read span") + } + // Combine key with gap into same read span. + lastOpts |= opt + return + } else { + // Previous key was a point key, so flush it and start a new read span. + flushLastSpan(nil /* endKey */) + } + } else if lastOpts != 0 { + panic("unexpected") + } + + // Track val as a new read span, if not empty. + if !val.ts.IsEmpty() { + if opt&hasKey == 0 { + // The value is a gap value with no key value. This means that the value + // has an exclusive start key, so we advance the key to the next key. + key = append(key, 0) // Key.Next() + } + lastSpan = rspb.ReadSpan{ + Key: key, + Timestamp: val.ts, + TxnID: val.txnID, + } + lastOpts = opt + } + } + + // Scan over the skiplist. If the visitor is called at all (i.e. 
if there are + // any overlapping read spans), then the visitor will be called first with + // either a key value (opt=hasKey) or a key+gap value (opt=(hasKey|hasGap)). + // + // We use a rangeOptions of excludeTo, meaning an inclusive start key and an + // exclusive end key. This is all callers of this function need, and it allows + // us to simplify the translation from cacheValues to ReadSpans. + p.visitRange(from, to, excludeTo, visit) + + // Flush the last read span. This will be a no-op if the visitor was never + // previously called because there were no overlapping read spans. + visit(to, cacheValue{}, hasKey) + + return seg +} + +// sklPageVisitor is a visitor function that is called on each key and gap value +// encountered during a scan of an sklPage. The visitor function is called with +// a nodeOptions bitset to indicate whether the node is a key value (hasKey), a +// gap value (hasGap), or both (hasKey|hasGap). +// +// If the key range is empty, the visitor function will not be called. Else the +// visitor function will be called at least once and the first call will always +// be for either a key value (hasKey) or a key+gap value (hasKey|hasGap). +type sklPageVisitor func(key []byte, value cacheValue, opt nodeOptions) + // visitRange scans the range of keys between from and to and calls the visitor // function for each (initialized or uninitialized) value found. -func (p *sklPage) visitRange(from, to []byte, opt rangeOptions, visit func(cacheValue)) { +func (p *sklPage) visitRange(from, to []byte, opt rangeOptions, visit sklPageVisitor) { if to != nil { cmp := 0 if from != nil { @@ -619,7 +761,7 @@ func (p *sklPage) visitRange(from, to []byte, opt rangeOptions, visit func(cache if !it.Valid() { // No more nodes. Visit the previous gap value and return. - visit(prevGapVal) + visit(from, prevGapVal, hasKey|hasGap) return } else if bytes.Equal(it.Key(), from) { // Found a node at from. No need to visit the gap value. @@ -629,7 +771,7 @@ func (p *sklPage) visitRange(from, to []byte, opt rangeOptions, visit func(cache } } else { // No node at from. Visit the gap value and remove the excludeFrom option. - visit(prevGapVal) + visit(from, prevGapVal, hasKey|hasGap) opt &^= excludeFrom } @@ -1062,7 +1204,7 @@ func (p *sklPage) incomingGapVal(it *arenaskl.Iterator, key []byte) cacheValue { // When finished, the iterator will be positioned the same as if it.Seek(to) had // been called. func (p *sklPage) scanTo( - it *arenaskl.Iterator, to []byte, opt rangeOptions, initGapVal cacheValue, visit func(cacheValue), + it *arenaskl.Iterator, to []byte, opt rangeOptions, initGapVal cacheValue, visit sklPageVisitor, ) (prevGapVal cacheValue) { prevGapVal = initGapVal first := true @@ -1074,7 +1216,8 @@ func (p *sklPage) scanTo( return } - toCmp := bytes.Compare(it.Key(), to) + key := it.Key() + toCmp := bytes.Compare(key, to) if to == nil { // to == nil means open range, so toCmp will always be -1. toCmp = -1 @@ -1106,7 +1249,7 @@ func (p *sklPage) scanTo( if !(first && (opt&excludeFrom) != 0) && visit != nil { // As long as this isn't the first key and opt says to exclude the // first key, we call the visitor. - visit(keyVal) + visit(key, keyVal, hasKey) } if toCmp == 0 { @@ -1116,7 +1259,7 @@ func (p *sklPage) scanTo( // Call the visitor with the current gapVal. if visit != nil { - visit(gapVal) + visit(key, gapVal, hasGap) } // Haven't yet reached the scan's end key, so keep iterating. 
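The subtle part of the interval_skl.go changes above is the span stitching in `sklPage.serialize`: the skiplist stores key values and gap values separately, so the visitor receives them as an alternating sequence, and consecutive values must be glued back into contiguous read spans that are flushed when a later key closes them. The following is a minimal, self-contained Go sketch of that flush-on-next-key idea. All names and types here (`visit`, `span`, `stitchSpans`) are illustrative stand-ins rather than the actual `tscache` types, and the sketch elides the `Key.Next()` adjustment and the duplicate-key edge cases that the real code handles:

```go
package main

import "fmt"

// Illustrative stand-ins for the tscache concepts above; not the real types.
type opt int

const (
	hasKey opt = 1 << iota
	hasGap
)

type visit struct {
	key string
	ts  int64 // 0 means "no value at this position"
	opt opt
}

type span struct {
	key, endKey string // endKey == "" means a point span
	ts          int64
}

// stitchSpans mimics the flush-on-next-key loop in sklPage.serialize: a visit
// carrying a gap value opens a ranged span that the next visited key closes,
// while a key-only visit emits a point span.
func stitchSpans(visits []visit) []span {
	var out []span
	var last span
	var open bool // last is a ranged span waiting for its end key
	for _, v := range visits {
		if open {
			// The current key closes the pending ranged span.
			last.endKey = v.key
			out = append(out, last)
		} else if last.ts != 0 {
			// The pending point span is complete as-is.
			out = append(out, last)
		}
		last, open = span{}, false
		if v.ts == 0 {
			continue // nothing to record at this position
		}
		last = span{key: v.key, ts: v.ts}
		open = v.opt&hasGap != 0
	}
	if last.ts != 0 {
		out = append(out, last)
	}
	return out
}

func main() {
	// A read of [a,c) at ts=100 plus a later point read of b at ts=200 might
	// be visited as alternating key and gap values:
	vs := []visit{
		{"a", 100, hasKey | hasGap}, // opens a ranged span at a
		{"b", 200, hasKey},          // closes [a,b); b itself is a point span
		{"b", 100, hasGap},          // the gap after b reopens a ranged span
		{"c", 0, hasKey},            // end of scan: closes [b,c)
	}
	for _, s := range stitchSpans(vs) {
		// Prints {key:a endKey:b ts:100}, {key:b endKey: ts:200},
		// {key:b endKey:c ts:100}.
		fmt.Printf("%+v\n", s)
	}
}
```

This is also why the visitor signature reports whether each value is a key value, a gap value, or both: the real code additionally merges a key value with the gap value that follows it when they carry the same timestamp and transaction ID, and starts a reopened span at `Key.Next()` rather than at the key itself.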
diff --git a/pkg/kv/kvserver/tscache/interval_skl_test.go b/pkg/kv/kvserver/tscache/interval_skl_test.go index ea3a67c663d2..fae8d23eeaa1 100644 --- a/pkg/kv/kvserver/tscache/interval_skl_test.go +++ b/pkg/kv/kvserver/tscache/interval_skl_test.go @@ -1360,6 +1360,29 @@ func BenchmarkIntervalSklAddAndLookup(b *testing.B) { } } +func BenchmarkIntervalSklSerialize(b *testing.B) { + const max = 1000000000 // max size of range + const data = 1000 // number of ranges + const txnID = "123" + + clock := hlc.NewClockForTesting(nil) + s := newIntervalSkl(clock, MinRetentionWindow, makeSklMetrics()) + s.pageSize = maximumSklPageSize + rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) + + for i := 0; i < data; i++ { + from, to := makeRange(rng.Int31n(max)) + nowVal := makeVal(clock.Now(), txnID) + s.AddRange(ctx, from, to, excludeTo, nowVal) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // from = "" and to = nil means that we're scanning over all keys. + _ = s.Serialize(ctx, []byte(""), []byte(nil)) + } +} + // makeRange creates a key range from the provided input. The range will start // at the provided key and will have an end key that is a deterministic function // of the provided key. This means that for a given input, the function will @@ -1387,7 +1410,8 @@ func makeRange(start int32) (from, to []byte) { } // randRange creates a random range with keys within the specified number of -// slots. The function also returns a middle key, if one exists. +// slots. The function also returns a middle key, if one exists. from and to +// may be equal, which represents a point key. func randRange(rng *rand.Rand, slots int) (from, middle, to []byte) { fromNum := rng.Intn(slots) toNum := rng.Intn(slots) diff --git a/pkg/kv/kvserver/tscache/skl_impl.go b/pkg/kv/kvserver/tscache/skl_impl.go index 96afda0201f1..951e8ef116b0 100644 --- a/pkg/kv/kvserver/tscache/skl_impl.go +++ b/pkg/kv/kvserver/tscache/skl_impl.go @@ -13,6 +13,7 @@ package tscache import ( "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -76,6 +77,14 @@ func (tc *sklImpl) GetMax(ctx context.Context, start, end roachpb.Key) (hlc.Time return val.ts, val.txnID } +// Serialize implements the Cache interface. +func (tc *sklImpl) Serialize(ctx context.Context, start, end roachpb.Key) rspb.Segment { + if len(end) == 0 { + end = start.Next() + } + return tc.cache.Serialize(ctx, nonNil(start), end) +} + // boundKeyLengths makes sure that the key lengths provided are well below the // size of each sklPage, otherwise we'll never be successful in adding it to // an intervalSkl. diff --git a/pkg/kv/kvserver/tscache/tree_impl.go b/pkg/kv/kvserver/tscache/tree_impl.go index 180935d8cb6b..78b59107bce0 100644 --- a/pkg/kv/kvserver/tscache/tree_impl.go +++ b/pkg/kv/kvserver/tscache/tree_impl.go @@ -15,6 +15,7 @@ import ( "fmt" "unsafe" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -482,6 +483,44 @@ func (tc *treeImpl) getMax(start, end roachpb.Key) (hlc.Timestamp, uuid.UUID) { return maxTS, maxTxnID } +// Serialize implements the Cache interface. 
+func (tc *treeImpl) Serialize(_ context.Context, start, end roachpb.Key) rspb.Segment {
+	tc.Lock()
+	defer tc.Unlock()
+	if len(end) == 0 {
+		end = start.Next()
+	}
+	bounds := roachpb.Span{Key: start, EndKey: end}
+
+	var seg rspb.Segment
+	for _, o := range tc.cache.GetOverlaps(start, end) {
+		cv := o.Value.(*cacheValue)
+		ck := o.Key.(*cache.IntervalKey)
+		startKey := roachpb.Key(ck.Start)
+		endKey := roachpb.Key(ck.End)
+		if startKey.IsPrev(endKey) {
+			// cache.IntervalKey does not follow the roachpb.Span convention of having
+			// the end key be nil for single keys. Fix that here. Doing so is not
+			// necessary for correctness (they're equivalent representations), but it
+			// makes comparing this implementation to sklImpl easier in tests.
+			endKey = nil
+		}
+		span := roachpb.Span{Key: startKey, EndKey: endKey}
+		// Intersect the bounds with the span to ensure that we don't return spans
+		// that are outside of the requested range.
+		intersect := bounds.Intersect(span)
+		// Construct the read span and add it to the segment.
+		seg.AddReadSpan(rspb.ReadSpan{
+			Key:       intersect.Key,
+			EndKey:    intersect.EndKey,
+			Timestamp: cv.ts,
+			TxnID:     cv.txnID,
+		})
+	}
+	seg.LowWater = tc.lowWater
+	return seg
+}
+
 // shouldEvict returns true if the cache entry's timestamp is no
 // longer within the MinRetentionWindow.
 func (tc *treeImpl) shouldEvict(size int, key, value interface{}) bool {

From 49592b71a3fbacb81a1381f8bc8bcb542b25247b Mon Sep 17 00:00:00 2001
From: Faizan Qazi
Date: Wed, 31 Jan 2024 14:00:00 -0500
Subject: [PATCH 4/4] catalog/lease: detect if synchronous lease releases are successful

Previously, for unit testing, we added support for synchronously
releasing leases. If the context was cancelled when releasing a lease
synchronously, it was possible for the lease to be erased from memory
but not from storage. As a result, reacquisition could hit an error when
session-based leasing is enabled. To address this, this patch re-orders
operations so that, for synchronous lease release, we clear the lease
from storage first and remove the in-memory copy only afterwards.

Fixes: #118522, #118523, #118521

Release note: None
---
 pkg/sql/catalog/lease/descriptor_state.go | 11 +++++++++--
 pkg/sql/catalog/lease/lease.go            |  9 ++++++---
 pkg/sql/catalog/lease/storage.go          |  7 +++++--
 3 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/pkg/sql/catalog/lease/descriptor_state.go b/pkg/sql/catalog/lease/descriptor_state.go
index c0a47f386b65..1418e6c0f7e6 100644
--- a/pkg/sql/catalog/lease/descriptor_state.go
+++ b/pkg/sql/catalog/lease/descriptor_state.go
@@ -292,11 +292,18 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
 	t.mu.Lock()
 	defer t.mu.Unlock()
 	if l := maybeMarkRemoveStoredLease(s); l != nil {
-		t.mu.active.remove(s)
+		leaseReleased := true
+		// For testing, we will synchronously release leases, but that
+		// exposes us to the danger of the context getting cancelled. To
+		// eliminate this risk, we first remove the lease from storage
+		// and then delete it from memory.
 		if t.m.storage.testingKnobs.RemoveOnceDereferenced {
-			releaseLease(ctx, l, t.m)
+			leaseReleased = releaseLease(ctx, l, t.m)
 			l = nil
 		}
+		if leaseReleased {
+			t.mu.active.remove(s)
+		}
 		return l
 	}
 	return nil
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 08404e15bd0e..18fe1ea604cb 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -637,7 +637,7 @@ func acquireNodeLease(
 }
 
 // releaseLease deletes an entry from system.lease.
-func releaseLease(ctx context.Context, lease *storedLease, m *Manager) {
+func releaseLease(ctx context.Context, lease *storedLease, m *Manager) (released bool) {
 	// Force the release to happen synchronously, if we are draining or, when we
 	// force removals for unit tests. This didn't matter with expiration based leases
 	// since each renewal would have a different expiry (but the same version in
 	// because we release only if a new version exists.
 	if m.IsDraining() || m.removeOnceDereferenced() {
 		// Release synchronously to guarantee release before exiting.
-		m.storage.release(ctx, m.stopper, lease)
-		return
+		// It's possible for the context to get cancelled, so return whether
+		// the lease was actually released.
+		return m.storage.release(ctx, m.stopper, lease)
 	}
 
 	// Release to the store asynchronously, without the descriptorState lock.
@@ -663,6 +664,8 @@
 	}); err != nil {
 		log.Warningf(ctx, "error: %s, not releasing lease: %q", err, lease)
 	}
+	// The asynchronous task will release the lease.
+	return true
 }
 
 // purgeOldVersions removes old unused descriptor versions older than
diff --git a/pkg/sql/catalog/lease/storage.go b/pkg/sql/catalog/lease/storage.go
index 94fa7eda7416..73975275c22b 100644
--- a/pkg/sql/catalog/lease/storage.go
+++ b/pkg/sql/catalog/lease/storage.go
@@ -191,7 +191,6 @@
 	log.VEventf(ctx, 2, "storage attempting to acquire lease %v@%v", desc, expiration)
 
 	ts := storedLeaseExpiration(expiration)
-	var isLeaseRenewal bool
 	var lastLeaseWasWrittenWithSessionID bool
 	// If there was a previous lease then determine if this is a renewal and
@@ -254,7 +253,9 @@
 // read a descriptor because it can be called while modifying a
 // descriptor through a schema change before the schema change has committed
 // that can result in a deadlock.
-func (s storage) release(ctx context.Context, stopper *stop.Stopper, lease *storedLease) {
+func (s storage) release(
+	ctx context.Context, stopper *stop.Stopper, lease *storedLease,
+) (released bool) {
 	ctx = multitenant.WithTenantCostControlExemption(ctx)
 	retryOptions := base.DefaultRetryOptions()
 	retryOptions.Closer = stopper.ShouldQuiesce()
@@ -283,6 +284,7 @@
 			}
 			continue
 		}
+		released = true
 		s.outstandingLeases.Dec(1)
 		if s.testingKnobs.LeaseReleasedEvent != nil {
 			s.testingKnobs.LeaseReleasedEvent(
 		}
 		break
 	}
+	return released
 }
 
 // Get the descriptor valid for the expiration time from the store.
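The invariant behind this last patch generalizes: when an object exists both in memory and in durable storage, release the durable copy first and drop the in-memory copy only on success, so that a failed release leaves a retryable pair rather than an orphaned storage row. A small self-contained sketch of that ordering follows; the types and names here (`leases`, `releaseStored`) are hypothetical stand-ins, not the `lease` package's actual API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical stand-in for a lease store; not the lease package's real API.
type leases struct {
	stored map[int]bool // durable rows (system.lease in the real code)
	inMem  map[int]bool // in-memory copies
}

// releaseStored deletes the durable row. A cancelled context models the
// failure mode described in the commit message: the delete may never run.
func (l *leases) releaseStored(ctx context.Context, id int) (released bool) {
	if ctx.Err() != nil {
		return false // the storage row survives
	}
	delete(l.stored, id)
	return true
}

// release mirrors the ordering after the fix: clear storage first, and drop
// the in-memory copy only if that succeeded. A cancelled release therefore
// leaves both copies in place instead of an orphaned storage row.
func (l *leases) release(ctx context.Context, id int) error {
	if !l.releaseStored(ctx, id) {
		return errors.New("lease not released; in-memory copy retained")
	}
	delete(l.inMem, id)
	return nil
}

func main() {
	l := &leases{stored: map[int]bool{1: true}, inMem: map[int]bool{1: true}}

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate the context being cancelled mid-release
	fmt.Println(l.release(ctx, 1), l.inMem, l.stored) // both copies retained

	fmt.Println(l.release(context.Background(), 1), l.inMem, l.stored)
}
```

Had the two deletes run in the opposite order, the cancelled release would leave `stored` populated while `inMem` is empty, which is exactly the state that made lease reacquisition fail in the linked issues.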