Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/engine: lift Go MVCC implementation above Iterator interface #42210

Merged
merged 1 commit into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 15 additions & 14 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,23 @@ type Iterator interface {
// and the encoded SST data specified, within the provided key range. Returns
// stats on skipped KVs, or an error if a collision is found.
CheckForKeyCollisions(sstData []byte, start, end roachpb.Key) (enginepb.MVCCStats, error)
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)
// Stats returns statistics about the iterator.
Stats() IteratorStats
}

// MVCCIterator is an interface that extends Iterator and provides concrete
// implementations for MVCCGet and MVCCScan operations. It is used by instances
// of the interface backed by RocksDB iterators to avoid cgo hops.
type MVCCIterator interface {
Iterator
// MVCCOpsSpecialized returns whether the iterator has a specialized
// implementation of MVCCGet and MVCCScan. This is exposed as a method
// so that wrapper types can defer to their wrapped iterators.
MVCCOpsSpecialized() bool
// MVCCGet is the internal implementation of the family of package-level
// MVCCGet functions.
//
// DO NOT CALL directly (except in wrapper Iterator implementations). Use the
// package-level MVCCGet, or one of its variants, instead.
MVCCGet(
key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, error)
Expand All @@ -129,20 +141,9 @@ type Iterator interface {
// returned raw, as a series of buffers of length-prefixed slices,
// alternating from key to value, where numKVs specifies the number of pairs
// in the buffer.
//
// DO NOT CALL directly (except in wrapper Iterator implementations). Use the
// package-level MVCCScan, or one of its variants, instead. For correct
// operation, the caller must set the lower and upper bounds on the iterator
// before calling this method.
//
// TODO(peter): unexport this method.
MVCCScan(
start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error)
// SetUpperBound installs a new upper bound for this iterator.
SetUpperBound(roachpb.Key)

Stats() IteratorStats
}

// IterOptions contains options used to create an Iterator.
Expand Down
135 changes: 129 additions & 6 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,14 +730,78 @@ type MVCCGetOptions struct {
func MVCCGet(
	ctx context.Context, eng Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, error) {
	// The iterator is opened in prefix mode and is owned by this function;
	// the lookup itself is delegated to mvccGet.
	prefixIter := eng.NewIterator(IterOptions{Prefix: true})
	// Release the iterator once mvccGet has produced its result.
	defer prefixIter.Close()

	return mvccGet(ctx, prefixIter, key, timestamp, opts)
}

// mvccGet is the iterator-level implementation behind the package-level
// MVCCGet. The caller owns iter and is responsible for closing it.
//
// After validating its arguments, mvccGet defers to the iterator's own
// MVCCGet when iter implements MVCCIterator and reports that it has a
// specialized implementation. Otherwise the lookup is performed in Go using a
// pooled pebbleMVCCScanner layered on top of iter.
//
// It returns the value found for key (nil when the scanner produced no
// result). In inconsistent mode it may additionally return the single intent
// that was encountered; in consistent mode any encountered intent is reported
// as a *roachpb.WriteIntentError instead. ctx is currently unused here.
func mvccGet(
	ctx context.Context, iter Iterator, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
	if timestamp.WallTime < 0 {
		return nil, nil, errors.Errorf("cannot write to %q at timestamp %s", key, timestamp)
	}
	if opts.Inconsistent && opts.Txn != nil {
		return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
	}
	if len(key) == 0 {
		return nil, nil, emptyKeyError()
	}

	// If the iterator has a specialized implementation, defer to that.
	if mvccIter, ok := iter.(MVCCIterator); ok && mvccIter.MVCCOpsSpecialized() {
		return mvccIter.MVCCGet(key, timestamp, opts)
	}

	mvccScanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
	defer pebbleMVCCScannerPool.Put(mvccScanner)

	// MVCCGet is implemented as an MVCCScan where we retrieve a single key. We
	// specify an empty key for the end key which will ensure we don't retrieve a
	// key different than the start key. This is a bit of a hack.
	*mvccScanner = pebbleMVCCScanner{
		parent:       iter,
		start:        key,
		ts:           timestamp,
		maxKeys:      1,
		inconsistent: opts.Inconsistent,
		tombstones:   opts.Tombstones,
	}

	mvccScanner.init(opts.Txn)
	mvccScanner.get()

	if mvccScanner.err != nil {
		return nil, nil, mvccScanner.err
	}
	intents, err := buildScanIntents(mvccScanner.intents.Repr())
	if err != nil {
		return nil, nil, err
	}
	// Consistent reads surface every encountered intent as an error.
	if !opts.Inconsistent && len(intents) > 0 {
		return nil, nil, &roachpb.WriteIntentError{Intents: intents}
	}

	// Only inconsistent reads get here with intents; a single-key lookup can
	// have seen at most one.
	if len(intents) > 1 {
		return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", len(intents))
	}
	if len(intents) == 1 {
		intent = &intents[0]
	}

	// No result was produced for the key; still report the intent, if any.
	if len(mvccScanner.results.repr) == 0 {
		return nil, intent, nil
	}

	mvccKey, rawValue, _, err := MVCCScanDecodeKeyValue(mvccScanner.results.repr)
	if err != nil {
		return nil, nil, err
	}

	value = &roachpb.Value{
		RawBytes:  rawValue,
		Timestamp: mvccKey.Timestamp,
	}
	return value, intent, nil
}

// MVCCGetAsTxn constructs a temporary transaction from the given transaction
Expand Down Expand Up @@ -2170,6 +2234,65 @@ func MVCCDeleteRange(
return keys, resumeSpan, int64(len(kvs)), err
}

// mvccScanToBytes runs an MVCC scan over [key, endKey) on the supplied
// iterator and returns the results in their raw, batched byte representation
// together with the number of key/value pairs, an optional resume span and any
// intents that were encountered.
//
// If iter implements MVCCIterator and reports a specialized implementation,
// the scan is delegated to it. Otherwise the scan is performed in Go with a
// pooled pebbleMVCCScanner. In consistent mode, encountered intents are
// reported as a *roachpb.WriteIntentError (the resume span is still returned
// alongside that error).
func mvccScanToBytes(
	ctx context.Context,
	iter Iterator,
	key, endKey roachpb.Key,
	max int64,
	timestamp hlc.Timestamp,
	opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
	// Argument validation and the trivial "nothing requested" case.
	switch {
	case opts.Inconsistent && opts.Txn != nil:
		return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
	case len(endKey) == 0:
		return nil, 0, nil, nil, emptyKeyError()
	case max == 0:
		// Zero keys requested: the whole span remains to be scanned.
		return nil, 0, &roachpb.Span{Key: key, EndKey: endKey}, nil, nil
	}

	// Prefer the iterator's own implementation when it advertises one.
	if specialized, ok := iter.(MVCCIterator); ok && specialized.MVCCOpsSpecialized() {
		return specialized.MVCCScan(key, endKey, max, timestamp, opts)
	}

	scanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
	defer pebbleMVCCScannerPool.Put(scanner)

	// Overwrite whatever state the pooled scanner carried over.
	*scanner = pebbleMVCCScanner{
		parent:       iter,
		reverse:      opts.Reverse,
		start:        key,
		end:          endKey,
		ts:           timestamp,
		maxKeys:      max,
		inconsistent: opts.Inconsistent,
		tombstones:   opts.Tombstones,
	}
	scanner.init(opts.Txn)

	if resumeSpan, err = scanner.scan(); err != nil {
		return nil, 0, nil, nil, err
	}

	// finish() is called before count is read, as separate statements, to
	// keep the ordering explicit.
	kvData = scanner.results.finish()
	numKVs = scanner.results.count

	if intents, err = buildScanIntents(scanner.intents.Repr()); err != nil {
		return nil, 0, nil, nil, err
	}

	if len(intents) > 0 && !opts.Inconsistent {
		return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
	}
	return kvData, numKVs, resumeSpan, intents, nil
}

// mvccScanToKvs converts the raw key/value pairs returned by mvccScanToBytes
// into a slice of roachpb.KeyValues.
func mvccScanToKvs(
Expand All @@ -2180,7 +2303,7 @@ func mvccScanToKvs(
timestamp hlc.Timestamp,
opts MVCCScanOptions,
) ([]roachpb.KeyValue, *roachpb.Span, []roachpb.Intent, error) {
kvData, numKVs, resumeSpan, intents, err := iter.MVCCScan(key, endKey, max, timestamp, opts)
kvData, numKVs, resumeSpan, intents, err := mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -2305,7 +2428,7 @@ func MVCCScanToBytes(
) ([][]byte, int64, *roachpb.Span, []roachpb.Intent, error) {
iter := engine.NewIterator(IterOptions{LowerBound: key, UpperBound: endKey})
defer iter.Close()
return iter.MVCCScan(key, endKey, max, timestamp, opts)
return mvccScanToBytes(ctx, iter, key, endKey, max, timestamp, opts)
}

// MVCCIterate iterates over the key range [start,end). At each step of the
Expand Down
119 changes: 0 additions & 119 deletions pkg/storage/engine/pebble_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)

Expand Down Expand Up @@ -365,124 +364,6 @@ func (p *pebbleIterator) FindSplitKey(
return bestSplitKey, nil
}

// MVCCGet implements the Iterator interface.
//
// The lookup is carried out by a pooled pebbleMVCCScanner configured to
// return at most one key. In consistent mode any encountered intent is
// returned as a *roachpb.WriteIntentError; in inconsistent mode the (single)
// intent is returned next to the value. Panics if the underlying iterator has
// not been initialized.
func (p *pebbleIterator) MVCCGet(
	key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (value *roachpb.Value, intent *roachpb.Intent, err error) {
	switch {
	case opts.Inconsistent && opts.Txn != nil:
		return nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
	case len(key) == 0:
		return nil, nil, emptyKeyError()
	case p.iter == nil:
		panic("uninitialized iterator")
	}

	scanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
	defer pebbleMVCCScannerPool.Put(scanner)

	// A get is expressed as a scan limited to one key. Leaving the end key
	// empty ensures that no key other than the start key is retrieved. This
	// is a bit of a hack.
	*scanner = pebbleMVCCScanner{
		parent:       p.iter,
		start:        key,
		ts:           timestamp,
		maxKeys:      1,
		inconsistent: opts.Inconsistent,
		tombstones:   opts.Tombstones,
	}
	scanner.init(opts.Txn)
	scanner.get()
	if scanner.err != nil {
		return nil, nil, scanner.err
	}

	found, err := buildScanIntents(scanner.intents.Repr())
	if err != nil {
		return nil, nil, err
	}
	if len(found) > 0 && !opts.Inconsistent {
		return nil, nil, &roachpb.WriteIntentError{Intents: found}
	}

	// Only inconsistent reads reach this point with intents present.
	switch n := len(found); {
	case n > 1:
		return nil, nil, errors.Errorf("expected 0 or 1 intents, got %d", n)
	case n == 1:
		intent = &found[0]
	}

	// Nothing was produced for the key; still hand back the intent, if any.
	if len(scanner.results.repr) == 0 {
		return nil, intent, nil
	}

	decodedKey, raw, _, err := MVCCScanDecodeKeyValue(scanner.results.repr)
	if err != nil {
		return nil, nil, err
	}

	value = &roachpb.Value{
		RawBytes:  raw,
		Timestamp: decodedKey.Timestamp,
	}
	return value, intent, nil
}

// MVCCScan implements the Iterator interface.
//
// The scan over [start, end) is carried out by a pooled pebbleMVCCScanner on
// top of the underlying iterator. Results are returned in their raw, batched
// byte form together with the pair count, an optional resume span and any
// intents encountered. In consistent mode, intents are reported as a
// *roachpb.WriteIntentError (the resume span is still returned with it).
// Panics if the underlying iterator has not been initialized.
func (p *pebbleIterator) MVCCScan(
	start, end roachpb.Key, max int64, timestamp hlc.Timestamp, opts MVCCScanOptions,
) (kvData [][]byte, numKVs int64, resumeSpan *roachpb.Span, intents []roachpb.Intent, err error) {
	switch {
	case opts.Inconsistent && opts.Txn != nil:
		return nil, 0, nil, nil, errors.Errorf("cannot allow inconsistent reads within a transaction")
	case len(end) == 0:
		return nil, 0, nil, nil, emptyKeyError()
	case max == 0:
		// Zero keys requested: the whole span remains to be scanned.
		return nil, 0, &roachpb.Span{Key: start, EndKey: end}, nil, nil
	case p.iter == nil:
		panic("uninitialized iterator")
	}

	scanner := pebbleMVCCScannerPool.Get().(*pebbleMVCCScanner)
	defer pebbleMVCCScannerPool.Put(scanner)

	// Overwrite whatever state the pooled scanner carried over.
	*scanner = pebbleMVCCScanner{
		parent:       p.iter,
		reverse:      opts.Reverse,
		start:        start,
		end:          end,
		ts:           timestamp,
		maxKeys:      max,
		inconsistent: opts.Inconsistent,
		tombstones:   opts.Tombstones,
	}
	scanner.init(opts.Txn)

	if resumeSpan, err = scanner.scan(); err != nil {
		return nil, 0, nil, nil, err
	}

	// finish() is called before count is read, as separate statements, to
	// keep the ordering explicit.
	kvData = scanner.results.finish()
	numKVs = scanner.results.count

	if intents, err = buildScanIntents(scanner.intents.Repr()); err != nil {
		return nil, 0, nil, nil, err
	}

	if len(intents) > 0 && !opts.Inconsistent {
		return nil, 0, resumeSpan, nil, &roachpb.WriteIntentError{Intents: intents}
	}
	return kvData, numKVs, resumeSpan, intents, nil
}

// SetUpperBound implements the Iterator interface.
func (p *pebbleIterator) SetUpperBound(upperBound roachpb.Key) {
p.upperBoundBuf = append(p.upperBoundBuf[:0], upperBound...)
Expand Down
Loading