Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
118299: kv/tscache: implement timestamp cache serialization r=nvanbenschoten a=nvanbenschoten

Informs #61986.

This commit adds a new `Serialize` function to `tscache.Cache` implementations. This serialization uses the `readsummary/rspb.Segment` representation added in 0950a1e. Serialization of the production `sklImpl` uses the Segment merging logic added in e4fc6f1 in order to merge together a partial serializations of each individual `sklPage` in the data structure.

Timestamp cache serialization will be used to address #61986.

Release note: None

118543: catalog/lease: detect if synchronous lease releases are successful r=fqazi a=fqazi

Previously, for unit testing, we added support for synchronously releasing leases. If the context was cancelled when releasing a lease synchronously, it was possible for the lease to be erased from memory and not from storage. As a result, reacquisition could hit an error when session-based leasing is enabled. To address this, this patch re-orders operations so that we clear storage first for synchronous lease release, followed by the in-memory copy.

Fixes: #118522, fixes #118523, fixes #118521, fixes #118550

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Faizan Qazi <faizan@cockroachlabs.com>
  • Loading branch information
3 people committed Jan 31, 2024
3 parents 51f09ee + 1c86227 + 49592b7 commit d3c3d4e
Show file tree
Hide file tree
Showing 10 changed files with 613 additions and 60 deletions.
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/tscache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/roachpb",
"//pkg/util",
"//pkg/util/cache",
Expand Down Expand Up @@ -43,12 +44,14 @@ go_test(
"//conditions:default": {"Pool": "default"},
}),
deps = [
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/roachpb",
"//pkg/testutils",
"//pkg/util",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/timeutil",
"//pkg/util/uint128",
"//pkg/util/uuid",
"@com_github_andy_kimball_arenaskl//:arenaskl",
"@com_github_cockroachdb_errors//:errors",
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/tscache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -67,6 +68,10 @@ type Cache interface {
// returned for the read timestamps.
GetMax(ctx context.Context, start, end roachpb.Key) (hlc.Timestamp, uuid.UUID)

// Serialize returns a serialized representation of the Cache over the
// interval spanning from start to end.
Serialize(ctx context.Context, start, end roachpb.Key) rspb.Segment

// Metrics returns the Cache's metrics struct.
Metrics() Metrics

Expand Down
303 changes: 303 additions & 0 deletions pkg/kv/kvserver/tscache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ import (
"runtime"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uint128"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -651,6 +654,306 @@ func identicalAndRatcheted(
return firstVal, nil
}

func TestTimestampCacheSerialize(t *testing.T) {
defer leaktest.AfterTest(t)()

forEachCacheImpl(t, func(t *testing.T, tc Cache, _ *hlc.Clock, _ *timeutil.ManualTime) {
lowWater := tc.getLowWater()

// Verify no read spans before any reads.
seg := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax)
expSeg := rspb.Segment{LowWater: lowWater}
require.Equal(t, expSeg, seg)

// Simulate a series of reads and verify the serialized spans.
ts1 := lowWater.Add(50, 1)
ts2 := ts1.Add(10, 1)
txnID1, txnID2 := uuid.MakeV4(), uuid.MakeV4()
tc.Add(ctx, roachpb.Key("a"), nil, ts1, noTxnID)
tc.Add(ctx, roachpb.Key("b"), nil, ts2, noTxnID)
tc.Add(ctx, roachpb.Key("b"), roachpb.Key("d"), ts1, txnID1)
tc.Add(ctx, roachpb.Key("b\x00"), nil, ts2, txnID2)
tc.Add(ctx, roachpb.Key("e"), nil, ts2, txnID1)
tc.Add(ctx, roachpb.Key("f"), roachpb.Key("h"), ts2, txnID2)

seg = tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax)
expSeg = rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("a"), EndKey: nil, Timestamp: ts1, TxnID: noTxnID},
{Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID},
{Key: roachpb.Key("b\x00"), EndKey: nil, Timestamp: ts2, TxnID: txnID2},
{Key: roachpb.Key("b\x00\x00"), EndKey: roachpb.Key("d"), Timestamp: ts1, TxnID: txnID1},
{Key: roachpb.Key("e"), EndKey: nil, Timestamp: ts2, TxnID: txnID1},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("h"), Timestamp: ts2, TxnID: txnID2},
},
}
require.Equal(t, expSeg, seg)

// Test that the cache can be cleared, populated, and re-serialized again.
tc.clear(seg.LowWater)
for _, sp := range seg.ReadSpans {
tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID)
}
seg2 := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax)
require.Equal(t, expSeg, seg2)

// Test serialization over various key ranges.
testCases := []struct {
name string
key, endKey roachpb.Key
expSeg rspb.Segment
}{
{
name: "half overlap, before",
key: roachpb.KeyMin,
endKey: roachpb.Key("c"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("a"), EndKey: nil, Timestamp: ts1, TxnID: noTxnID},
{Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID},
{Key: roachpb.Key("b\x00"), EndKey: nil, Timestamp: ts2, TxnID: txnID2},
{Key: roachpb.Key("b\x00\x00"), EndKey: roachpb.Key("c"), Timestamp: ts1, TxnID: txnID1},
},
},
},
{
name: "half overlap, after",
key: roachpb.Key("c"),
endKey: roachpb.Key("g"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("c"), EndKey: roachpb.Key("d"), Timestamp: ts1, TxnID: txnID1},
{Key: roachpb.Key("e"), EndKey: nil, Timestamp: ts2, TxnID: txnID1},
{Key: roachpb.Key("f"), EndKey: roachpb.Key("g"), Timestamp: ts2, TxnID: txnID2},
},
},
},
{
name: "point, non-nil end key",
key: roachpb.Key("b"),
endKey: roachpb.Key("b\x00"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID},
},
},
},
{
name: "point, nil end key",
key: roachpb.Key("b"),
endKey: nil,
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("b"), EndKey: nil, Timestamp: ts2, TxnID: noTxnID},
},
},
},
{
name: "range, start point, non-nil end key",
key: roachpb.Key("f"),
endKey: roachpb.Key("f\x00"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("f"), EndKey: roachpb.Key("f\x00"), Timestamp: ts2, TxnID: txnID2},
},
},
},
{
name: "range, start point, nil end key",
key: roachpb.Key("f"),
endKey: nil,
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("f"), EndKey: roachpb.Key("f\x00"), Timestamp: ts2, TxnID: txnID2},
},
},
},
{
name: "range, mid point, non-nil end key",
key: roachpb.Key("g"),
endKey: roachpb.Key("g\x00"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("g"), EndKey: roachpb.Key("g\x00"), Timestamp: ts2, TxnID: txnID2},
},
},
},
{
name: "range, mid point, nil end key",
key: roachpb.Key("g"),
endKey: nil,
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("g"), EndKey: roachpb.Key("g\x00"), Timestamp: ts2, TxnID: txnID2},
},
},
},
{
name: "range, contained",
key: roachpb.Key("c"),
endKey: roachpb.Key("c2"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: []rspb.ReadSpan{
{Key: roachpb.Key("c"), EndKey: roachpb.Key("c2"), Timestamp: ts1, TxnID: txnID1},
},
},
},
{
name: "empty, before",
key: roachpb.KeyMin,
endKey: roachpb.Key("a"),
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: nil,
},
},
{
name: "empty, after",
key: roachpb.Key("h"),
endKey: roachpb.KeyMax,
expSeg: rspb.Segment{
LowWater: lowWater,
ReadSpans: nil,
},
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
seg := tc.Serialize(ctx, testCase.key, testCase.endKey)
require.Equal(t, testCase.expSeg, seg)
})
}
})
}

func TestTimestampCacheSerializeRoundTrip(t *testing.T) {
defer leaktest.AfterTest(t)()

rng := rand.New(rand.NewSource(timeutil.Now().UnixNano()))
randTs := func() hlc.Timestamp {
return hlc.Timestamp{WallTime: int64(rng.Intn(1000))}
}
randTxnID := func() uuid.UUID {
return uuid.FromUint128(uint128.FromInts(0, uint64(rng.Intn(3))))
}

const iters = 10
for i := 0; i < iters; i++ {
const slots = 10000
const maxSpans = 1000
spans := make([]rspb.ReadSpan, rng.Intn(maxSpans))
padding := make([]byte, 8<<10)
for i := range spans {
from, _, to := randRange(rng, slots+1)
// Pad to create large keys, to test multiple intervalSkl pages.
from = append(padding, from...)
to = append(padding, to...)
// Randomly omit the end key to create point keys.
if rng.Intn(2) != 0 || bytes.Equal(from, to) {
to = nil
}
spans[i] = rspb.ReadSpan{
Key: from,
EndKey: to,
Timestamp: randTs(),
TxnID: randTxnID(),
}
}
serFrom, _, serTo := randRange(rng, slots+1)
if bytes.Equal(serFrom, serTo) {
serTo = nil
}

// Preserve the serialized spans for each Cache implementation so we can
// compare them for equality.
type cacheSeg struct {
name string
seg rspb.Segment
}
var segs []cacheSeg

forEachCacheImpl(t, func(t *testing.T, tc Cache, _ *hlc.Clock, _ *timeutil.ManualTime) {
// Insert spans into the cache.
for _, sp := range spans {
tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID)
}

// Serialize the cache.
seg := tc.Serialize(ctx, serFrom, serTo)
segs = append(segs, cacheSeg{
name: fmt.Sprintf("%T", tc),
seg: seg.Clone(),
})

// Clear the cache and re-populate it from the serialized spans.
tc.clear(seg.LowWater)
for _, sp := range seg.ReadSpans {
tc.Add(ctx, sp.Key, sp.EndKey, sp.Timestamp, sp.TxnID)
}

// Before comparing, normalize the ends keys of point keys when testing
// the treeImpl. This is necessary because the treeImpl will normalize the
// end key of point keys that were in the cache, but not point keys that
// were created by truncating read spans to the serialize boundaries. This
// is an unimportant implementation detail, so paper over it in testing.
if _, ok := tc.(*treeImpl); ok {
for i, sp := range seg.ReadSpans {
if roachpb.Key(sp.Key).IsPrev(sp.EndKey) {
seg.ReadSpans[i].EndKey = nil
}
}
}

// Serialize the cache again and verify that the serialized spans are
// identical.
seg2 := tc.Serialize(ctx, serFrom, serTo)
require.Equal(t, seg, seg2)

// Do the same thing, but this time serialize the entire cache without
// bounds. This should produce the same result.
seg3 := tc.Serialize(ctx, roachpb.KeyMin, roachpb.KeyMax)
require.Equal(t, seg, seg3)
})

// Verify that all Cache implementations produce the same serialized spans.
// Before doing so, normalize the segments to replace nil end keys and merge
// adjacent spans to eliminate any allowed implementation differences.
for i := range segs {
seg := &segs[i].seg
var res []rspb.ReadSpan
for _, next := range seg.ReadSpans {
if len(next.EndKey) == 0 {
next.EndKey = roachpb.Key(next.Key).Next()
}
if len(res) == 0 {
res = append(res, next)
continue
}
last := &res[len(res)-1]
if bytes.Equal(last.EndKey, next.Key) && last.Timestamp == next.Timestamp && last.TxnID == next.TxnID {
last.EndKey = next.EndKey
} else {
res = append(res, next)
}
}
seg.ReadSpans = res
}
for i := 1; i < len(segs); i++ {
require.Equal(t, segs[0].seg, segs[i].seg, "%s != %s", segs[0].name, segs[i].name)
}
}
}

func BenchmarkTimestampCacheInsertion(b *testing.B) {
clock := hlc.NewClockForTesting(timeutil.NewManualTime(timeutil.Unix(0, 123)))
tc := New(clock)
Expand Down
Loading

0 comments on commit d3c3d4e

Please sign in to comment.