Skip to content

Commit

Permalink
kvserver: scan only intents during rangefeed rts scan
Browse files Browse the repository at this point in the history
In 21.2, separated intents are the default. Once migrated, we can then
use this to iterate over substantially less data to find all intents
for a given keyspan. The hope is that this will make rangefeed
start-up substantially cheaper.

Release note: None
  • Loading branch information
stevendanna committed Oct 15, 2021
1 parent 0984f87 commit 134de82
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 89 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func computeMinIntentTimestamp(
}
engineKey, err := iter.EngineKey()
if err != nil {
continue
return hlc.Timestamp{}, nil, err
}
lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/storage",
"//pkg/storage/enginepb",
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -178,9 +177,10 @@ func NewProcessor(cfg Config) *Processor {
}
}

// IteratorConstructor is used to construct an iterator. It should be called
// from underneath a stopper task to ensure that the engine has not been closed.
type IteratorConstructor func() storage.SimpleMVCCIterator
// IntentScannerConstructor is used to construct an IntentScanner. It
// should be called from underneath a stopper task to ensure that the
// engine has not been closed.
type IntentScannerConstructor func() IntentScanner

// CatchUpIteratorConstructor is used to construct an iterator that
// can be used for catchup-scans. It should be called from underneath
Expand All @@ -198,7 +198,7 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
// calling its Close method when it is finished. If the iterator is nil then
// no initialization scan will be performed and the resolved timestamp will
// immediately be considered initialized.
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
ctx := p.AnnotateCtx(context.Background())
if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
p.run(ctx, p.RangeID, rtsIterFunc, stopper)
Expand All @@ -213,7 +213,7 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor
func (p *Processor) run(
ctx context.Context,
_forStacks roachpb.RangeID,
rtsIterFunc IteratorConstructor,
rtsIterFunc IntentScannerConstructor,
stopper *stop.Stopper,
) {
defer close(p.stoppedC)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,15 @@ func newTestProcessorWithTxnPusher(
EventChanCap: testProcessorEventCCap,
CheckStreamsInterval: 10 * time.Millisecond,
})
p.Start(stopper, makeIteratorConstructor(rtsIter))
p.Start(stopper, makeIntentScannerConstructor(rtsIter))
return p, stopper
}

func makeIteratorConstructor(rtsIter storage.SimpleMVCCIterator) IteratorConstructor {
func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor {
if rtsIter == nil {
return nil
}
return func() storage.SimpleMVCCIterator { return rtsIter }
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
}

func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
Expand Down
147 changes: 124 additions & 23 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package rangefeed
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
Expand All @@ -37,20 +38,13 @@ type runnable interface {
// The Processor can initialize its resolvedTimestamp once the scan completes
// because it knows it is now tracking all intents in its key range.
//
// MVCCIterator Contract:
// The provided MVCCIterator must observe all intents in the Processor's keyspan.
// An important implication of this is that if the iterator is a
// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
// known resolved timestamp, if one has ever been recorded. If one has never
// been recorded, the TimeBoundIterator cannot have any lower bound.
//
type initResolvedTSScan struct {
p *Processor
it storage.SimpleMVCCIterator
is IntentScanner
}

func newInitResolvedTSScan(p *Processor, it storage.SimpleMVCCIterator) runnable {
return &initResolvedTSScan{p: p, it: it}
func newInitResolvedTSScan(p *Processor, c IntentScanner) runnable {
return &initResolvedTSScan{p: p, is: c}
}

func (s *initResolvedTSScan) Run(ctx context.Context) {
Expand All @@ -66,50 +60,157 @@ func (s *initResolvedTSScan) Run(ctx context.Context) {
}

func (s *initResolvedTSScan) iterateAndConsume(ctx context.Context) error {
startKey := storage.MakeMVCCMetadataKey(s.p.Span.Key.AsRawKey())
endKey := storage.MakeMVCCMetadataKey(s.p.Span.EndKey.AsRawKey())
startKey := s.p.Span.Key.AsRawKey()
endKey := s.p.Span.EndKey.AsRawKey()
return s.is.ConsumeIntents(ctx, startKey, endKey, func(op enginepb.MVCCWriteIntentOp) bool {
var ops [1]enginepb.MVCCLogicalOp
ops[0].SetValue(&op)
return s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */)
})
}

// Cancel closes the IntentScanner held by the scan.
func (s *initResolvedTSScan) Cancel() {
s.is.Close()
}

// eventConsumer is the callback an IntentScanner invokes once for every
// intent it finds. It returns a bool; the scanners in this file call it
// without inspecting that result.
type eventConsumer func(enginepb.MVCCWriteIntentOp) bool

// IntentScanner is used by the ResolvedTSScan to find all intents on
// a range.
type IntentScanner interface {
// ConsumeIntents calls consumer on any intents found on keys between startKey and endKey.
// It returns an error if iteration or decoding of an entry fails.
// Implementations may rely on bounds configured on their underlying
// iterator instead of endKey (SeparatedIntentScanner does).
ConsumeIntents(ctx context.Context, startKey roachpb.Key, endKey roachpb.Key, consumer eventConsumer) error
// Close closes the IntentScanner, which closes its underlying iterator.
Close()
}

// SeparatedIntentScanner is an IntentScanner that assumes that
// separated intents are in use.
//
// EngineIterator Contract:
//
// - The EngineIterator must have an UpperBound set. ConsumeIntents ignores
//   its endKey argument and only stops once the iterator becomes invalid,
//   so the UpperBound is what ends the scan.
// - The range must be using separated intents.
type SeparatedIntentScanner struct {
// iter is seeked into the lock table by ConsumeIntents and closed by Close.
iter storage.EngineIterator
}

// NewSeparatedIntentScanner wraps iter in an IntentScanner suitable for
// use once the separated intents migration has completed.
func NewSeparatedIntentScanner(iter storage.EngineIterator) IntentScanner {
	scanner := new(SeparatedIntentScanner)
	scanner.iter = iter
	return scanner
}

// ConsumeIntents implements the IntentScanner interface.
//
// It seeks the engine iterator to the lock table key derived from startKey
// and then visits every subsequent entry, decoding each value as an
// MVCCMetadata and handing the resulting MVCCWriteIntentOp to consumer.
// The endKey argument is ignored: iteration ends when the iterator becomes
// invalid, which relies on the iterator having an UpperBound set. The bool
// returned by consumer is not inspected.
//
// An error is returned if the iterator reports one, if an entry's key
// cannot be decoded as a lock table key, if its value cannot be
// unmarshaled, or if the metadata carries no transaction.
func (s *SeparatedIntentScanner) ConsumeIntents(
	ctx context.Context, startKey roachpb.Key, _ roachpb.Key, consumer eventConsumer,
) error {
	ltStart, _ := keys.LockTableSingleKey(startKey, nil)
	// meta is declared once and reused as the unmarshal target for every entry.
	var meta enginepb.MVCCMetadata
	for valid, err := s.iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStart}); ; valid, err = s.iter.NextEngineKey() {
		if err != nil {
			return err
		}
		if !valid {
			// We depend on the iterator having an
			// UpperBound set and becoming invalid when it
			// hits the UpperBound.
			break
		}

		engineKey, err := s.iter.EngineKey()
		if err != nil {
			return err
		}
		lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
		if err != nil {
			// lockedKey is not meaningful when decoding fails, so report the
			// engine key that could not be decoded instead.
			return errors.Wrapf(err, "decoding LockTable key: %s", engineKey.Key)
		}

		if err := protoutil.Unmarshal(s.iter.UnsafeValue(), &meta); err != nil {
			return errors.Wrapf(err, "unmarshaling mvcc meta for locked key %s", lockedKey)
		}
		if meta.Txn == nil {
			return errors.Newf("expected transaction metadata but found none for %s", lockedKey)
		}

		consumer(enginepb.MVCCWriteIntentOp{
			TxnID:           meta.Txn.ID,
			TxnKey:          meta.Txn.Key,
			TxnMinTimestamp: meta.Txn.MinTimestamp,
			Timestamp:       meta.Txn.WriteTimestamp,
		})
	}
	return nil
}

// Close implements the IntentScanner interface by closing the wrapped
// engine iterator.
func (s *SeparatedIntentScanner) Close() {
	s.iter.Close()
}

// LegacyIntentScanner is an IntentScanner that assumes intents might
// not be separated.
//
// MVCCIterator Contract:
//
// The provided MVCCIterator must observe all intents in the Processor's keyspan.
// An important implication of this is that if the iterator is a
// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
// known resolved timestamp, if one has ever been recorded. If one has never
// been recorded, the TimeBoundIterator cannot have any lower bound.
type LegacyIntentScanner struct {
iter storage.SimpleMVCCIterator
}

// NewLegacyIntentScanner wraps iter in an IntentScanner suitable for use
// while the separated intents migration has not yet completed.
func NewLegacyIntentScanner(iter storage.SimpleMVCCIterator) IntentScanner {
	scanner := new(LegacyIntentScanner)
	scanner.iter = iter
	return scanner
}

// ConsumeIntents implements the IntentScanner interface.
func (l *LegacyIntentScanner) ConsumeIntents(
ctx context.Context, start roachpb.Key, end roachpb.Key, consumer eventConsumer,
) error {
startKey := storage.MakeMVCCMetadataKey(start)
endKey := storage.MakeMVCCMetadataKey(end)
// Iterate through all keys using NextKey. This will look at the first MVCC
// version for each key. We're only looking for MVCCMetadata versions, which
// will always be the first version of a key if it exists, so it's fine that
// we skip over all other versions of keys.
var meta enginepb.MVCCMetadata
for s.it.SeekGE(startKey); ; s.it.NextKey() {
if ok, err := s.it.Valid(); err != nil {
for l.iter.SeekGE(startKey); ; l.iter.NextKey() {
if ok, err := l.iter.Valid(); err != nil {
return err
} else if !ok || !s.it.UnsafeKey().Less(endKey) {
} else if !ok || !l.iter.UnsafeKey().Less(endKey) {
break
}

// If the key is not a metadata key, ignore it.
unsafeKey := s.it.UnsafeKey()
unsafeKey := l.iter.UnsafeKey()
if unsafeKey.IsValue() {
continue
}

// Found a metadata key. Unmarshal.
if err := protoutil.Unmarshal(s.it.UnsafeValue(), &meta); err != nil {
if err := protoutil.Unmarshal(l.iter.UnsafeValue(), &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}

// If this is an intent, inform the Processor.
if meta.Txn != nil {
var ops [1]enginepb.MVCCLogicalOp
ops[0].SetValue(&enginepb.MVCCWriteIntentOp{
consumer(enginepb.MVCCWriteIntentOp{
TxnID: meta.Txn.ID,
TxnKey: meta.Txn.Key,
TxnMinTimestamp: meta.Txn.MinTimestamp,
Timestamp: meta.Txn.WriteTimestamp,
})
s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */)
}
}
return nil
}

func (s *initResolvedTSScan) Cancel() {
s.it.Close()
}
// Close implements the IntentScanner interface by closing the wrapped
// MVCC iterator.
func (l *LegacyIntentScanner) Close() {
	l.iter.Close()
}

// TxnPusher is capable of pushing transactions to a new timestamp and
// cleaning up the intents of transactions that are found to be committed.
Expand Down
Loading

0 comments on commit 134de82

Please sign in to comment.