Skip to content

Commit

Permalink
storage/engine: lift Go MVCC implementation above Iterator interface
Browse files Browse the repository at this point in the history
This commit removes the MVCCGet and MVCCScan methods from engine.Iterator and
uses the rest of the interface to implement these methods as free functions.
This restructuring allows the MVCC operations to support polymorphism of the
iterator, which is what the code was intending to do when originally written.

The code was moved to the current structure as a way of avoiding cgo calls when
using RocksDB's iterator implementation. This is an important optimization when
using RocksDB (but not Pebble) so the commit retains it through optional
specializations of MVCCGet and MVCCScan. RocksDB's iterator implements this
specialization but Pebble's does not need to.

This isn't quite ready for a review. I'm mainly pushing it in case others want
to take a look. It will be used to get the prototype of #41720 up and running.
Benchmarks show about a 0-1% performance regression due to this change. More
testing should be done if we actually want to productionize this.
  • Loading branch information
nvanbenschoten committed Nov 6, 2019
1 parent 79b4c77 commit 56f99c4
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 243 deletions.
29 changes: 15 additions & 14 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,23 @@ type Iterator interface {
// and the encoded SST data specified, within the provided key range. Returns
// stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error)
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)
// Stats returns statistics about the iterator.
Stats() IteratorStats
}

// MVCCIterator is an interface that extends Iterator and provides concrete
// implementations for MVCCGet and MVCCScan operations. It is used by instances
// of the interface backed by RocksDB iterators to avoid cgo hops.
type MVCCIterator interface {
Iterator
// MVCCOpsSpecialized returns whether the iterator has a specialized
// implementation of MVCCGet and MVCCScan. This is exposed as a method
// so that wrapper types can defer to their wrapped iterators.
MVCCOpsSpecialized() bool
// MVCCGet is the internal implementation of the family of package-level
// MVCCGet functions.
//
// DO NOT CALL directly (except in wrapper Iterator implementations). Use the
// package-level MVCCGet, or one of its variants, instead.
MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, error)
Expand All @@ -119,20 +131,9 @@ type Iterator interface {
// returned raw, as a series of buffers of length-prefixed slices,
// alternating from key to value, where numKVs specifies the number of pairs
// in the buffer.
//
// DO NOT CALL directly (except in wrapper Iterator implementations). Use the
// package-level MVCCScan, or one of its variants, instead. For correct
// operation, the caller must set the lower and upper bounds on the iterator
// before calling this method.
//
// TODO(peter): unexport this method.
MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error)
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)

Stats() IteratorStats
}

// IterOptions contains options used to create an Iterator.
Expand Down
137 changes: 131 additions & 6 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,14 +728,79 @@ type MVCCGetOptions struct {
func MVCCGet(
ctx context.Context, eng Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, error) {
iter := eng.NewIterator(IterOptions{Prefix: true})
defer iter.Close()
return mvccGet(ctx, iter, key, timestamp, opts)
}

func mvccGet(
ctx context.Context, iter Iterator, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
if timestamp.WallTime < 0 {
return nil, nil, errors.Errorf("cannot write to %q at timestamp %s", key, timestamp)
}
if opts.Inconsistent && opts.Txn != nil {
return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(key) == 0 {
return nil, nil, emptyKeyError()
}

iter := eng.NewIterator(IterOptions{Prefix: true})
value, intent, err := iter.MVCCGet(key, timestamp, opts)
iter.Close()
return value, intent, err
// If the iterator has a specialized implementation, defer to that.
if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() {
return mvccIter.MVCCGet(key, timestamp, opts)
}

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

// MVCCGet is implemented as an MVCCScan where we retrieve a single key. We
// specify an empty key for the end key which will ensure we don't retrieve a
// key different than the start key. This is a bit of a hack.
*mvccScanner = pebbleMVCCScanner{
parent: iter,
start: key,
ts: timestamp,
maxKeys: 1,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

mvccScanner.init(opts.Txn)
mvccScanner.get()

if mvccScanner.err != nil {
return nil, nil, mvccScanner.err
}
intents, err := buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, nil, err
}
if !opts.Inconsistent && len(intents) > 0 {
return nil, nil, &roachpb.WriteIntentError{Intents: intents}
}

if len(intents) > 1 {
return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents))
} else if len(intents) == 1 {
intent = &intents[0]
}

if len(mvccScanner.results.repr) == 0 {
return nil, intent, nil
}

mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr)
if err != nil {
return nil, nil, err
}

value = &roachpb.Value{
RawBytes: rawValue,
Timestamp: mvccKey.Timestamp,
}
return
}

// MVCCGetAsTxn constructs a temporary transaction from the given transaction
Expand Down Expand Up @@ -2168,6 +2233,66 @@ func MVCCDeleteRange(
return keys, resumeSpan, int64(len(kvs)), err
}

func mvccScanToBytes(
ctx context.Context,
iter Iterator,
key, endKey roachpb.Key,
max int64,
timestamp hlc.Timestamp,
opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
if opts.Inconsistent && opts.Txn != nil {
return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(endKey) == 0 {
return nil, 0, nil, nil, emptyKeyError()
}
if max == 0 {
resumeSpan = &roachpb.Span{Key: key, EndKey: endKey}
return nil, 0, resumeSpan, nil, nil
}

// If the iterator has a specialized implementation, defer to that.
if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() {
return mvccIter.MVCCScan(key, endKey, max, timestamp, opts)
}

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

*mvccScanner = pebbleMVCCScanner{
parent: iter,
reverse: opts.Reverse,
start: key,
end: endKey,
ts: timestamp,
maxKeys: max,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

mvccScanner.init(opts.Txn)
resumeSpan, err = mvccScanner.scan()

if err != nil {
return nil, 0, nil, nil, err
}

kvData = mvccScanner.results.finish()
numKVs = mvccScanner.results.count

intents, err = buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, 0, nil, nil, err
}

if !opts.Inconsistent && len(intents) > 0 {
return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
}
return
}

// mvccScanToKvs converts the raw key/value pairs returned by Iterator.MVCCScan
// into a slice of roachpb.KeyValues.
func mvccScanToKvs(
Expand All @@ -2178,7 +2303,7 @@ func mvccScanToKvs(
timestamp hlc.Timestamp,
opts MVCCScanOptions,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(key, endKey, max, timestamp, opts)
kvData, numKVs, resumeSpan, intents, err := mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -2305,7 +2430,7 @@ func MVCCScanToBytes(
) ([][]byte, int64, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return iter.MVCCScan(key, endKey, max, timestamp, opts)
return mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts)
}

// MVCCIterate iterates over the key range [start,end). At each step of the
Expand Down
121 changes: 0 additions & 121 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

Expand Down Expand Up @@ -377,126 +376,6 @@ func (p *pebbleIterator) FindSplitKey(
return bestSplitKey, nil
}

// MVCCGet implements the Iterator interface.
func (p *pebbleIterator) MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
if opts.Inconsistent && opts.Txn != nil {
return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(key) == 0 {
return nil, nil, emptyKeyError()
}
if p.iter == nil {
panic("uninitialized iterator")
}

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

// MVCCGet is implemented as an MVCCScan where we retrieve a single key. We
// specify an empty key for the end key which will ensure we don't retrieve a
// key different than the start key. This is a bit of a hack.
*mvccScanner = pebbleMVCCScanner{
parent: p.iter,
start: key,
ts: timestamp,
maxKeys: 1,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

mvccScanner.init(opts.Txn)
mvccScanner.get()

if mvccScanner.err != nil {
return nil, nil, mvccScanner.err
}
intents, err := buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, nil, err
}
if !opts.Inconsistent && len(intents) > 0 {
return nil, nil, &roachpb.WriteIntentError{Intents: intents}
}

if len(intents) > 1 {
return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents))
} else if len(intents) == 1 {
intent = &intents[0]
}

if len(mvccScanner.results.repr) == 0 {
return nil, intent, nil
}

mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr)
if err != nil {
return nil, nil, err
}

value = &roachpb.Value{
RawBytes: rawValue,
Timestamp: mvccKey.Timestamp,
}
return
}

// MVCCScan implements the Iterator interface.
func (p *pebbleIterator) MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
if opts.Inconsistent && opts.Txn != nil {
return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
}
if len(end) == 0 {
return nil, 0, nil, nil, emptyKeyError()
}
if max == 0 {
resumeSpan = &roachpb.Span{Key: start, EndKey: end}
return nil, 0, resumeSpan, nil, nil
}
if p.iter == nil {
panic("uninitialized iterator")
}

mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
defer pebbleMVCCScannerPool.Put(mvccScanner)

*mvccScanner = pebbleMVCCScanner{
parent: p.iter,
reverse: opts.Reverse,
start: start,
end: end,
ts: timestamp,
maxKeys: max,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
ignoreSeq: opts.IgnoreSequence,
}

mvccScanner.init(opts.Txn)
resumeSpan, err = mvccScanner.scan()

if err != nil {
return nil, 0, nil, nil, err
}

kvData = mvccScanner.results.finish()
numKVs = mvccScanner.results.count

intents, err = buildScanIntents(mvccScanner.intents.Repr())
if err != nil {
return nil, 0, nil, nil, err
}

if !opts.Inconsistent && len(intents) > 0 {
return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
}
return
}

// SetUpperBound implements the Iterator interface.
func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) {
p.upperBoundBuf = append(p.upperBoundBuf[:0], upperBound...)
Expand Down
Loading

0 comments on commit 56f99c4

Please sign in to comment.