diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go
index b8583820a7ae..4c4e37a266e4 100644
--- a/pkg/storage/mvcc.go
+++ b/pkg/storage/mvcc.go
@@ -6432,7 +6432,7 @@ func mvccExportToWriter(
 	firstIteration := true
 	// skipTombstones controls whether we include tombstones.
 	//
-	// We want tombstones if we are exporting all reivions or if
+	// We want tombstones if we are exporting all revisions or if
 	// we have a StartTS. A non-empty StartTS is used by
 	// incremental backups and thus needs to see tombstones if
 	// that happens to be the latest value.
@@ -6441,7 +6441,10 @@ func mvccExportToWriter(
 	var rows RowCounter
 	// Only used if trackKeyBoundary is true.
 	var curKey roachpb.Key
 
+	var resumeKey MVCCKey
+	var resumeIsCPUOverLimit bool
+
 	var rangeKeys MVCCRangeKeyStack
 	var rangeKeysSize int64
 
@@ -6513,7 +6516,8 @@ func mvccExportToWriter(
 			if isNewKey {
 				resumeKey.Timestamp = hlc.Timestamp{}
 			}
-			return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: true}, nil
+			resumeIsCPUOverLimit = true
+			break
 		}
 	}
 
@@ -6722,7 +6726,7 @@ func mvccExportToWriter(
 		rows.BulkOpSummary.DataSize += rangeKeysSize
 	}
 
-	return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey}, nil
+	return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: resumeIsCPUOverLimit}, nil
 }
 
 // MVCCExportOptions contains options for MVCCExportToSST.
diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go
index e74a9afbafc3..64f0ad117e95 100644
--- a/pkg/storage/mvcc_test.go
+++ b/pkg/storage/mvcc_test.go
@@ -45,6 +45,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/pebble"
 	"github.com/kr/pretty"
 	"github.com/stretchr/testify/require"
 )
@@ -6141,6 +6142,10 @@ func TestWillOverflow(t *testing.T) {
 // in which mis-handling of resume spans would cause MVCCExportToSST
 // to return an empty resume key in cases where the resource limiters
 // caused an early return of a resume span.
+//
+// NB: This test treats the result of MVCCExportToSST _without_
+// CPU rate limiting as the truth. Bugs that affect all exports will
+// not be caught by this test.
 func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -6152,10 +6157,46 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 		maxKey       = int64(1000)
 		minTimestamp = hlc.Timestamp{WallTime: 100000}
 		maxTimestamp = hlc.Timestamp{WallTime: 200000}
+
+		exportAllQuery = queryLimits{
+			minKey:       minKey,
+			maxKey:       maxKey,
+			minTimestamp: minTimestamp,
+			maxTimestamp: maxTimestamp,
+			latest:       false,
+		}
 	)
 
-	assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine, expectedData []MVCCKey, initialOpts MVCCExportOptions) {
-		dataIndex := 0
+	// When ExportRequest is interrupted by the CPU limiter, the currently
+	// buffered range key stack will have its EndKey truncated to the resume
+	// key. To account for this, we write all of the range keys back into a
+	// store and then export them out again without interruption.
+	canonicalizeRangeKeys := func(in []MVCCRangeKeyStack) []MVCCRangeKeyStack {
+		if len(in) == 0 {
+			return in
+		}
+
+		engine := createTestPebbleEngine()
+		defer engine.Close()
+		for _, keyStack := range in {
+			for _, version := range keyStack.Versions {
+				require.NoError(t, engine.PutRawMVCCRangeKey(keyStack.AsRangeKey(version), []byte{}))
+			}
+		}
+		require.NoError(t, engine.Flush())
+		keys, rKeys := exportAllData(t, engine, exportAllQuery)
+		require.Equal(t, 0, len(keys))
+		return rKeys
+	}
+
+	assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine,
+		expectedKeys []MVCCKey,
+		expectedRangeKeys []MVCCRangeKeyStack,
+		initialOpts MVCCExportOptions) {
+
+		keysIndex := 0
+		rKeysBuf := []MVCCRangeKeyStack{}
+
 		startKey := initialOpts.StartKey
 		for len(startKey.Key) > 0 {
 			var sstFile bytes.Buffer
@@ -6163,15 +6204,27 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			opts.StartKey = startKey
 			_, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
 			require.NoError(t, err)
-			chunk := sstToKeys(t, sstFile.Bytes())
-			require.LessOrEqual(t, len(chunk), len(expectedData)-dataIndex, "remaining test data")
-			for _, key := range chunk {
-				require.True(t, key.Equal(expectedData[dataIndex]), "returned key is not equal")
-				dataIndex++
+
+			keys, rangeKeys := sstToKeys(t, sstFile.Bytes())
+
+			require.LessOrEqual(t, len(keys), len(expectedKeys)-keysIndex, "remaining test key data")
+
+			for _, key := range keys {
+				require.True(t, key.Equal(expectedKeys[keysIndex]), "returned key is not equal")
+				keysIndex++
 			}
+			rKeysBuf = append(rKeysBuf, rangeKeys...)
 			startKey = resumeInfo.ResumeKey
 		}
-		require.Equal(t, len(expectedData), dataIndex, "not all expected data was consumed")
+		require.Equal(t, len(expectedKeys), keysIndex, "not all expected keys were consumed")
+
+		actualRangeKeys := canonicalizeRangeKeys(rKeysBuf)
+		require.Equal(t, len(expectedRangeKeys), len(actualRangeKeys))
+		for i, actual := range actualRangeKeys {
+			expected := expectedRangeKeys[i]
+			require.True(t, actual.Equal(expected), "range key mismatch %v != %v", actual, expected)
+		}
+
 	}
 
 	t.Run("elastic CPU limit exhausted", func(t *testing.T) {
@@ -6186,13 +6239,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			tombstoneChance: 0.01,
 		}
 		generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
-		data := exportAllData(t, engine, queryLimits{
-			minKey:       minKey,
-			maxKey:       maxKey,
-			minTimestamp: minTimestamp,
-			maxTimestamp: maxTimestamp,
-			latest:       false,
-		})
+		keys, rKeys := exportAllData(t, engine, exportAllQuery)
 
 		// Our ElasticCPUWorkHandle will fail on the very first call. As a result,
 		// the very first return from MVCCExportToSST will actually contain no
@@ -6205,7 +6252,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			}
 			return false, 0
 		}))
-		assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
+		assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
 			StartKey:           MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
 			EndKey:             testKey(limits.maxKey),
 			StartTS:            limits.minTimestamp,
@@ -6227,26 +6274,50 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 				tombstoneChance: 0.01,
 			}
 			generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
-			data := exportAllData(t, engine, queryLimits{
-				minKey:       minKey,
-				maxKey:       maxKey,
-				minTimestamp: minTimestamp,
-				maxTimestamp: maxTimestamp,
-				latest:       false,
+			keys, rKeys := exportAllData(t, engine, exportAllQuery)
+
+			// Our ElasticCPUWorkHandle will always fail. But, we
+			// should still make progress, one key at a time.
+			ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
+				return true, 0
+			}))
+			assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
+				StartKey:           MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
+				EndKey:             testKey(limits.maxKey),
+				StartTS:            limits.minTimestamp,
+				EndTS:              limits.maxTimestamp,
+				ExportAllRevisions: true,
 			})
+		})
+	t.Run("elastic CPU limit always exhausted with range keys",
+		func(t *testing.T) {
+			engine := createTestPebbleEngine()
+			defer engine.Close()
+			limits := dataLimits{
+				minKey:             minKey,
+				maxKey:             maxKey,
+				minTimestamp:       minTimestamp,
+				maxTimestamp:       maxTimestamp,
+				tombstoneChance:    0.50,
+				useRangeTombstones: true,
+			}
+			// Adding many range keys makes this test much slower,
+			// so we use 2*keyRange rather than 10*keyRange here.
+			generateData(t, engine, limits, (limits.maxKey-limits.minKey)*2)
+			keys, rKeys := exportAllData(t, engine, exportAllQuery)
 
-			// Our ElasticCPUWorkHandle will always
-			// fail. But, we should still make progress,
-			// one key at a time.
+			// Our ElasticCPUWorkHandle will always fail. But, we
+			// should still make progress, one key at a time.
 			ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
-				return false, 0
+				return true, 0
 			}))
-			assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
+			assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
 				StartKey:           MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
 				EndKey:             testKey(limits.maxKey),
 				StartTS:            limits.minTimestamp,
 				EndTS:              limits.maxTimestamp,
 				ExportAllRevisions: true,
+				StopMidKey:         true,
 			})
 		})
 	t.Run("elastic CPU limit exhausted respects StopMidKey",
@@ -6299,7 +6370,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			// revisions or 0 revisions.
 			_, _, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
 			require.NoError(t, err)
-			chunk := sstToKeys(t, sstFile.Bytes())
+			chunk, _ := sstToKeys(t, sstFile.Bytes())
 			require.Equal(t, 6, len(chunk))
 
 			// With StopMidKey=true, we can stop in the
@@ -6309,7 +6380,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			opts.StopMidKey = true
 			_, _, err = MVCCExportToSST(ctx, st, engine, opts, &sstFile)
 			require.NoError(t, err)
-			chunk = sstToKeys(t, sstFile.Bytes())
+			chunk, _ = sstToKeys(t, sstFile.Bytes())
 			// We expect 3 here rather than 2 because the
 			// first iteration never calls the handler.
 			require.Equal(t, 3, len(chunk))
@@ -6330,14 +6401,17 @@ func testKey(id int64) roachpb.Key {
 }
 
 type dataLimits struct {
-	minKey          int64
-	maxKey          int64
-	minTimestamp    hlc.Timestamp
-	maxTimestamp    hlc.Timestamp
-	tombstoneChance float64
+	minKey             int64
+	maxKey             int64
+	minTimestamp       hlc.Timestamp
+	maxTimestamp       hlc.Timestamp
+	tombstoneChance    float64
+	useRangeTombstones bool
 }
 
-func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
+func exportAllData(
+	t *testing.T, engine Engine, limits queryLimits,
+) ([]MVCCKey, []MVCCRangeKeyStack) {
 	st := cluster.MakeTestingClusterSettings()
 	var sstFile bytes.Buffer
 	_, _, err := MVCCExportToSST(context.Background(), st, engine, MVCCExportOptions{
@@ -6351,9 +6425,11 @@ func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
 	return sstToKeys(t, sstFile.Bytes())
 }
 
-func sstToKeys(t *testing.T, data []byte) []MVCCKey {
+func sstToKeys(t *testing.T, data []byte) ([]MVCCKey, []MVCCRangeKeyStack) {
 	var results []MVCCKey
+	var rangeKeyRes []MVCCRangeKeyStack
 	it, err := NewMemSSTIterator(data, false, IterOptions{
+		KeyTypes:   pebble.IterKeyTypePointsAndRanges,
 		LowerBound: keys.MinKey,
 		UpperBound: keys.MaxKey,
 	})
@@ -6365,26 +6441,47 @@ func sstToKeys(t *testing.T, data []byte) []MVCCKey {
 		if !ok {
 			break
 		}
+
+		if it.RangeKeyChanged() {
+			hasPoint, hasRange := it.HasPointAndRange()
+			if hasRange {
+				rangeKeyRes = append(rangeKeyRes, it.RangeKeys().Clone())
+			}
+			if !hasPoint {
+				it.Next()
+				continue
+			}
+		}
+
 		results = append(results, MVCCKey{
 			Key:       append(roachpb.Key(nil), it.UnsafeKey().Key...),
 			Timestamp: it.UnsafeKey().Timestamp,
 		})
 		it.Next()
 	}
-	return results
+	return results, rangeKeyRes
 }
 
 func generateData(t *testing.T, engine Engine, limits dataLimits, totalEntries int64) {
 	rng := rand.New(rand.NewSource(timeutil.Now().Unix()))
 	for i := int64(0); i < totalEntries; i++ {
-		key := testKey(limits.minKey + rand.Int63n(limits.maxKey-limits.minKey))
+		keyID := limits.minKey + rand.Int63n(limits.maxKey-limits.minKey)
+		key := testKey(keyID)
 		timestamp := limits.minTimestamp.Add(rand.Int63n(limits.maxTimestamp.WallTime-limits.minTimestamp.WallTime), 0)
 		size := 256
 		if rng.Float64() < limits.tombstoneChance {
 			size = 0
 		}
-		value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
-		require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")
+
+		if limits.useRangeTombstones && size == 0 {
+			require.NoError(t, engine.PutRawMVCCRangeKey(MVCCRangeKey{
+				StartKey:  key,
+				EndKey:    testKey(keyID + 2),
+				Timestamp: timestamp}, []byte{}), "write data to test storage")
+		} else {
+			value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
+			require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")
+		}
 	}
 	require.NoError(t, engine.Flush(), "Flush engine data")
 }