Skip to content

Commit

Permalink
storage: add limits to skipped data iteration
Browse files Browse the repository at this point in the history
Previously when iterating engine using  MVCCIncrementalIterator caller
could skip large amounts of non-matching data which would result in
"unbounded" resource usage.
This is becoming a problem for resource constrained clusters where low
priority requests like export that are used by backups to interfere with
high priority workloads. If we want to throttle backups we need to be able
to limit how many underlying operations we want to perform per request.
This change adds an optional iteration limit to the iterator. Once the
limit is reached, iterator will return an error. Error will provide a
resume key to continue iteration in next request.

Release note: None
  • Loading branch information
aliher1911 committed Aug 13, 2021
1 parent 4ff11bc commit e18e0ff
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 63 deletions.
7 changes: 4 additions & 3 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,11 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int

var batch Batch
var minWallTime int64
batchSize := len(keys) / numBatches
for i, key := range keys {
if scaled := len(keys) / numBatches; (i % scaled) == 0 {
if (i % batchSize) == 0 {
if i > 0 {
log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches)
log.Infof(ctx, "committing (%d/~%d)", i/batchSize, numBatches)
if err := batch.Commit(false /* sync */); err != nil {
return nil, err
}
Expand All @@ -568,7 +569,7 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int
}
}
batch = eng.NewBatch()
minWallTime = sstTimestamps[i/scaled]
minWallTime = sstTimestamps[i/batchSize]
}
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,12 @@ type ExportOptions struct {
// would stop immediately when targetSize is reached and return a next versions
// timestamp in resumeTs so that subsequent operation can pass it to firstKeyTs.
StopMidKey bool
// MaxAllowerdIterations limits number of iterations that storage would perform
// while colllecting matching values to export. This limit could only be used
// together with StopMidKey as it could break iteration over data on any version
// and even on a key and timestamp that would not be included in the resulting
// export.
MaxAllowerdIterations int64
// If UseTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
Expand Down
59 changes: 49 additions & 10 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
package storage

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -29,6 +31,8 @@ import (
// 1. An inline value (non-user data)
// 2. An intent whose timestamp lies within the time bounds
// (if not using enableWriteIntentAggregation)
// 3. Iterator hits a limit on number of Next calls on underlying
// iterator (if limit was requested).
//
// Note: The endTime is inclusive to be consistent with the non-incremental
// iterator, where reads at a given timestamp return writes at that
Expand Down Expand Up @@ -93,6 +97,10 @@ type MVCCIncrementalIterator struct {
enableWriteIntentAggregation bool
// Optional collection of intents created on demand when first intent encountered.
intents []roachpb.Intent

// Underlying iteration limiting.
maxAllowedIterations int64
iteratorAdvanceCount int64
}

var _ SimpleMVCCIterator = &MVCCIncrementalIterator{}
Expand All @@ -113,6 +121,13 @@ type MVCCIncrementalIterOptions struct {
// would be free to decide if it wants to keep collecting entries and intents or
// skip entries.
EnableWriteIntentAggregation bool
// If non zero iterator would only advance or seek up to a certain number of times
// before bailing out with ResourceLimitError. When using the feature it is
// important to use resume key from the error to ensure forward progress could
// be made as advancing up to the first key could already exhaust the limit.
// Resume key is not necessarily a valid iteration key as we could stop in between
// eligible keys.
MaxAllowedIterations int64
}

// NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the
Expand Down Expand Up @@ -153,6 +168,7 @@ func NewMVCCIncrementalIterator(
endTime: opts.EndTime,
timeBoundIter: timeBoundIter,
enableWriteIntentAggregation: opts.EnableWriteIntentAggregation,
maxAllowedIterations: opts.MaxAllowedIterations,
}
}

Expand All @@ -176,9 +192,8 @@ func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
}
}
i.iter.SeekGE(startKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
i.err = nil
Expand All @@ -199,6 +214,7 @@ func (i *MVCCIncrementalIterator) Close() {
// key.
func (i *MVCCIncrementalIterator) Next() {
i.iter.Next()
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
Expand All @@ -222,6 +238,7 @@ func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool {
// key if the iterator is currently located at the last version for a key.
func (i *MVCCIncrementalIterator) NextKey() {
i.iter.NextKey()
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
Expand Down Expand Up @@ -287,6 +304,7 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() {
// expensive than a Next call.
seekKey := MakeMVCCMetadataKey(tbiKey)
i.iter.SeekGE(seekKey)
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
Expand Down Expand Up @@ -367,11 +385,20 @@ func (i *MVCCIncrementalIterator) advance() {
return
}

// If we reached requested iteration limit here, stop iteration with an error and
// provide a resume key.
if i.maxAllowedIterations > 0 && i.iteratorAdvanceCount > i.maxAllowedIterations {
i.err = &ResourceLimitError{Limit: i.maxAllowedIterations, ResumeKey: i.Key()}
i.valid = false
return
}

// We have encountered an intent but it does not lie in the timestamp span
// (startTime, endTime] so we do not throw an error, and attempt to move to
// the next valid KV.
if i.meta.Txn != nil {
i.iter.Next()
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
Expand All @@ -391,10 +418,8 @@ func (i *MVCCIncrementalIterator) advance() {
// done.
break
}

if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}
}
Expand Down Expand Up @@ -439,9 +464,8 @@ func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
func (i *MVCCIncrementalIterator) NextIgnoringTime() {
for {
i.iter.Next()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
i.iteratorAdvanceCount++
if !i.checkValidAndSaveErr() {
return
}

Expand Down Expand Up @@ -479,3 +503,18 @@ func (i *MVCCIncrementalIterator) TryGetIntentError() error {
Intents: i.intents,
}
}

// ResourceLimitError is returned by MVCCIncrementalIterator when iterator reaches
// maximum number of iterations in the underlying storage iterator. Error will
// provide used Limit as well as ResumeKey that could be used to resume iteration
// on the subsequent attempt.
type ResourceLimitError struct {
Limit int64
ResumeKey MVCCKey
}

var _ error = &ResourceLimitError{}

func (e *ResourceLimitError) Error() string {
return fmt.Sprintf("iteration count exhausted limit %d", e.Limit)
}
Loading

0 comments on commit e18e0ff

Please sign in to comment.