storage: add limits to skipped data iteration #68467

Merged Oct 5, 2021 (3 commits)
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -80,6 +80,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
31 changes: 29 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
@@ -58,6 +59,20 @@ var ExportRequestMaxAllowedFileSizeOverage = settings.RegisterByteSizeSetting(
64<<20, /* 64 MiB */
).WithPublic()

// exportRequestMaxIterationTime controls the time an export request may spend
// iterating over data in the underlying storage. The threshold prevents an
// export request from holding locks for too long and blocking non-MVCC
// operations from making progress. If a request takes longer than this
// threshold, it stops, returns the data collected so far, and lets the caller
// use the resume span to continue.
var exportRequestMaxIterationTime = settings.RegisterDurationSetting(
"kv.bulk_sst.max_request_time",
"if set, limits amount of time spent in export requests; "+
"if export request can not finish within allocated time it will resume from the point it stopped in "+
"subsequent request",
// Feature is disabled by default.
0,
)

func init() {
RegisterReadOnlyCommand(roachpb.Export, declareKeysExport, evalExport)
}
@@ -134,6 +149,8 @@ func evalExport(
maxSize = targetSize + uint64(allowedOverage)
}

maxRunTime := exportRequestMaxIterationTime.Get(&cArgs.EvalCtx.ClusterSettings().SV)

// Time-bound iterators only make sense to use if the start time is set.
useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
// Only use resume timestamp if splitting mid key is enabled.
@@ -145,8 +162,18 @@
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, resumeTS, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, resumeKeyTS, exportAllRevisions, targetSize, maxSize, args.SplitMidKey, useTBI, destFile)
summary, resume, resumeTS, err := reader.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: args.SplitMidKey,
UseTBI: useTBI,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}, destFile)
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
26 changes: 22 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
@@ -594,8 +594,17 @@ func assertEqualKVs(
prevStart := start
prevTs := resumeTs
sstFile := &storage.MemFile{}
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime, resumeTs,
bool(exportAllRevisions), targetSize, maxSize, bool(stopMidKey), bool(enableTimeBoundIteratorOptimization), sstFile)
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeTs},
EndKey: endKey,
StartTS: startTime,
EndTS: endTime,
ExportAllRevisions: bool(exportAllRevisions),
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: bool(stopMidKey),
UseTBI: bool(enableTimeBoundIteratorOptimization),
}, sstFile)
require.NoError(t, err)
sst = sstFile.Data()
loaded := loadSST(t, sst, startKey, endKey)
@@ -634,8 +643,17 @@ func assertEqualKVs(
if dataSizeWhenExceeded == maxSize {
maxSize--
}
_, _, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime, prevTs,
bool(exportAllRevisions), targetSize, maxSize, false, bool(enableTimeBoundIteratorOptimization), &storage.MemFile{})
_, _, _, err = e.ExportMVCCToSst(ctx, storage.ExportOptions{
StartKey: storage.MVCCKey{Key: prevStart, Timestamp: prevTs},
EndKey: endKey,
StartTS: startTime,
EndTS: endTime,
ExportAllRevisions: bool(exportAllRevisions),
TargetSize: targetSize,
MaxSize: maxSize,
StopMidKey: false,
UseTBI: bool(enableTimeBoundIteratorOptimization),
}, &storage.MemFile{})
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
11 changes: 2 additions & 9 deletions pkg/kv/kvserver/spanset/batch.go
@@ -418,16 +418,9 @@ func (s spanSetReader) Closed() bool {

// ExportMVCCToSst is part of the storage.Reader interface.
func (s spanSetReader) ExportMVCCToSst(
ctx context.Context,
startKey, endKey roachpb.Key,
startTS, endTS, firstKeyTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
stopMidKey, useTBI bool,
dest io.Writer,
ctx context.Context, exportOptions storage.ExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, firstKeyTS, exportAllRevisions, targetSize,
maxSize, stopMidKey, useTBI, dest)
return s.r.ExportMVCCToSst(ctx, exportOptions, dest)
}

func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
10 changes: 10 additions & 0 deletions pkg/storage/BUILD.bazel
@@ -1,4 +1,5 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
name = "storage",
@@ -28,6 +29,7 @@ go_library(
"pebble_iterator.go",
"pebble_merge.go",
"pebble_mvcc_scanner.go",
"resource_limiter.go",
"row_counter.go",
"slice.go",
"slice_go1.9.go",
@@ -37,6 +39,7 @@ go_library(
"temp_dir.go",
"temp_engine.go",
"testing_knobs.go",
":gen-resourcelimitreached-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/storage",
visibility = ["//visibility:public"],
@@ -109,6 +112,7 @@ go_test(
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"resource_limiter_test.go",
"sst_info_test.go",
"sst_iterator_test.go",
"sst_writer_test.go",
@@ -160,3 +164,9 @@ go_test(
"@org_golang_x_sync//errgroup",
],
)

stringer(
name = "gen-resourcelimitreached-stringer",
src = "resource_limiter.go",
typ = "ResourceLimitReached",
)
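
The new pkg/storage/resource_limiter.go is referenced above, but its hunks are not shown on this page. Below is a minimal sketch of what a time-based limiter could look like, inferred from the NewResourceLimiter(ResourceLimiterOptions{MaxRunTime: ...}, timeutil.DefaultTimeSource{}) call in cmd_export.go and the ResourceLimitReached stringer rule; apart from those names and timeutil.TimeSource, every identifier and the soft/hard threshold split are assumptions rather than the actual implementation.

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// ResourceLimitReached describes the outcome of a limiter check; the stringer
// rule above generates its String method. The concrete values are illustrative.
type ResourceLimitReached int

const (
	ResourceLimitNotReached ResourceLimitReached = iota
	ResourceLimitReachedSoft
	ResourceLimitReachedHard
)

// ResourceLimiterOptions configures the limiter; MaxRunTime is fed from the
// kv.bulk_sst.max_request_time cluster setting.
type ResourceLimiterOptions struct {
	MaxRunTime time.Duration
}

// ResourceLimiter is polled by the export iteration loop between keys.
type ResourceLimiter interface {
	IsExhausted() ResourceLimitReached
}

type timeLimiter struct {
	maxRunTime time.Duration
	startTime  time.Time
	ts         timeutil.TimeSource
}

// NewResourceLimiter returns a limiter that (in this sketch) reports soft
// exhaustion once half of MaxRunTime has elapsed and hard exhaustion once
// MaxRunTime has elapsed; a zero MaxRunTime imposes no limit.
func NewResourceLimiter(opts ResourceLimiterOptions, ts timeutil.TimeSource) ResourceLimiter {
	return &timeLimiter{maxRunTime: opts.MaxRunTime, startTime: ts.Now(), ts: ts}
}

func (l *timeLimiter) IsExhausted() ResourceLimitReached {
	if l.maxRunTime == 0 {
		return ResourceLimitNotReached
	}
	elapsed := l.ts.Now().Sub(l.startTime)
	switch {
	case elapsed >= l.maxRunTime:
		return ResourceLimitReachedHard
	case elapsed >= l.maxRunTime/2:
		return ResourceLimitReachedSoft
	default:
		return ResourceLimitNotReached
	}
}

Taking a timeutil.TimeSource rather than calling time.Now directly presumably exists so tests can drive the limiter with a manual clock.
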
20 changes: 15 additions & 5 deletions pkg/storage/bench_test.go
@@ -653,10 +653,11 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int

var batch Batch
var minWallTime int64
batchSize := len(keys) / numBatches
for i, key := range keys {
if scaled := len(keys) / numBatches; (i % scaled) == 0 {
if (i % batchSize) == 0 {
if i > 0 {
log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches)
log.Infof(ctx, "committing (%d/~%d)", i/batchSize, numBatches)
if err := batch.Commit(false /* sync */); err != nil {
return nil, err
}
@@ -666,7 +667,7 @@ }
}
}
batch = eng.NewBatch()
minWallTime = sstTimestamps[i/scaled]
minWallTime = sstTimestamps[i/batchSize]
}
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
@@ -1521,8 +1522,17 @@ func runExportToSst(
for i := 0; i < b.N; i++ {
startTS := hlc.Timestamp{WallTime: int64(numRevisions / 2)}
endTS := hlc.Timestamp{WallTime: int64(numRevisions + 2)}
_, _, _, err := engine.ExportMVCCToSst(context.Background(), keys.LocalMax, roachpb.KeyMax, startTS, endTS, hlc.Timestamp{},
exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, false, useTBI, noopWriter{})
_, _, _, err := engine.ExportMVCCToSst(context.Background(), ExportOptions{
StartKey: MVCCKey{Key: keys.LocalMax},
EndKey: roachpb.KeyMax,
StartTS: startTS,
EndTS: endTS,
ExportAllRevisions: exportAllRevisions,
TargetSize: 0,
MaxSize: 0,
StopMidKey: false,
UseTBI: useTBI,
}, noopWriter{})
if err != nil {
b.Fatal(err)
}
82 changes: 48 additions & 34 deletions pkg/storage/engine.go
@@ -357,6 +357,47 @@ const (
MVCCKeyIterKind
)

// ExportOptions contains options provided to the export operation.
type ExportOptions struct {
// StartKey determines the start of the exported interval (inclusive).
// StartKey.Timestamp is either empty, which represents starting from a
// potential intent and continuing to versions, or non-empty, which represents
// starting from a particular version.
StartKey MVCCKey
// EndKey determines the end of the exported interval (exclusive).
EndKey roachpb.Key
// StartTS and EndTS determine the exported time range as (StartTS, EndTS].
StartTS, EndTS hlc.Timestamp
// If ExportAllRevisions is true, export every revision of a key for the
// interval; otherwise only the latest value within the interval is exported.
ExportAllRevisions bool
// If TargetSize is positive, it indicates that the export should produce SSTs
// which are roughly the target size. Specifically, it will return an SST such
// that the last key is responsible for meeting or exceeding the TargetSize. If
// the resumeKey is non-nil then the data size of the returned SST will be
// greater than or equal to the TargetSize.
TargetSize uint64
// If MaxSize is positive, it is an absolute maximum on the byte size of the
// returned SST. If the versions of the last key would lead to an SST that
// exceeds MaxSize, an error is returned. This parameter exists to prevent
// creating SSTs which are too large to be used.
MaxSize uint64
// If StopMidKey is false, once the export reaches TargetSize it continues
// adding all versions until it reaches the next key or the end of the range. If
// true, it stops immediately when TargetSize is reached and returns the next
// version's timestamp in resumeTS so that a subsequent request can pass it in
// StartKey.Timestamp.
StopMidKey bool
// ResourceLimiter limits how long the iterator may run before it exhausts its
// allocated resources. The export queries the limiter in its iteration loop and
// breaks out once resources are exhausted.
ResourceLimiter ResourceLimiter
// If UseTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
// outside our time bounds, while locating the KVs to export.
UseTBI bool
}

// Reader is the read interface to an engine's data. Certain implementations
// of Reader guarantee consistency of the underlying engine state across the
// different iterators created by NewMVCCIterator, NewEngineIterator:
@@ -382,39 +423,14 @@ type Reader interface {
// that they are not using a closed engine. Intended for use within package
// engine; exported to enable wrappers to exist in other packages.
Closed() bool
// ExportMVCCToSst exports changes to the keyrange [startKey, endKey) over the
// interval (startTS, endTS]. Passing exportAllRevisions exports
// every revision of a key for the interval, otherwise only the latest value
// within the interval is exported. Deletions are included if all revisions are
// requested or if the start.Timestamp is non-zero.
//
// firstKeyTS is either empty which represent starting from potential intent
// and continuing to versions or non-empty, which represents starting from
// particular version. firstKeyTS will always be empty when !stopMidKey
//
// If targetSize is positive, it indicates that the export should produce SSTs
// which are roughly target size. Specifically, it will return an SST such that
// the last key is responsible for meeting or exceeding the targetSize. If the
// resumeKey is non-nil then the data size of the returned sst will be greater
// than or equal to the targetSize.
//
// If maxSize is positive, it is an absolute maximum on byte size for the
// returned sst. If it is the case that the versions of the last key will lead
// to an SST that exceeds maxSize, an error will be returned. This parameter
// exists to prevent creating SSTs which are too large to be used.
//
// If stopMidKey is false, once function reaches targetSize it would continue
// adding all versions until it reaches next key or end of range. If true, it
// would stop immediately when targetSize is reached and return a next versions
// timestamp in resumeTs so that subsequent operation can pass it to firstKeyTs.
//
// If useTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
// outside our time bounds, while locating the KVs to export.
//
// ExportMVCCToSst exports changes to the key range [StartKey, EndKey) over the
// interval (StartTS, EndTS].
// Deletions are included if all revisions are requested or if the StartTS
// is non-zero.
// This function looks at MVCC versions and intents, and returns an error if an
// intent is found.
// exportOptions determines the key and time ranges as well as additional export
// options; see the struct definition for details.
//
// Data is written to dest as it is collected. If an error is returned, the
// content of dest is undefined.
@@ -423,9 +439,7 @@ type Reader interface {
// that allow resuming export if it was cut short because it reached limits or
// an error if export failed for some reason.
ExportMVCCToSst(
ctx context.Context, startKey, endKey roachpb.Key, startTS, endTS, firstKeyTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, maxSize uint64, stopMidKey bool, useTBI bool,
dest io.Writer,
ctx context.Context, exportOptions ExportOptions, dest io.Writer,
) (_ roachpb.BulkOpSummary, resumeKey roachpb.Key, resumeTS hlc.Timestamp, _ error)
// MVCCGet returns the value for the given key, nil otherwise. Semantically, it
// behaves as if an iterator with MVCCKeyAndIntentsIterKind was used.
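
To make the resume semantics of ExportOptions and Reader.ExportMVCCToSst concrete, here is a minimal caller sketch under the signatures shown above. It is not part of this PR: exportChunks, consume, and the parameter list are hypothetical; evalExport in cmd_export.go follows the same loop-and-resume pattern.

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// exportChunks exports [startKey, endKey) over (startTS, endTS] in chunks,
// resuming after each call until ExportMVCCToSst reports no resume key.
func exportChunks(
	ctx context.Context,
	reader storage.Reader,
	startKey, endKey roachpb.Key,
	startTS, endTS hlc.Timestamp,
	targetSize, maxSize uint64,
	maxRunTime time.Duration,
	consume func(sst []byte) error,
) error {
	var resumeTS hlc.Timestamp
	for start := startKey; start != nil; {
		dest := &storage.MemFile{}
		_, resumeKey, nextTS, err := reader.ExportMVCCToSst(ctx, storage.ExportOptions{
			StartKey:           storage.MVCCKey{Key: start, Timestamp: resumeTS},
			EndKey:             endKey,
			StartTS:            startTS,
			EndTS:              endTS,
			ExportAllRevisions: true,
			TargetSize:         targetSize,
			MaxSize:            maxSize,
			StopMidKey:         true, // allow stopping between versions of a single key
			UseTBI:             !startTS.IsEmpty(),
			ResourceLimiter: storage.NewResourceLimiter(
				storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
		}, dest)
		if err != nil {
			return err
		}
		if err := consume(dest.Data()); err != nil {
			return err
		}
		// A nil resumeKey means the export is complete; otherwise continue from
		// where the previous call stopped, possibly mid-key at timestamp nextTS.
		start, resumeTS = resumeKey, nextTS
	}
	return nil
}

Note that a fresh ResourceLimiter is built for each chunk, as in cmd_export.go, so MaxRunTime bounds each ExportMVCCToSst call rather than the export as a whole.
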
13 changes: 3 additions & 10 deletions pkg/storage/mvcc_incremental_iterator.go
@@ -217,9 +217,7 @@ func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
}
}
i.iter.SeekGE(startKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
i.err = nil
@@ -463,10 +461,7 @@ func (i *MVCCIncrementalIterator) advance() {
// done.
break
}

if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
}
@@ -511,9 +506,7 @@ func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
func (i *MVCCIncrementalIterator) NextIgnoringTime() {
for {
i.iter.Next()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}

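
The checkValidAndSaveErr helper that the hunks above now call is defined in a part of mvcc_incremental_iterator.go not shown here. Judging by the inline checks it replaces, it presumably reads roughly as follows (a sketch, not the verbatim definition):

// checkValidAndSaveErr checks the underlying iterator's validity; on failure it
// records the error, marks the incremental iterator invalid, and returns false
// so callers can simply bail out.
func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool {
	if ok, err := i.iter.Valid(); !ok {
		i.err = err
		i.valid = false
		return false
	}
	return true
}
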