diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index df55799bb088..eba3c7f2db93 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -527,6 +527,7 @@ go_test( "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_pebble//:pebble", + "@com_github_cockroachdb_pebble//sstable", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_dustin_go_humanize//:go-humanize", diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index 0ded1aa2fc51..6e13695a7e98 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" + "github.com/cockroachdb/pebble/sstable" + "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" "golang.org/x/time/rate" ) @@ -276,10 +278,12 @@ func TestMultiSSTWriterInitSST(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, - false, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + false, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) _, err = msstw.Finish(ctx) @@ -315,6 +319,117 @@ func TestMultiSSTWriterInitSST(t *testing.T) { } } +func buildIterForScratch( + t *testing.T, keySpans []roachpb.Span, scratch *SSTSnapshotStorageScratch, +) (storage.MVCCIterator, error) { + var openFiles []sstable.ReadableFile + for _, sstPath := range scratch.SSTs()[len(keySpans)-1:] { + f, err := vfs.Default.Open(sstPath) + require.NoError(t, err) + openFiles = append(openFiles, f) + } + mvccSpan := keySpans[len(keySpans)-1] + + return storage.NewSSTIterator([][]sstable.ReadableFile{openFiles}, storage.IterOptions{ + LowerBound: mvccSpan.Key, + UpperBound: mvccSpan.EndKey, + }) +} + +// TestMultiSSTWriterSize tests the effect of lowering the max size +// of sstables in a multiSSTWriter, and ensuring that the produced sstables +// are still correct. +func TestMultiSSTWriterSize(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, eng := newOnDiskEngine(ctx, t) + defer cleanup() + defer eng.Close() + + sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter) + ref := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + settings := cluster.MakeTestingClusterSettings() + + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] + + // Make a reference msstw with the default size. + referenceMsstw, err := newMultiSSTWriter(ctx, settings, ref, localSpans, mvccSpan, 0, false) + require.NoError(t, err) + require.Equal(t, int64(0), referenceMsstw.dataSize) + + for i := range localSpans { + require.NoError(t, referenceMsstw.Put(ctx, storage.EngineKey{Key: localSpans[i].Key}, []byte("foo"))) + } + + for i := 0; i < 100; i++ { + require.NoError(t, referenceMsstw.Put(ctx, storage.EngineKey{Key: roachpb.Key(append(desc.StartKey, byte(i)))}, []byte("foobarbaz"))) + } + _, err = referenceMsstw.Finish(ctx) + require.NoError(t, err) + + refIter, err := buildIterForScratch(t, keySpans, ref) + require.NoError(t, err) + defer refIter.Close() + + MaxSnapshotSSTableSize.Override(ctx, &settings.SV, 100) + + multiSSTWriter, err := newMultiSSTWriter(ctx, settings, scratch, localSpans, mvccSpan, 0, false) + require.NoError(t, err) + require.Equal(t, int64(0), multiSSTWriter.dataSize) + + for i := range localSpans { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: localSpans[i].Key}, []byte("foo"))) + } + + for i := 0; i < 100; i++ { + require.NoError(t, multiSSTWriter.Put(ctx, storage.EngineKey{Key: roachpb.Key(append(desc.StartKey, byte(i)))}, []byte("foobarbaz"))) + } + + _, err = multiSSTWriter.Finish(ctx) + require.NoError(t, err) + require.Greater(t, len(scratch.SSTs()), len(ref.SSTs())) + + iter, err := buildIterForScratch(t, keySpans, scratch) + require.NoError(t, err) + defer iter.Close() + + iter.SeekGE(storage.MVCCKey{Key: mvccSpan.Key}) + refIter.SeekGE(storage.MVCCKey{Key: mvccSpan.Key}) + valid, err := iter.Valid() + valid2, err2 := refIter.Valid() + require.NoError(t, err) + require.NoError(t, err2) + + for valid && valid2 { + + require.Equal(t, iter.UnsafeKey(), refIter.UnsafeKey()) + val, err := iter.UnsafeValue() + require.NoError(t, err) + val2, err2 := refIter.UnsafeValue() + require.NoError(t, err2) + require.Equal(t, val, val2) + iter.Next() + refIter.Next() + valid, err = iter.Valid() + valid2, err2 = refIter.Valid() + require.NoError(t, err) + require.NoError(t, err2) + } + require.Equal(t, valid, valid2) +} + // TestMultiSSTWriterAddLastSpan tests that multiSSTWriter initializes each of // the SST files associated with the replicated key ranges by writing a range // deletion tombstone that spans the entire range of each respectively, except @@ -342,14 +457,16 @@ func TestMultiSSTWriterAddLastSpan(t *testing.T) { EndKey: roachpb.RKeyMax, } keySpans := rditer.MakeReplicatedKeySpans(&desc) + localSpans := keySpans[:len(keySpans)-1] + mvccSpan := keySpans[len(keySpans)-1] msstw, err := newMultiSSTWriter( - ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, - true, /* skipRangeDelForLastSpan */ + ctx, cluster.MakeTestingClusterSettings(), scratch, localSpans, mvccSpan, 0, + true, /* skipRangeDelForMVCCSpan */ ) require.NoError(t, err) if addRangeDel { - require.NoError(t, msstw.addRangeDelForLastSpan()) + require.NoError(t, msstw.addClearForMVCCSpan()) } testKey := storage.MVCCKey{Key: roachpb.RKey("d1").AsRawKey(), Timestamp: hlc.Timestamp{WallTime: 1}} testEngineKey, _ := storage.DecodeEngineKey(storage.EncodeMVCCKey(testKey)) diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 997132b4411f..01a34a952033 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -11,7 +11,9 @@ package kvserver import ( + "bytes" "context" + "fmt" "io" "time" @@ -62,6 +64,16 @@ const ( tagSnapshotTiming = "snapshot_timing_tag" ) +// MaxSnapshotSSTableSize is the maximum size of an sstable containing MVCC/user keys +// in a snapshot before we truncate and write a new snapshot sstable. +var MaxSnapshotSSTableSize = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kv.snapshot_rebalance.max_sst_size", + "maximum size of a rebalance or recovery SST size", + 128<<20, // 128 MB + settings.PositiveInt, +) + // snapshotMetrics contains metrics on the number and size of snapshots in // progress or in the snapshot queue. type snapshotMetrics struct { @@ -115,10 +127,23 @@ type kvBatchSnapshotStrategy struct { // multiSSTWriter is a wrapper around an SSTWriter and SSTSnapshotStorageScratch // that handles chunking SSTs and persisting them to disk. type multiSSTWriter struct { - st *cluster.Settings - scratch *SSTSnapshotStorageScratch - currSST storage.SSTWriter - keySpans []roachpb.Span + st *cluster.Settings + scratch *SSTSnapshotStorageScratch + currSST storage.SSTWriter + // localKeySpans are key spans that are considered unsplittable across sstables, and + // represent the range's range local key spans. In contrast, mvccKeySpan can be split + // across multiple sstables if one of them exceeds maxSSTSize. The expectation is + // that for large ranges, keys in mvccKeySpan will dominate in size compared to keys + // in localKeySpans. + localKeySpans []roachpb.Span + mvccKeySpan roachpb.Span + // mvccSSTSpans reflects the actual split of the mvccKeySpan into constituent + // sstables. + mvccSSTSpans []storage.EngineKeyRange + // currSpan is the index of the current span being written to. The first + // len(localKeySpans) spans are localKeySpans, and the rest are mvccSSTSpans. + // In a sense, currSpan indexes into a slice composed of + // append(localKeySpans, mvccSSTSpans). currSpan int // The approximate size of the SST chunk to buffer in memory on the receiver // before flushing to disk. @@ -128,33 +153,84 @@ type multiSSTWriter struct { dataSize int64 // The total size of the SSTs. sstSize int64 - // if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the - // same sstable. We rely on the caller to take care of clearing this span - // through a different process (eg. IngestAndExcise on pebble). - skipRangeDelForLastSpan bool + // if skipClearForMVCCSpan is true, the MVCC span is not ClearEngineRange()d in + // the same sstable. We rely on the caller to take care of clearing this span + // through a different process (eg. IngestAndExcise on pebble). Note that + // having this bool to true also disables all range key fragmentation + // and splitting of sstables in the mvcc span. + skipClearForMVCCSpan bool + // maxSSTSize is the maximum size to use for SSTs containing MVCC/user keys. + // Once the sstable writer reaches this size, it will be finalized and a new + // sstable will be created. + maxSSTSize int64 + // rangeKeyFrag is used to fragment range keys across the mvcc key spans. + rangeKeyFrag rangekey.Fragmenter } func newMultiSSTWriter( ctx context.Context, st *cluster.Settings, scratch *SSTSnapshotStorageScratch, - keySpans []roachpb.Span, + localKeySpans []roachpb.Span, + mvccKeySpan roachpb.Span, sstChunkSize int64, - skipRangeDelForLastSpan bool, -) (multiSSTWriter, error) { - msstw := multiSSTWriter{ - st: st, - scratch: scratch, - keySpans: keySpans, - sstChunkSize: sstChunkSize, - skipRangeDelForLastSpan: skipRangeDelForLastSpan, + skipClearForMVCCSpan bool, +) (*multiSSTWriter, error) { + msstw := &multiSSTWriter{ + st: st, + scratch: scratch, + localKeySpans: localKeySpans, + mvccKeySpan: mvccKeySpan, + mvccSSTSpans: []storage.EngineKeyRange{{ + Start: storage.EngineKey{Key: mvccKeySpan.Key}, + End: storage.EngineKey{Key: mvccKeySpan.EndKey}, + }}, + sstChunkSize: sstChunkSize, + skipClearForMVCCSpan: skipClearForMVCCSpan, + } + if !skipClearForMVCCSpan { + // If skipClearForMVCCSpan is true, we don't split the MVCC span across + // multiple sstables, as addClearForMVCCSpan could be called by the caller + // at any time. + msstw.maxSSTSize = MaxSnapshotSSTableSize.Get(&st.SV) + } + msstw.rangeKeyFrag = rangekey.Fragmenter{ + Cmp: storage.EngineComparer.Compare, + Format: storage.EngineComparer.FormatKey, + Emit: msstw.emitRangeKey, } + if err := msstw.initSST(ctx); err != nil { return msstw, err } return msstw, nil } +func (msstw *multiSSTWriter) emitRangeKey(key rangekey.Span) { + for i := range key.Keys { + if err := msstw.currSST.PutInternalRangeKey(key.Start, key.End, key.Keys[i]); err != nil { + panic(fmt.Sprintf("failed to put range key in sst: %s", err)) + } + } +} + +// currentSpan returns the current user-provided span that +// is being written to. Note that this does not account for +// mvcc keys being split across multiple sstables. +func (msstw *multiSSTWriter) currentSpan() roachpb.Span { + if msstw.currSpanIsMVCCSpan() { + return msstw.mvccKeySpan + } + return msstw.localKeySpans[msstw.currSpan] +} + +func (msstw *multiSSTWriter) currSpanIsMVCCSpan() bool { + if msstw.currSpan >= len(msstw.localKeySpans)+len(msstw.mvccSSTSpans) { + panic("current span is out of bounds") + } + return msstw.currSpan >= len(msstw.localKeySpans) +} + func (msstw *multiSSTWriter) initSST(ctx context.Context) error { newSSTFile, err := msstw.scratch.NewFile(ctx, msstw.sstChunkSize) if err != nil { @@ -162,26 +238,94 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error { } newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile) msstw.currSST = newSST - if msstw.skipRangeDelForLastSpan && msstw.currSpan == len(msstw.keySpans)-1 { - // Skip this ClearRange, as it will be excised at ingestion time in the - // engine instead. - return nil - } - if err := msstw.currSST.ClearRawRange( - msstw.keySpans[msstw.currSpan].Key, msstw.keySpans[msstw.currSpan].EndKey, - true /* pointKeys */, true, /* rangeKeys */ - ); err != nil { - msstw.currSST.Close() - return errors.Wrap(err, "failed to clear range on sst file writer") + if !msstw.currSpanIsMVCCSpan() || (!msstw.skipClearForMVCCSpan && msstw.currSpan <= len(msstw.localKeySpans)) { + // We're either in a local key span, or we're in the first MVCC sstable + // span (before any splits). Add a RangeKeyDel for the whole span. If this + // is the MVCC span, we don't need to keep re-adding it to the fragmenter + // as the fragmenter will take care of splits. Note that currentSpan() + // will return the entire mvcc span in the case we're at an MVCC span. + startKey := storage.EngineKey{Key: msstw.currentSpan().Key}.Encode() + endKey := storage.EngineKey{Key: msstw.currentSpan().EndKey}.Encode() + trailer := pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindRangeKeyDelete) + s := rangekey.Span{Start: startKey, End: endKey, Keys: []rangekey.Key{{Trailer: trailer}}} + msstw.rangeKeyFrag.Add(s) } return nil } -func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { +func (msstw *multiSSTWriter) finalizeSST(ctx context.Context, nextKey *storage.EngineKey) error { + currSpan := msstw.currentSpan() + if msstw.currSpanIsMVCCSpan() { + // We're in the MVCC span (ie. MVCC / user keys). If skipClearForMVCCSpan + // is true, we don't write a clearRange for the last span at all. Otherwise, + // we need to write a clearRange for all keys leading up to the current key + // we're writing. + currEngineSpan := msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if !msstw.skipClearForMVCCSpan { + if err := msstw.currSST.ClearEngineRange( + currEngineSpan.Start, currEngineSpan.End, + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + } else { + if err := msstw.currSST.ClearRawRange( + currSpan.Key, currSpan.EndKey, + true /* pointKeys */, false, /* rangeKeys */ + ); err != nil { + msstw.currSST.Close() + return errors.Wrap(err, "failed to clear range on sst file writer") + } + } + + // If we're at the last span, call Finish on the fragmenter. If we're not at the + // last span, call Truncate. + if msstw.currSpan == len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-1 { + msstw.rangeKeyFrag.Finish() + } else { + endKey := storage.EngineKey{Key: currSpan.EndKey} + if msstw.currSpanIsMVCCSpan() { + endKey = msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)].End + } + msstw.rangeKeyFrag.Truncate(endKey.Encode()) + } + err := msstw.currSST.Finish() if err != nil { return errors.Wrap(err, "failed to finish sst") } + if nextKey != nil { + meta := msstw.currSST.Meta + encodedNextKey := nextKey.Encode() + if meta.HasPointKeys && storage.EngineKeyCompare(meta.LargestPoint.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestPoint.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + meta.LargestPoint.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest point key %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeDelKeys && storage.EngineKeyCompare(meta.LargestRangeDel.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeDel.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + meta.LargestRangeDel.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range del %s > next sstable start key %s", + metaEndKey, nextKey) + } + if meta.HasRangeKeys && storage.EngineKeyCompare(meta.LargestRangeKey.UserKey, encodedNextKey) > 0 { + metaEndKey, ok := storage.DecodeEngineKey(meta.LargestRangeKey.UserKey) + if !ok { + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + meta.LargestRangeKey.UserKey, nextKey) + } + return errors.Errorf("multiSSTWriter created overlapping ingestion sstables: sstable largest range key %s > next sstable start key %s", + metaEndKey, nextKey) + } + } msstw.dataSize += msstw.currSST.DataSize msstw.sstSize += int64(msstw.currSST.Meta.Size) msstw.currSpan++ @@ -189,22 +333,22 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { return nil } -// addRangeDelForLastSpan allows us to explicitly add a deletion tombstone -// for the last span in the msstw, if it was instantiated with the expectation +// addClearForMVCCSpan allows us to explicitly add a deletion tombstone +// for the mvcc span in the msstw, if it was instantiated with the expectation // that no tombstone was necessary. -func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { - if !msstw.skipRangeDelForLastSpan { +func (msstw *multiSSTWriter) addClearForMVCCSpan() error { + if !msstw.skipClearForMVCCSpan { // Nothing to do. return nil } - if msstw.currSpan < len(msstw.keySpans)-1 { - // When we switch to the last key span, we will just add a rangedel for it. - // Set skipRangeDelForLastSpan to false. - msstw.skipRangeDelForLastSpan = false + if msstw.currSpan < len(msstw.localKeySpans) { + // When we switch to the mvcc key span, we will just add a rangedel for it. + // Set skipClearForMVCCSpan to false. + msstw.skipClearForMVCCSpan = false return nil } - if msstw.currSpan > len(msstw.keySpans)-1 { - panic("cannot addRangeDel if sst writer has moved past user keys") + if msstw.currSpan >= len(msstw.localKeySpans) { + panic("cannot clearEngineRange if sst writer has moved past user keys") } panic("multiSSTWriter already added keys to sstable that cannot be deleted by a rangedel/rangekeydel within it") } @@ -213,31 +357,57 @@ func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { // writer for writing a point/range key at key. For point keys, endKey and key // must equal each other. func (msstw *multiSSTWriter) rolloverSST( - ctx context.Context, key roachpb.Key, endKey roachpb.Key, + ctx context.Context, key storage.EngineKey, endKey storage.EngineKey, ) error { - for msstw.keySpans[msstw.currSpan].EndKey.Compare(key) <= 0 { + for msstw.currentSpan().EndKey.Compare(key.Key) <= 0 { // Finish the current SST, write to the file, and move to the next key // range. - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, &key); err != nil { return err } if err := msstw.initSST(ctx); err != nil { return err } } - if msstw.keySpans[msstw.currSpan].Key.Compare(key) > 0 || - msstw.keySpans[msstw.currSpan].EndKey.Compare(endKey) < 0 { - if !key.Equal(endKey) { - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", - roachpb.Span{Key: key, EndKey: endKey}, msstw.keySpans) + currSpan := msstw.currentSpan() + if currSpan.Key.Compare(key.Key) > 0 || currSpan.EndKey.Compare(endKey.Key) < 0 { + if !key.Key.Equal(endKey.Key) { + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", + roachpb.Span{Key: key.Key, EndKey: endKey.Key}, msstw.localKeySpans, msstw.mvccKeySpan) + } + return errors.AssertionFailedf("client error: expected %s to fall in one of %s or %s", key, msstw.localKeySpans, msstw.mvccKeySpan) + } + if msstw.currSpanIsMVCCSpan() && msstw.maxSSTSize > 0 && msstw.currSST.DataSize > msstw.maxSSTSize { + // We're in an MVCC / user keys span, and the current sstable has exceeded + // the max size for MVCC sstables that we should be creating. Split this + // sstable into smaller ones. We do this by splitting the mvccKeySpan + // from [oldStartKey, oldEndKey) to [oldStartKey, key) and [key, oldEndKey). + // The split spans are added to msstw.mvccSSTSpans. + currSpan := &msstw.mvccSSTSpans[msstw.currSpan-len(msstw.localKeySpans)] + if bytes.Equal(currSpan.Start.Key, key.Key) && bytes.Equal(currSpan.Start.Version, key.Version) { + panic("unexpectedly reached max sstable size at start of an mvcc sstable span") + } + oldEndKey := currSpan.End + currSpan.End = key.Copy() + newSpan := storage.EngineKeyRange{Start: currSpan.End, End: oldEndKey} + msstw.mvccSSTSpans = append(msstw.mvccSSTSpans, newSpan) + if msstw.currSpan < len(msstw.localKeySpans)+len(msstw.mvccSSTSpans)-2 { + // This should never happen; we only split sstables when we're at the end + // of mvccSSTSpans. + panic("unexpectedly split an earlier mvcc sstable span in multiSSTWriter") + } + if err := msstw.finalizeSST(ctx, &key); err != nil { + return err + } + if err := msstw.initSST(ctx); err != nil { + return err } - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key, msstw.keySpans) } return nil } func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error { - if err := msstw.rolloverSST(ctx, key.Key, key.Key); err != nil { + if err := msstw.rolloverSST(ctx, key, key); err != nil { return err } if err := msstw.currSST.PutEngineKey(key, value); err != nil { @@ -253,7 +423,7 @@ func (msstw *multiSSTWriter) PutInternalPointKey( if !ok { return errors.New("cannot decode engine key") } - if err := msstw.rolloverSST(ctx, decodedKey.Key, decodedKey.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedKey, decodedKey); err != nil { return err } var err error @@ -290,11 +460,14 @@ func decodeRangeStartEnd( } func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, end []byte) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range deletes to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil { @@ -306,11 +479,14 @@ func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, func (msstw *multiSSTWriter) PutInternalRangeKey( ctx context.Context, start, end []byte, key rangekey.Key, ) error { + if !msstw.skipClearForMVCCSpan { + panic("can only add internal range deletes to multiSSTWriter if skipClearForMVCCSpan is true") + } decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) if err != nil { return err } - if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + if err := msstw.rolloverSST(ctx, decodedStart, decodedEnd); err != nil { return err } if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil { @@ -325,22 +501,33 @@ func (msstw *multiSSTWriter) PutRangeKey( if start.Compare(end) >= 0 { return errors.AssertionFailedf("start key %s must be before end key %s", end, start) } - if err := msstw.rolloverSST(ctx, start, end); err != nil { + if err := msstw.rolloverSST(ctx, storage.EngineKey{Key: start}, storage.EngineKey{Key: end}); err != nil { return err } - if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { - return errors.Wrap(err, "failed to put range key in sst") + if msstw.skipClearForMVCCSpan { + // Skip the fragmenter. See the comment in skipClearForMVCCSpan. + if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { + return errors.Wrap(err, "failed to put range key in sst") + } + return nil } + startKey, endKey := storage.EngineKey{Key: start}.Encode(), storage.EngineKey{Key: end}.Encode() + startTrailer := pebble.MakeInternalKeyTrailer(0, pebble.InternalKeyKindRangeKeySet) + msstw.rangeKeyFrag.Add(rangekey.Span{ + Start: startKey, + End: endKey, + Keys: []rangekey.Key{{Trailer: startTrailer, Suffix: suffix, Value: value}}, + }) return nil } func (msstw *multiSSTWriter) Finish(ctx context.Context) (int64, error) { - if msstw.currSpan < len(msstw.keySpans) { + if msstw.currSpan < (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { for { - if err := msstw.finalizeSST(ctx); err != nil { + if err := msstw.finalizeSST(ctx, nil /* nextKey */); err != nil { return 0, err } - if msstw.currSpan >= len(msstw.keySpans) { + if msstw.currSpan >= (len(msstw.localKeySpans) + len(msstw.mvccSSTSpans)) { break } if err := msstw.initSST(ctx); err != nil { @@ -506,8 +693,11 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // TODO(aaditya): Remove once we support flushableIngests for shared and // external files in the engine. - skipRangeDelForLastSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) - msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize, skipRangeDelForLastSpan) + skipClearForMVCCSpan := doExcise && (header.SharedReplicate || header.ExternalReplicate) + // The last key range is the user key span. + localRanges := keyRanges[:len(keyRanges)-1] + mvccRange := keyRanges[len(keyRanges)-1] + msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, localRanges, mvccRange, kvSS.sstChunkSize, skipClearForMVCCSpan) if err != nil { return noSnap, err } @@ -533,7 +723,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( doExcise = false sharedSSTs = nil externalSSTs = nil - if err := msstw.addRangeDelForLastSpan(); err != nil { + if err := msstw.addClearForMVCCSpan(); err != nil { return noSnap, errors.Wrap(err, "adding tombstone for last span") } } @@ -706,7 +896,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( sharedSSTs: sharedSSTs, externalSSTs: externalSSTs, doExcise: doExcise, - includesRangeDelForLastSpan: !skipRangeDelForLastSpan, + includesRangeDelForLastSpan: !skipClearForMVCCSpan, clearedSpans: keyRanges, } diff --git a/pkg/storage/engine_key.go b/pkg/storage/engine_key.go index 91bb63747cd8..191805666106 100644 --- a/pkg/storage/engine_key.go +++ b/pkg/storage/engine_key.go @@ -419,3 +419,8 @@ func decodeMVCCMetaAndVerify(key roachpb.Key, value []byte) error { } return decodeMVCCValueAndVerify(key, meta.RawBytes) } + +// EngineKeyRange is a key range composed of EngineKeys. +type EngineKeyRange struct { + Start, End EngineKey +} diff --git a/pkg/storage/sst_writer.go b/pkg/storage/sst_writer.go index 4c4179e15bdc..66ee08e83fac 100644 --- a/pkg/storage/sst_writer.go +++ b/pkg/storage/sst_writer.go @@ -291,6 +291,17 @@ func (fw *SSTWriter) ClearEngineRangeKey(start, end roachpb.Key, suffix []byte) return fw.fw.RangeKeyUnset(EngineKey{Key: start}.Encode(), EngineKey{Key: end}.Encode(), suffix) } +// ClearEngineRange clears point keys in the specified EngineKey range. +func (fw *SSTWriter) ClearEngineRange(start, end EngineKey) error { + fw.scratch = start.EncodeToBuf(fw.scratch[:0]) + endRaw := end.Encode() + fw.DataSize += int64(len(start.Key)) + int64(len(end.Key)) + if err := fw.fw.DeleteRange(fw.scratch, endRaw); err != nil { + return err + } + return nil +} + // ClearRawEncodedRange implements the InternalWriter interface. func (fw *SSTWriter) ClearRawEncodedRange(start, end []byte) error { startEngine, ok := DecodeEngineKey(start)