From 93daeb49eba367c40361686cf637be05b04bb1f0 Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Wed, 31 Jul 2024 14:18:13 -0400 Subject: [PATCH] kvserver: split snapshot SSTables for mvcc keys into multiple SSTs Previously, we'd only create one sstable for all mvcc keys in a range when ingesting a rebalance/recovery snapshot into Pebble. This increased write-amp in Pebble as more sstables would have to be compacted into it (or the sstable then split into smaller ones in Pebble), and had other consequences such as massive filter blocks in the large singular sstable. This change adds a new cluster setting, kv.snapshot_rebalance.max_sst_size, that sets the max size of the sstables containing user/mvcc keys in a range. If an sstable exceeds this size in multiSSTWriter, we roll over that sstable and create a new one. Epic: CRDB-8471 Fixes: #67284 Release note (performance improvement): Reduce the write-amplification impact of rebalances by splitting snapshot sstable files into smaller ones before ingesting them into Pebble. --- pkg/kv/kvserver/BUILD.bazel | 1 + .../replica_sst_snapshot_storage_test.go | 127 ++++++- pkg/kv/kvserver/store_snapshot.go | 314 ++++++++++++++---- pkg/storage/engine_key.go | 5 + pkg/storage/sst_writer.go | 11 + 5 files changed, 391 insertions(+), 67 deletions(-) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index df55799bb088..eba3c7f2db93 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -527,6 +527,7 @@ go_test( "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//sstable", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index 0ded1aa2fc51..6e13695a7e98 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) @@ -276,10 +278,12 @@ func TestMultiSSTWriterInitSST(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, - false, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + false, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) _, err = msstw.Finish(ctx) @@ -315,6 +319,117 @@ func TestMultiSSTWriterInitSST(t *testing.T) { } } +func buildIterForScratch( + t *testing.T, keySpans []roachpb.Span, scratch *SSTSnapshotStorageScratch, +) (storage.MVCCIterator, error) { + var openFiles []sstable.ReadableFile + for _, sstPath := range scratch.SSTs()[len(keySpans)-1:] { + f, err := vfs.Default.Open(sstPath) + require.NoError(t, err) + openFiles = append(openFiles, f) + } + mvccSpan := keySpans[len(keySpans)-1] + + return storage.NewSSTIterator([][]sstable.ReadableFile{openFiles}, storage.IterOptions{ + LowerBound: mvccSpan.Key, + UpperBound: mvccSpan.EndKey, + }) +} + +// TestMultiSSTWriterSize tests the effect of lowering the max size +// of sstables in a multiSSTWriter, and ensuring that the produced sstables +// are still correct. +func TestMultiSSTWriterSize(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, eng := newOnDiskEngine(ctx, t) + defer cleanup() + defer eng.Close() + + sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter) + ref := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + settings := cluster.MakeTestingClusterSettings() + + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] + + // Make a reference msstw with the default size. + referenceMsstw, err := newMultiSSTWriter(ctx, settings, ref, localSpans, mvccSpan, 0, false) + require.NoError(t, err) + require.Equal(t, int64(0), referenceMsstw.dataSize) + + for i := range localSpans { + require.NoError(t, referenceMsstw.Put(ctx, storage.EngineKey{Key: localSpans[i].Key}, []byte("foo"))) + } + + for i := 0; i < 100; i++ { + require.NoError(t, referenceMsstw.Put(ctx, storage.EngineKey{Key: roachpb.Key(append(desc.StartKey, byte(i)))}, []byte("foobarbaz"))) + } + _, err = referenceMsstw.Finish(ctx) + require.NoError(t, err) + + refIter, err := buildIterForScratch(t, keySpans, ref) + require.NoError(t, err) + defer refIter.Close() + + MaxSnapshotSSTableSize.Override(ctx, &settings.SV, 100) + + multiSSTWriter, err := newMultiSSTWriter(ctx, settings, scratch, localSpans, mvccSpan, 0, false) + require.NoError(t, err) + require.Equal(t, int64(0), multiSSTWriter.dataSize) + + for i := range localSpans { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: localSpans[i].Key}, []byte("foo"))) + } + + for i := 0; i < 100; i++ { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: roachpb.Key(append(desc.StartKey, byte(i)))}, []byte("foobarbaz"))) + } + + _, err = multiSSTWriter.Finish(ctx) + require.NoError(t, err) + require.Greater(t, len(scratch.SSTs()), len(ref.SSTs())) + + iter, err := buildIterForScratch(t, keySpans, scratch) + require.NoError(t, err) + defer iter.Close() + + iter.SeekGE(storage.MVCCKey{Key: mvccSpan.Key}) + refIter.SeekGE(storage.MVCCKey{Key: mvccSpan.Key}) + valid, err := iter.Valid() + valid2, err2 := refIter.Valid() + require.NoError(t, err) + require.NoError(t, err2) + + for valid && valid2 { + + require.Equal(t, iter.UnsafeKey(), refIter.UnsafeKey()) + val, err := iter.UnsafeValue() + require.NoError(t, err) + val2, err2 := refIter.UnsafeValue() + require.NoError(t, err2) + require.Equal(t, val, val2) + iter.Next() + refIter.Next() + valid, err = iter.Valid() + valid2, err2 = refIter.Valid() + require.NoError(t, err) + require.NoError(t, err2) + } + require.Equal(t, valid, valid2) +} + // TestMultiSSTWriterAddLastSpan tests that multiSSTWriter initializes each of // the SST files associated with the replicated key ranges by writing a range // deletion tombstone that spans the entire range of each respectively, except @@ -342,14 +457,16 @@ func TestMultiSSTWriterAddLastSpan(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, - true, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + true, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) if addRangeDel { - require.NoError(t, msstw.addRangeDelForLastSpan()) + require.NoError(t, msstw.addClearForMVCCSpan()) } testKey := storage.MVCCKey{Key: roachpb.RKey("d1").AsRawKey(), Timestamp: hlc.Timestamp{WallTime: 1}} testEngineKey, _ := storage.DecodeEngineKey(storage.EncodeMVCCKey(testKey)) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 997132b4411f..01a34a952033 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -11,7 +11,9 @@ package kvserver import ( + "bytes" "context" + "fmt" "io" "time" @@ -62,6 +64,16 @@ const ( tagSnapshotTiming = "snapshot_timing_tag" ) +// MaxSnapshotSSTableSize is the maximum size of an sstable containing MVCC/user keys +// in a snapshot before we truncate and write a new snapshot sstable. +var MaxSnapshotSSTableSize = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kv.snapshot_rebalance.max_sst_size", + "maximum size of a rebalance or recovery SST size", + 128<<20, // 128 MB + settings.PositiveInt, +) + // snapshotMetrics contains metrics on the number and size of snapshots in // progress or in the snapshot queue. type snapshotMetrics struct { @@ -115,10 +127,23 @@ type kvBatchSnapshotStrategy struct { // multiSSTWriter is a wrapper around an SSTWriter and SSTSnapshotStorageScratch // that handles chunking SSTs and persisting them to disk. type multiSSTWriter struct { - st *cluster.Settings - scratch *SSTSnapshotStorageScratch - currSST storage.SSTWriter - keySpans []roachpb.Span + st *cluster.Settings + scratch *SSTSnapshotStorageScratch + currSST storage.SSTWriter + // localKeySpans are key spans that are considered unsplittable across sstables, and + // represent the range's range local key spans. In contrast, mvccKeySpan can be split + // across multiple sstables if one of them exceeds maxSSTSize. The expectation is + // that for large ranges, keys in mvccKeySpan will dominate in size compared to keys + // in localKeySpans. + localKeySpans []roachpb.Span + mvccKeySpan roachpb.Span + // mvccSSTSpans reflects the actual split of the mvccKeySpan into constituent + // sstables. + mvccSSTSpans []storage.EngineKeyRange + // currSpan is the index of the current span being written to. The first + // len(localKeySpans) spans are localKeySpans, and the rest are mvccSSTSpans. + // In a sense, currSpan indexes into a slice composed of + // append(localKeySpans, mvccSSTSpans). currSpan int // The approximate size of the SST chunk to buffer in memory on the receiver // before flushing to disk. @@ -128,33 +153,84 @@ type multiSSTWriter struct { dataSize int64 // The total size of the SSTs. sstSize int64 - // if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the - // same sstable. We rely on the caller to take care of clearing this span - // through a different process (eg. IngestAndExcise on pebble). - skipRangeDelForLastSpan bool + // if skipClearForMVCCSpan is true, the MVCC span is not ClearEngineRange()d in + // the same sstable. We rely on the caller to take care of clearing this span + // through a different process (eg. IngestAndExcise on pebble). Note that + // having this bool to true also disables all range key fragmentation + // and splitting of sstables in the mvcc span. + skipClearForMVCCSpan bool + // maxSSTSize is the maximum size to use for SSTs containing MVCC/user keys. + // Once the sstable writer reaches this size, it will be finalized and a new + // sstable will be created. + maxSSTSize int64 + // rangeKeyFrag is used to fragment range keys across the mvcc key spans. + rangeKeyFrag rangekey.Fragmenter } func newMultiSSTWriter( ctx context.Context, st *cluster.Settings, scratch *SSTSnapshotStorageScratch, - keySpans []roachpb.Span, + localKeySpans []roachpb.Span, + mvccKeySpan roachpb.Span, sstChunkSize int64, - skipRangeDelForLastSpan bool, -) (multiSSTWriter, error) { - msstw := multiSSTWriter{ - st: st, - scratch: scratch, - keySpans: keySpans, - sstChunkSize: sstChunkSize, - skipRangeDelForLastSpan: skipRangeDelForLastSpan, + skipClearForMVCCSpan bool, +) (*multiSSTWriter, error) { + msstw := &multiSSTWriter{ + st: st, + scratch: scratch, + localKeySpans: localKeySpans, + mvccKeySpan: mvccKeySpan, + mvccSSTSpans: []storage.EngineKeyRange{{ + Start: storage.EngineKey{Key: mvccKeySpan.Key}, + End: storage.EngineKey{Key: mvccKeySpan.EndKey}, + }}, + sstChunkSize: sstChunkSize, + skipClearForMVCCSpan: skipClearForMVCCSpan, + } + if !skipClearForMVCCSpan { + // If skipClearForMVCCSpan is true, we don't split the MVCC span across + // multiple sstables, as addClearForMVCCSpan could be called by the caller + // at any time. + msstw.maxSSTSize = MaxSnapshotSSTableSize.Get(&st.SV) + } + msstw.rangeKeyFrag = rangekey.Fragmenter{ + Cmp: storage.EngineComparer.Compare, + Format: storage.EngineComparer.FormatKey, + Emit: msstw.emitRangeKey, } + if err := msstw.initSST(ctx); err != nil { return msstw, err } return msstw, nil } +func (msstw *multiSSTWriter) emitRangeKey(key rangekey.Span) { + for i := range key.Keys { + if err := msstw.currSST.PutInternalRangeKey(key.Start, key.End, key.Keys[i]); err != nil { + panic(fmt.Sprintf("failed to put range key in sst: %s", err)) + } + } +} + +// currentSpan returns the current user-provided span that +// is being written to. Note that this does not account for +// mvcc keys being split across multiple sstables. +func (msstw *multiSSTWriter) currentSpan() roachpb.Span { + if msstw.currSpanIsMVCCSpan() { + return msstw.mvccKeySpan + } + return msstw.localKeySpans[msstw.currSpan] +} + +func (msstw *multiSSTWriter) currSpanIsMVCCSpan() bool { + if msstw.currSpan >= len(msstw.localKeySpans)+len(msstw.mvccSSTSpans) { + panic("current span is out of bounds") + } + return msstw.currSpan >= len(msstw.localKeySpans) +} + func (msstw *multiSSTWriter) initSST(ctx context.Context) error { newSSTFile, err := msstw.scratch.NewFile(ctx, msstw.sstChunkSize) if err != nil { @@ -162,26 +238,94 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error { } newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile) msstw.currSST = newSST - if msstw.skipRangeDelForLastSpan && msstw.currSpan == len(msstw.keySpans)-1 { - // Skip this ClearRange, as it will be excised at ingestion time in the - // engine instead. - return nil - } - if err := msstw.currSST.ClearRawRange( - msstw.keySpans[msstw.currSpan].Key, msstw.keySpans[msstw.currSpan].EndKey, - true /* pointKeys */, true, /* rangeKeys */ - ); err != nil { - msstw.currSST.Close() - return errors.Wrap(err, "failed to clear range on sst file writer") + if !msstw.currSpanIsMVCCSpan() || (!msstw.skipClearForMVCCSpan && msstw.currSpan <= len(msstw.localKeySpans)) { + // We're either in a local key span, or we're in the first MVCC sstable + // span (before any splits). Add a RangeKeyDel for the whole span. If this + // is the MVCC span, we don't need to keep re-adding it to the fragmenter + // as the fragmenter will take care of splits. Note that currentSpan() + // will return the entire mvcc span in the case we're at an MVCC span. + startKey := storage.EngineKey{Key: msstw.currentSpan().Key}.Encode() + endKey := storage.EngineKey{Key: msstw.currentSpan().EndKey}.Encode() + trailer := pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindRangeKeyDelete) + s := rangekey.Span{Start: startKey, End: endKey, Keys: []rangekey.Key{{Trailer: trailer}}} + msstw.rangeKeyFrag.Add(s) } return nil } -func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context, nextKey *storage.EngineKey) error { + currSpan := msstw.currentSpan() + if msstw.currSpanIsMVCCSpan() { + // We're in the MVCC span (ie. MVCC / user keys). If skipClearForMVCCSpan + // is true, we don't write a clearRange for the last span at all. Otherwise, + // we need to write a clearRange for all keys leading up to the current key + // we're writing. + currEngineSpan := msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if !msstw.skipClearForMVCCSpan { + if err := msstw.currSST.ClearEngineRange( + currEngineSpan.Start, currEngineSpan.End, + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + } else { + if err := msstw.currSST.ClearRawRange( + currSpan.Key, currSpan.EndKey, + true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + + // If we're at the last span, call Finish on the fragmenter. If we're not at the + // last span, call Truncate. + if msstw.currSpan == len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-1 { + msstw.rangeKeyFrag.Finish() + } else { + endKey := storage.EngineKey{Key: currSpan.EndKey} + if msstw.currSpanIsMVCCSpan() { + endKey = msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)].End + } + msstw.rangeKeyFrag.Truncate(endKey.Encode()) + } + err := msstw.currSST.Finish() if err != nil { return errors.Wrap(err, "failed to finish sst") } + if nextKey != nil { + meta := msstw.currSST.Meta + encodedNextKey := nextKey.Encode() + if meta.HasPointKeys && storage.EngineKeyCompare(meta.LargestPoint.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestPoint.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + meta.LargestPoint.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeDelKeys && storage.EngineKeyCompare(meta.LargestRangeDel.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeDel.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + meta.LargestRangeDel.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeKeys && storage.EngineKeyCompare(meta.LargestRangeKey.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeKey.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + meta.LargestRangeKey.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + metaEndKey, nextKey) + } + } msstw.dataSize += msstw.currSST.DataSize msstw.sstSize += int64(msstw.currSST.Meta.Size) msstw.currSpan++ @@ -189,22 +333,22 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { return nil } -// addRangeDelForLastSpan allows us to explicitly add a deletion tombstone -// for the last span in the msstw, if it was instantiated with the expectation +// addClearForMVCCSpan allows us to explicitly add a deletion tombstone +// for the mvcc span in the msstw, if it was instantiated with the expectation // that no tombstone was necessary. -func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { - if !msstw.skipRangeDelForLastSpan { +func (msstw *multiSSTWriter) addClearForMVCCSpan() error { + if !msstw.skipClearForMVCCSpan { // Nothing to do. return nil } - if msstw.currSpan < len(msstw.keySpans)-1 { - // When we switch to the last key span, we will just add a rangedel for it. - // Set skipRangeDelForLastSpan to false. - msstw.skipRangeDelForLastSpan = false + if msstw.currSpan < len(msstw.localKeySpans) { + // When we switch to the mvcc key span, we will just add a rangedel for it. + // Set skipClearForMVCCSpan to false. + msstw.skipClearForMVCCSpan = false return nil } - if msstw.currSpan > len(msstw.keySpans)-1 { - panic("cannot addRangeDel if sst writer has moved past user keys") + if msstw.currSpan >= len(msstw.localKeySpans) { + panic("cannot clearEngineRange if sst writer has moved past user keys") } panic("multiSSTWriter already added keys to sstable that cannot be deleted by a rangedel/rangekeydel within it") } @@ -213,31 +357,57 @@ func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { // writer for writing a point/range key at key. For point keys, endKey and key // must equal each other. func (msstw *multiSSTWriter) rolloverSST( - ctx context.Context, key roachpb.Key, endKey roachpb.Key, + ctx context.Context, key storage.EngineKey, endKey storage.EngineKey, ) error { - for msstw.keySpans[msstw.currSpan].EndKey.Compare(key) <= 0 { + for msstw.currentSpan().EndKey.Compare(key.Key) <= 0 { // Finish the current SST, write to the file, and move to the next key // range. - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, &key); err != nil { return err } if err := msstw.initSST(ctx); err != nil { return err } } - if msstw.keySpans[msstw.currSpan].Key.Compare(key) > 0 || - msstw.keySpans[msstw.currSpan].EndKey.Compare(endKey) < 0 { - if !key.Equal(endKey) { - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", - roachpb.Span{Key: key, EndKey: endKey}, msstw.keySpans) + currSpan := msstw.currentSpan() + if currSpan.Key.Compare(key.Key) > 0 || currSpan.EndKey.Compare(endKey.Key) < 0 { + if !key.Key.Equal(endKey.Key) { + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", + roachpb.Span{Key: key.Key, EndKey: endKey.Key}, msstw.localKeySpans, msstw.mvccKeySpan) + } + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", key, msstw.localKeySpans, msstw.mvccKeySpan) + } + if msstw.currSpanIsMVCCSpan() && msstw.maxSSTSize > 0 && msstw.currSST.DataSize > msstw.maxSSTSize { + // We're in an MVCC / user keys span, and the current sstable has exceeded + // the max size for MVCC sstables that we should be creating. Split this + // sstable into smaller ones. We do this by splitting the mvccKeySpan + // from [oldStartKey, oldEndKey) to [oldStartKey, key) and [key, oldEndKey). + // The split spans are added to msstw.mvccSSTSpans. + currSpan := &msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if bytes.Equal(currSpan.Start.Key, key.Key) && bytes.Equal(currSpan.Start.Version, key.Version) { + panic("unexpectedly reached max sstable size at start of an mvcc sstable span") + } + oldEndKey := currSpan.End + currSpan.End = key.Copy() + newSpan := storage.EngineKeyRange{Start: currSpan.End, End: oldEndKey} + msstw.mvccSSTSpans = append(msstw.mvccSSTSpans, newSpan) + if msstw.currSpan < len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-2 { + // This should never happen; we only split sstables when we're at the end + // of mvccSSTSpans. + panic("unexpectedly split an earlier mvcc sstable span in multiSSTWriter") + } + if err := msstw.finalizeSST(ctx, &key); err != nil { + return err + } + if err := msstw.initSST(ctx); err != nil { + return err } - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key, msstw.keySpans) } return nil } func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error { - if err := msstw.rolloverSST(ctx, key.Key, key.Key); err != nil { + if err := msstw.rolloverSST(ctx, key, key); err != nil { return err } if err := msstw.currSST.PutEngineKey(key, value); err != nil { @@ -253,7 +423,7 @@ func (msstw *multiSSTWriter) PutInternalPointKey( if !ok { return errors.New("cannot decode engine key") } - if err := msstw.rolloverSST(ctx, decodedKey.Key, decodedKey.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedKey, decodedKey); err != nil { return err } var err error @@ -290,11 +460,14 @@ func decodeRangeStartEnd( } func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, end []byte) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range deletes to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil { @@ -306,11 +479,14 @@ func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, func (msstw *multiSSTWriter) PutInternalRangeKey( ctx context.Context, start, end []byte, key rangekey.Key, ) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range deletes to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil { @@ -325,22 +501,33 @@ func (msstw *multiSSTWriter) PutRangeKey( if start.Compare(end) >= 0 { return errors.AssertionFailedf("start key %s must be before end key %s", end, start) } - if err := msstw.rolloverSST(ctx, start, end); err != nil { + if err := msstw.rolloverSST(ctx, storage.EngineKey{Key: start}, storage.EngineKey{Key: end}); err != nil { return err } - if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { - return errors.Wrap(err, "failed to put range key in sst") + if msstw.skipClearForMVCCSpan { + // Skip the fragmenter. See the comment in skipClearForMVCCSpan. + if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { + return errors.Wrap(err, "failed to put range key in sst") + } + return nil } + startKey, endKey := storage.EngineKey{Key: start}.Encode(), storage.EngineKey{Key: end}.Encode() + startTrailer := pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindRangeKeySet) + msstw.rangeKeyFrag.Add(rangekey.Span{ + Start: startKey, + End: endKey, + Keys: []rangekey.Key{{Trailer: startTrailer, Suffix: suffix, Value: value}}, + }) return nil } func (msstw *multiSSTWriter) Finish(ctx context.Context) (int64, error) { - if msstw.currSpan < len(msstw.keySpans) { + if msstw.currSpan < (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { for { - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, nil /* nextKey */); err != nil { return 0, err } - if msstw.currSpan >= len(msstw.keySpans) { + if msstw.currSpan >= (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { break } if err := msstw.initSST(ctx); err != nil { @@ -506,8 +693,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // TODO(aaditya): Remove once we support flushableIngests for shared and // external files in the engine. - skipRangeDelForLastSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) - msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize, skipRangeDelForLastSpan) + skipClearForMVCCSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) + // The last key range is the user key span. + localRanges := keyRanges[:len(keyRanges)-1] + mvccRange := keyRanges[len(keyRanges)-1] + msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, localRanges, mvccRange, kvSS.sstChunkSize, skipClearForMVCCSpan) if err != nil { return noSnap, err } @@ -533,7 +723,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( doExcise = false sharedSSTs = nil externalSSTs = nil - if err := msstw.addRangeDelForLastSpan(); err != nil { + if err := msstw.addClearForMVCCSpan(); err != nil { return noSnap, errors.Wrap(err, "adding tombstone for last span") } } @@ -706,7 +896,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( sharedSSTs: sharedSSTs, externalSSTs: externalSSTs, doExcise: doExcise, - includesRangeDelForLastSpan: !skipRangeDelForLastSpan, + includesRangeDelForLastSpan: !skipClearForMVCCSpan, clearedSpans: keyRanges, } diff --git a/pkg/storage/engine_key.go b/pkg/storage/engine_key.go index 91bb63747cd8..191805666106 100644 --- a/pkg/storage/engine_key.go +++ b/pkg/storage/engine_key.go @@ -419,3 +419,8 @@ func decodeMVCCMetaAndVerify(key roachpb.Key, value []byte) error { } return decodeMVCCValueAndVerify(key, meta.RawBytes) } + +// EngineKeyRange is a key range composed of EngineKeys. +type EngineKeyRange struct { + Start, End EngineKey +} diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index 4c4179e15bdc..66ee08e83fac 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -291,6 +291,17 @@ func (fw *SSTWriter) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) return fw.fw.RangeKeyUnset(EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix) } +// ClearEngineRange clears point keys in the specified EngineKey range. +func (fw *SSTWriter) ClearEngineRange(start, end EngineKey) error { + fw.scratch = start.EncodeToBuf(fw.scratch[:0]) + endRaw := end.Encode() + fw.DataSize += int64(len(start.Key)) + int64(len(end.Key)) + if err := fw.fw.DeleteRange(fw.scratch, endRaw); err != nil { + return err + } + return nil +} + // ClearRawEncodedRange implements the InternalWriter interface. func (fw *SSTWriter) ClearRawEncodedRange(start, end []byte) error { startEngine, ok := DecodeEngineKey(start)