Skip to content

Commit

Permalink
storage: Add an option to break export mid key
Browse files Browse the repository at this point in the history
Previously when exporting ranges with very large number of versions
export could hit the situation where single key export would exceed
maximum export byte limit. To resolve this, safety threshold has to
be raised which could cause nodes going out of memory.
To address this, this patch adds an ability to stop and resume on
the arbitrary key timestamp so that it could be stopped mid key and
resumed later.

Release note: None
  • Loading branch information
aliher1911 committed Aug 6, 2021
1 parent 3418f92 commit 2082744
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 88 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -180,8 +181,8 @@ func evalExport(
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI, destFile)
summary, resume, _, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, hlc.Timestamp{}, exportAllRevisions, targetSize, maxSize, false, useTBI, destFile)
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
Expand Down
102 changes: 65 additions & 37 deletions pkg/ccl/storageccl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func exportUsingGoIterator(
filter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
startKey, endKey roachpb.Key,
enableTimeBoundIteratorOptimization bool,
enableTimeBoundIteratorOptimization timeBoundOptimisation,
reader storage.Reader,
) ([]byte, error) {
memFile := &storage.MemFile{}
Expand All @@ -466,7 +466,7 @@ func exportUsingGoIterator(

iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
EndKey: endKey,
EnableTimeBoundIteratorOptimization: enableTimeBoundIteratorOptimization,
EnableTimeBoundIteratorOptimization: bool(enableTimeBoundIteratorOptimization),
StartTime: startTime,
EndTime: endTime,
})
Expand Down Expand Up @@ -539,13 +539,29 @@ func loadSST(t *testing.T, data []byte, start, end roachpb.Key) []storage.MVCCKe
return kvs
}

type exportRevisions bool
type batchBoundaries bool
type timeBoundOptimisation bool

const (
exportAll exportRevisions = true
exportLatest exportRevisions = false

stopAtTimestamps batchBoundaries = true
stopAtKeys batchBoundaries = false

optimizeTimeBounds timeBoundOptimisation = true
dontOptimizeTimeBounds timeBoundOptimisation = false
)

func assertEqualKVs(
ctx context.Context,
e storage.Engine,
startKey, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
exportAllRevisions bool,
enableTimeBoundIteratorOptimization bool,
exportAllRevisions exportRevisions,
stopMidKey batchBoundaries,
enableTimeBoundIteratorOptimization timeBoundOptimisation,
targetSize uint64,
) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -568,14 +584,16 @@ func assertEqualKVs(

// Run the actual code path used when exporting MVCCs to SSTs.
var kvs []storage.MVCCKeyValue
var resumeTs hlc.Timestamp
for start := startKey; start != nil; {
var sst []byte
var summary roachpb.BulkOpSummary
maxSize := uint64(0)
prevStart := start
prevTs := resumeTs
sstFile := &storage.MemFile{}
summary, start, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, sstFile)
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime, resumeTs,
bool(exportAllRevisions), targetSize, maxSize, bool(stopMidKey), bool(enableTimeBoundIteratorOptimization), sstFile)
require.NoError(t, err)
sst = sstFile.Data()
loaded := loadSST(t, sst, startKey, endKey)
Expand Down Expand Up @@ -614,8 +632,8 @@ func assertEqualKVs(
if dataSizeWhenExceeded == maxSize {
maxSize--
}
_, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, &storage.MemFile{})
_, _, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime, prevTs,
bool(exportAllRevisions), targetSize, maxSize, false, bool(enableTimeBoundIteratorOptimization), &storage.MemFile{})
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
Expand All @@ -639,6 +657,7 @@ func assertEqualKVs(
}
}
}

func TestRandomKeyAndTimestampExport(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -658,12 +677,13 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
cleanupDir()
}
}
const keySize = 100
const bytesPerValue = 300
getNumKeys := func(t *testing.T, rnd *rand.Rand, targetSize uint64) (numKeys int) {
const (
targetPages = 10
bytesPerValue = 300
minNumKeys = 2 // need > 1 keys for random key test
maxNumKeys = 5000
targetPages = 10
minNumKeys = 2 // need > 1 keys for random key test
maxNumKeys = 5000
)
numKeys = maxNumKeys
if targetSize > 0 {
Expand Down Expand Up @@ -695,11 +715,13 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
timestamps = append(timestamps, ts)

// Make keys unique and ensure they are monotonically increasing.
key := roachpb.Key(randutil.RandBytes(rnd, 100))
key := roachpb.Key(randutil.RandBytes(rnd, keySize))
key = append([]byte(fmt.Sprintf("#%d", i)), key...)
keys = append(keys, key)

value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
averageValueSize := bytesPerValue - keySize
valueSize := randutil.RandIntInRange(rnd, averageValueSize-100, averageValueSize+100)
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, valueSize))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, value, nil); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -741,28 +763,35 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
)

t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize))
keyUpperBound := randutil.RandIntInRange(rnd, 1, numKeys)
keyLowerBound := rnd.Intn(keyUpperBound)
tsUpperBound := randutil.RandIntInRange(rnd, 1, numKeys)
tsLowerBound := rnd.Intn(tsUpperBound)

for _, s := range []struct {
name string
keyMin roachpb.Key
keyMax roachpb.Key
tsMin hlc.Timestamp
tsMax hlc.Timestamp
}{
{"ts (0-∞]", keyMin, keyMax, tsMin, tsMax},
{"kv [randLower, randUpper)", keys[keyLowerBound], keys[keyUpperBound], tsMin, tsMax},
{"kv (randLowerTime, randUpperTime]", keyMin, keyMax, timestamps[tsLowerBound], timestamps[tsUpperBound]},
} {
t.Run(fmt.Sprintf("%s, latest, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportLatest, stopAtKeys, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtKeys, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, split rows, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtTimestamps, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, latest, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportLatest, stopAtKeys, optimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtKeys, optimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, split rows, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtTimestamps, optimizeTimeBounds, targetSize))
}
}
// Exercise min to max time and key ranges.
for _, targetSize := range []uint64{
Expand All @@ -772,5 +801,4 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
testWithTargetSize(t, targetSize)
})
}

}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,14 @@ func (s spanSetReader) Closed() bool {
func (s spanSetReader) ExportMVCCToSst(
ctx context.Context,
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
startTS, endTS, firstKeyTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
useTBI bool,
stopMidKey, useTBI bool,
dest io.Writer,
) (roachpb.BulkOpSummary, roachpb.Key, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI, dest)
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, firstKeyTS, exportAllRevisions, targetSize,
maxSize, stopMidKey, useTBI, dest)
}

func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,8 +1428,8 @@ func runExportToSst(
for i := 0; i < b.N; i++ {
startTS := hlc.Timestamp{WallTime: int64(numRevisions / 2)}
endTS := hlc.Timestamp{WallTime: int64(numRevisions + 2)}
_, _, err := engine.ExportMVCCToSst(context.Background(), keys.LocalMax, roachpb.KeyMax, startTS, endTS,
exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, useTBI, noopWriter{})
_, _, _, err := engine.ExportMVCCToSst(context.Background(), keys.LocalMax, roachpb.KeyMax, startTS, endTS, hlc.Timestamp{},
exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, false, useTBI, noopWriter{})
if err != nil {
b.Fatal(err)
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,11 @@ type Reader interface {
// interval (startTS, endTS]. Passing exportAllRevisions exports
// every revision of a key for the interval, otherwise only the latest value
// within the interval is exported. Deletions are included if all revisions are
// requested or if the start.Timestamp is non-zero. Returns the bytes of an
// SSTable containing the exported keys, the size of exported data, or an error.
// requested or if the start.Timestamp is non-zero.
//
// firstKeyTS is either empty which represent starting from potential intent
// and continuing to versions or non-empty, which represents starting from
// particular version. firstKeyTS will always be empty when !stopMidKey
//
// If targetSize is positive, it indicates that the export should produce SSTs
// which are roughly target size. Specifically, it will return an SST such that
Expand All @@ -400,19 +403,31 @@ type Reader interface {
// to an SST that exceeds maxSize, an error will be returned. This parameter
// exists to prevent creating SSTs which are too large to be used.
//
// If stopMidKey is false, once function reaches targetSize it would continue
// adding all versions until it reaches next key or end of range. If true, it
// would stop immediately when targetSize is reached and return a next versions
// timestamp in resumeTs so that subsequent operation can pass it to firstKeyTs.
//
// If useTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
// outside our time bounds, while locating the KVs to export.
//
// This function looks at MVCC versions and intents, and returns an error if an
// intent is found.
//
// Data is written to dest as it is collected. If error is returned content of
// dest is undefined.
//
// Returns summary containing number of exported bytes, resumeKey and resumeTS
// that allow resuming export if it was cut short because it reached limits or
// an error if export failed for some reason.
ExportMVCCToSst(
ctx context.Context, startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, maxSize uint64, useTBI bool,
ctx context.Context, startKey, endKey roachpb.Key, startTS, endTS, firstKeyTs hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, maxSize uint64, stopMidKey bool, useTBI bool,
dest io.Writer,
) (_ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
// Get returns the value for the given key, nil otherwise. Semantically, it
) (_ roachpb.BulkOpSummary, resumeKey roachpb.Key, resumeTs hlc.Timestamp, _ error)
// MVCCGet returns the value for the given key, nil otherwise. Semantically, it
// behaves as if an iterator with MVCCKeyAndIntentsIterKind was used.
//
// Deprecated: use storage.MVCCGet instead.
Expand Down
5 changes: 3 additions & 2 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
// encountered:
// 1. An inline value (non-user data)
// 2. An intent whose timestamp lies within the time bounds
// (if not using enableWriteIntentAggregation)
//
// Note: The endTime is inclusive to be consistent with the non-incremental
// iterator, where reads at a given timestamp return writes at that
Expand Down Expand Up @@ -156,8 +157,8 @@ func NewMVCCIncrementalIterator(
}

// SeekGE advances the iterator to the first key in the engine which is >= the
// provided key. startKey should be a metadata key to ensure that the iterator
// has a chance to observe any intents on the key if they are there.
// provided key. startKey is not restricted to metadata key and could point to
// any version within a history as required.
func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
if i.timeBoundIter != nil {
// Check which is the first key seen by the TBI.
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func assertExportedErrs(
) {
const big = 1 << 30
sstFile := &MemFile{}
_, _, err := e.ExportMVCCToSst(context.Background(), startKey, endKey, startTime, endTime,
revisions, big, big, useTBI, sstFile)
_, _, _, err := e.ExportMVCCToSst(context.Background(), startKey, endKey, startTime, endTime, hlc.Timestamp{},
revisions, big, big, false, useTBI, sstFile)
require.Error(t, err)

if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) {
Expand Down Expand Up @@ -182,8 +182,8 @@ func assertExportedKVs(
) {
const big = 1 << 30
sstFile := &MemFile{}
_, _, err := e.ExportMVCCToSst(context.Background(), startKey, endKey, startTime, endTime,
revisions, big, big, useTBI, sstFile)
_, _, _, err := e.ExportMVCCToSst(context.Background(), startKey, endKey, startTime, endTime, hlc.Timestamp{},
revisions, big, big, false, useTBI, sstFile)
require.NoError(t, err)
data := sstFile.Data()
if data == nil {
Expand Down
Loading

0 comments on commit 2082744

Please sign in to comment.