Skip to content

Commit

Permalink
storage: teach MVCCFindSplitKey about tenant prefixes
Browse files Browse the repository at this point in the history
Informs cockroachdb#48123.

`keys.EnsureSafeSplitKey` itself already knew about tenant prefixes, but
`storage.MVCCFindSplitKey` had a bit of extra logic that assumed the system
tenant. This commit addresses this and removes the only use of `TODOSQLCodec`
in `pkg/storage` while doing so.
  • Loading branch information
nvanbenschoten committed Jun 5, 2020
1 parent de038a3 commit a4cd3ad
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 69 deletions.
23 changes: 13 additions & 10 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3334,17 +3334,20 @@ func MVCCFindSplitKey(
return nil, nil
}
var minSplitKey roachpb.Key
if _, _, err := keys.TODOSQLCodec.DecodeTablePrefix(it.UnsafeKey().Key); err == nil {
// The first key in this range represents a row in a SQL table. Advance the
// minSplitKey past this row to avoid the problems described above.
firstRowKey, err := keys.EnsureSafeSplitKey(it.Key().Key)
if err != nil {
return nil, err
if _, tenID, err := keys.DecodeTenantPrefix(it.UnsafeKey().Key); err == nil {
if _, _, err := keys.MakeSQLCodec(tenID).DecodeTablePrefix(it.UnsafeKey().Key); err == nil {
// The first key in this range represents a row in a SQL table. Advance the
// minSplitKey past this row to avoid the problems described above.
firstRowKey, err := keys.EnsureSafeSplitKey(it.Key().Key)
if err != nil {
return nil, err
}
// Allow a split key before other rows in the same table or before any
// rows in interleaved tables.
minSplitKey = encoding.EncodeInterleavedSentinel(firstRowKey)
}
// Allow a split key before other rows in the same table or before any
// rows in interleaved tables.
minSplitKey = encoding.EncodeInterleavedSentinel(firstRowKey)
} else {
}
if minSplitKey == nil {
// The first key in the range does not represent a row in a SQL table.
// Allow a split at any key that sorts after it.
minSplitKey = it.Key().Key.Next()
Expand Down
157 changes: 98 additions & 59 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4309,21 +4309,42 @@ func TestFindValidSplitKeys(t *testing.T) {
return encoding.EncodeInterleavedSentinel(rowKey)
}

testCases := []struct {
type testCase struct {
keys []roachpb.Key
rangeStart roachpb.Key // optional
expSplit roachpb.Key
expError bool
}{
skipTenant bool // if true, skip tenant subtests
}
prefixTestKeys := func(test testCase, prefix roachpb.Key) testCase {
// testCase.keys
oldKeys := test.keys
test.keys = make([]roachpb.Key, len(oldKeys))
for i, key := range oldKeys {
test.keys[i] = append(prefix, key...)
}
// testCase.rangeStart
if test.rangeStart != nil {
test.rangeStart = append(prefix, test.rangeStart...)
}
// testCase.expSplit
if test.expSplit != nil {
test.expSplit = append(prefix, test.expSplit...)
}
return test
}

testCases := []testCase{
// All m1 cannot be split.
{
keys: []roachpb.Key{
roachpb.Key("\x02"),
roachpb.Key("\x02\x00"),
roachpb.Key("\x02\xff"),
},
expSplit: nil,
expError: false,
expSplit: nil,
expError: false,
skipTenant: true,
},
// All system span cannot be split.
{
Expand All @@ -4334,6 +4355,7 @@ func TestFindValidSplitKeys(t *testing.T) {
rangeStart: keys.SystemSQLCodec.TablePrefix(1),
expSplit: nil,
expError: false,
skipTenant: true,
},
// Between meta1 and meta2, splits at meta2.
{
Expand All @@ -4345,8 +4367,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x03\x00"),
roachpb.Key("\x03\xff"),
},
expSplit: roachpb.Key("\x03"),
expError: false,
expSplit: roachpb.Key("\x03"),
expError: false,
skipTenant: true,
},
// Even lopsided, always split at meta2.
{
Expand All @@ -4356,8 +4379,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x02\xff"),
roachpb.Key("\x03"),
},
expSplit: roachpb.Key("\x03"),
expError: false,
expSplit: roachpb.Key("\x03"),
expError: false,
skipTenant: true,
},
// Between meta2Max and metaMax, splits at metaMax.
{
Expand All @@ -4367,8 +4391,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x04"),
roachpb.Key("\x04\xff\xff\x88"),
},
expSplit: roachpb.Key("\x04"),
expError: false,
expSplit: roachpb.Key("\x04"),
expError: false,
skipTenant: true,
},
// Even lopsided, always split at metaMax.
{
Expand All @@ -4379,8 +4404,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x03\xff\xff\xee"),
roachpb.Key("\x04"),
},
expSplit: roachpb.Key("\x04"),
expError: false,
expSplit: roachpb.Key("\x04"),
expError: false,
skipTenant: true,
},
// Lopsided, truncate non-zone prefix.
{
Expand All @@ -4390,8 +4416,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x04zone\x00"),
roachpb.Key("\x04zone\xff"),
},
expSplit: roachpb.Key("\x04zone\x00"),
expError: false,
expSplit: roachpb.Key("\x04zone\x00"),
expError: false,
skipTenant: true,
},
// Lopsided, truncate non-zone suffix.
{
Expand All @@ -4401,8 +4428,9 @@ func TestFindValidSplitKeys(t *testing.T) {
roachpb.Key("\x04zone\xff"),
roachpb.Key("\x04zonf"),
},
expSplit: roachpb.Key("\x04zone\xff"),
expError: false,
expSplit: roachpb.Key("\x04zone\xff"),
expError: false,
skipTenant: true,
},
// A Range for which MVCCSplitKey would return the start key isn't fair
// game, even if the StartKey actually exists. There was once a bug
Expand Down Expand Up @@ -4583,53 +4611,64 @@ func TestFindValidSplitKeys(t *testing.T) {

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
for i, test := range testCases {
t.Run("", func(t *testing.T) {
ctx := context.Background()
engine := engineImpl.create()
defer engine.Close()
testutils.RunTrueAndFalse(t, "tenant", func(t *testing.T, tenant bool) {
for i, test := range testCases {
t.Run("", func(t *testing.T) {
if tenant {
if test.skipTenant {
t.Skip("")
}
// Update all keys to include a tenant prefix.
tenPrefix := keys.MakeSQLCodec(roachpb.MinTenantID).TenantPrefix()
test = prefixTestKeys(test, tenPrefix)
}

ms := &enginepb.MVCCStats{}
val := roachpb.MakeValueFromString(strings.Repeat("X", 10))
for _, k := range test.keys {
// Add three MVCC versions of every key. Splits are not allowed
// between MVCC versions, so this shouldn't have any effect.
for j := 1; j <= 3; j++ {
ts := hlc.Timestamp{Logical: int32(j)}
if err := MVCCPut(ctx, engine, ms, []byte(k), ts, val, nil); err != nil {
t.Fatal(err)
ctx := context.Background()
engine := engineImpl.create()
defer engine.Close()

ms := &enginepb.MVCCStats{}
val := roachpb.MakeValueFromString(strings.Repeat("X", 10))
for _, k := range test.keys {
// Add three MVCC versions of every key. Splits are not allowed
// between MVCC versions, so this shouldn't have any effect.
for j := 1; j <= 3; j++ {
ts := hlc.Timestamp{Logical: int32(j)}
if err := MVCCPut(ctx, engine, ms, []byte(k), ts, val, nil); err != nil {
t.Fatal(err)
}
}
}
}
rangeStart := test.keys[0]
if len(test.rangeStart) > 0 {
rangeStart = test.rangeStart
}
rangeEnd := test.keys[len(test.keys)-1].Next()
rangeStartAddr, err := keys.Addr(rangeStart)
if err != nil {
t.Fatal(err)
}
rangeEndAddr, err := keys.Addr(rangeEnd)
if err != nil {
t.Fatal(err)
}
targetSize := (ms.KeyBytes + ms.ValBytes) / 2
splitKey, err := MVCCFindSplitKey(ctx, engine, rangeStartAddr, rangeEndAddr, targetSize)
if test.expError {
if !testutils.IsError(err, "has no valid splits") {
t.Fatalf("%d: unexpected error: %+v", i, err)
rangeStart := test.keys[0]
if len(test.rangeStart) > 0 {
rangeStart = test.rangeStart
}
return
}
if err != nil {
t.Fatalf("%d; unexpected error: %+v", i, err)
}
if !splitKey.Equal(test.expSplit) {
t.Errorf("%d: expected split key %q; got %q", i, test.expSplit, splitKey)
}
})
}
rangeEnd := test.keys[len(test.keys)-1].Next()
rangeStartAddr, err := keys.Addr(rangeStart)
if err != nil {
t.Fatal(err)
}
rangeEndAddr, err := keys.Addr(rangeEnd)
if err != nil {
t.Fatal(err)
}
targetSize := (ms.KeyBytes + ms.ValBytes) / 2
splitKey, err := MVCCFindSplitKey(ctx, engine, rangeStartAddr, rangeEndAddr, targetSize)
if test.expError {
if !testutils.IsError(err, "has no valid splits") {
t.Fatalf("%d: unexpected error: %+v", i, err)
}
return
}
if err != nil {
t.Fatalf("%d; unexpected error: %+v", i, err)
}
if !splitKey.Equal(test.expSplit) {
t.Errorf("%d: expected split key %q; got %q", i, test.expSplit, splitKey)
}
})
}
})
})
}
}
Expand Down

0 comments on commit a4cd3ad

Please sign in to comment.