From 134de82180a6dc4e4405e45450d1c0036f59b36f Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Thu, 7 Oct 2021 19:21:06 +0100
Subject: [PATCH] kvserver: scan only intents during rangefeed rts scan

In 21.2, separated intents are the default. Once the migration to
separated intents has completed, we can scan only the lock table and
therefore iterate over substantially less data to find all intents for
a given keyspan. The hope is that this will make rangefeed start-up
substantially cheaper.

Release note: None
---
 .../batcheval/cmd_query_resolved_timestamp.go |   2 +-
 pkg/kv/kvserver/rangefeed/BUILD.bazel         |   1 +
 pkg/kv/kvserver/rangefeed/processor.go        |  12 +-
 pkg/kv/kvserver/rangefeed/processor_test.go   |   6 +-
 pkg/kv/kvserver/rangefeed/task.go             | 147 +++++++++++---
 pkg/kv/kvserver/rangefeed/task_test.go        | 188 +++++++++++++-----
 pkg/kv/kvserver/replica_rangefeed.go          |  26 ++-
 7 files changed, 293 insertions(+), 89 deletions(-)

diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
index 6b0aa088ea1d..07ca21fb2155 100644
--- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
+++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
@@ -117,7 +117,7 @@ func computeMinIntentTimestamp(
 		}
 		engineKey, err := iter.EngineKey()
 		if err != nil {
-			continue
+			return hlc.Timestamp{}, nil, err
 		}
 		lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
 		if err != nil {
diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel
index 8b8f98449ebf..a019042dd6ce 100644
--- a/pkg/kv/kvserver/rangefeed/BUILD.bazel
+++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -14,6 +14,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/keys",
         "//pkg/roachpb:with-mocks",
         "//pkg/storage",
         "//pkg/storage/enginepb",
diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go
index 20e34ca8f446..ba6526bf2e44 100644
--- a/pkg/kv/kvserver/rangefeed/processor.go
+++ b/pkg/kv/kvserver/rangefeed/processor.go
@@ -17,7 +17,6 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
-	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -178,9 +177,10 @@ func NewProcessor(cfg Config) *Processor {
 	}
 }
 
-// IteratorConstructor is used to construct an iterator. It should be called
-// from underneath a stopper task to ensure that the engine has not been closed.
-type IteratorConstructor func() storage.SimpleMVCCIterator
+// IntentScannerConstructor is used to construct an IntentScanner. It
+// should be called from underneath a stopper task to ensure that the
+// engine has not been closed.
+type IntentScannerConstructor func() IntentScanner
 
 // CatchUpIteratorConstructor is used to construct an iterator that
 // can be used for catchup-scans. It should be called from underneath
@@ -198,7 +198,7 @@ type CatchUpIteratorConstructor func() *CatchUpIterator
 // calling its Close method when it is finished. If the iterator is nil then
 // no initialization scan will be performed and the resolved timestamp will
 // immediately be considered initialized.
-func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) {
+func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) {
 	ctx := p.AnnotateCtx(context.Background())
 	if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) {
 		p.run(ctx, p.RangeID, rtsIterFunc, stopper)
@@ -213,7 +213,7 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor
 func (p *Processor) run(
 	ctx context.Context,
 	_forStacks roachpb.RangeID,
-	rtsIterFunc IteratorConstructor,
+	rtsIterFunc IntentScannerConstructor,
 	stopper *stop.Stopper,
 ) {
 	defer close(p.stoppedC)
diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go
index 8a76e54165dc..327a7b2d6219 100644
--- a/pkg/kv/kvserver/rangefeed/processor_test.go
+++ b/pkg/kv/kvserver/rangefeed/processor_test.go
@@ -154,15 +154,15 @@ func newTestProcessorWithTxnPusher(
 		EventChanCap:         testProcessorEventCCap,
 		CheckStreamsInterval: 10 * time.Millisecond,
 	})
-	p.Start(stopper, makeIteratorConstructor(rtsIter))
+	p.Start(stopper, makeIntentScannerConstructor(rtsIter))
 	return p, stopper
 }
 
-func makeIteratorConstructor(rtsIter storage.SimpleMVCCIterator) IteratorConstructor {
+func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor {
 	if rtsIter == nil {
 		return nil
 	}
-	return func() storage.SimpleMVCCIterator { return rtsIter }
+	return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
 }
 
 func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) {
diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go
index 5981894bb9f9..a0a505342a58 100644
--- a/pkg/kv/kvserver/rangefeed/task.go
+++ b/pkg/kv/kvserver/rangefeed/task.go
@@ -13,6 +13,7 @@ package rangefeed
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -37,20 +38,13 @@ type runnable interface {
 // The Processor can initialize its resolvedTimestamp once the scan completes
 // because it knows it is now tracking all intents in its key range.
 //
-// MVCCIterator Contract:
-// The provided MVCCIterator must observe all intents in the Processor's keyspan.
-// An important implication of this is that if the iterator is a
-// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
-// known resolved timestamp, if one has ever been recorded. If one has never
-// been recorded, the TimeBoundIterator cannot have any lower bound.
-//
 type initResolvedTSScan struct {
 	p  *Processor
-	it storage.SimpleMVCCIterator
+	is IntentScanner
 }
 
-func newInitResolvedTSScan(p *Processor, it storage.SimpleMVCCIterator) runnable {
-	return &initResolvedTSScan{p: p, it: it}
+func newInitResolvedTSScan(p *Processor, c IntentScanner) runnable {
+	return &initResolvedTSScan{p: p, is: c}
 }
 
 func (s *initResolvedTSScan) Run(ctx context.Context) {
@@ -66,50 +60,157 @@ func (s *initResolvedTSScan) Run(ctx context.Context) {
 }
 
 func (s *initResolvedTSScan) iterateAndConsume(ctx context.Context) error {
-	startKey := storage.MakeMVCCMetadataKey(s.p.Span.Key.AsRawKey())
-	endKey := storage.MakeMVCCMetadataKey(s.p.Span.EndKey.AsRawKey())
+	startKey := s.p.Span.Key.AsRawKey()
+	endKey := s.p.Span.EndKey.AsRawKey()
+	return s.is.ConsumeIntents(ctx, startKey, endKey, func(op enginepb.MVCCWriteIntentOp) bool {
+		var ops [1]enginepb.MVCCLogicalOp
+		ops[0].SetValue(&op)
+		return s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */)
+	})
+}
+
+func (s *initResolvedTSScan) Cancel() {
+	s.is.Close()
+}
+
+type eventConsumer func(enginepb.MVCCWriteIntentOp) bool
+
+// IntentScanner is used by the ResolvedTSScan to find all intents on
+// a range.
+type IntentScanner interface {
+	// ConsumeIntents calls consumer on any intents found on keys between startKey and endKey.
+	ConsumeIntents(ctx context.Context, startKey roachpb.Key, endKey roachpb.Key, consumer eventConsumer) error
+	// Close closes the IntentScanner.
+	Close()
+}
+
+// SeparatedIntentScanner is an IntentScanner that assumes that
+// separated intents are in use.
+//
+// EngineIterator Contract:
+//
+// - The EngineIterator must have an UpperBound set.
+// - The range must be using separated intents.
+type SeparatedIntentScanner struct {
+	iter storage.EngineIterator
+}
+
+// NewSeparatedIntentScanner returns an IntentScanner appropriate for
+// use when the separated intents migration has completed.
+func NewSeparatedIntentScanner(iter storage.EngineIterator) IntentScanner {
+	return &SeparatedIntentScanner{iter: iter}
+}
+
+// ConsumeIntents implements the IntentScanner interface.
+func (s *SeparatedIntentScanner) ConsumeIntents(
+	ctx context.Context, startKey roachpb.Key, _ roachpb.Key, consumer eventConsumer,
+) error {
+	ltStart, _ := keys.LockTableSingleKey(startKey, nil)
+	var meta enginepb.MVCCMetadata
+	for valid, err := s.iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStart}); ; valid, err = s.iter.NextEngineKey() {
+		if err != nil {
+			return err
+		} else if !valid {
+			// We depend on the iterator having an
+			// UpperBound set and becoming invalid when it
+			// hits the UpperBound.
+			break
+		}
+
+		engineKey, err := s.iter.EngineKey()
+		if err != nil {
+			return err
+		}
+		lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key)
+		if err != nil {
+			return errors.Wrapf(err, "decoding LockTable key: %s", lockedKey)
+		}
+
+		if err := protoutil.Unmarshal(s.iter.UnsafeValue(), &meta); err != nil {
+			return errors.Wrapf(err, "unmarshaling mvcc meta for locked key %s", lockedKey)
+		}
+		if meta.Txn == nil {
+			return errors.Newf("expected transaction metadata but found none for %s", lockedKey)
+		}
+
+		consumer(enginepb.MVCCWriteIntentOp{
+			TxnID:           meta.Txn.ID,
+			TxnKey:          meta.Txn.Key,
+			TxnMinTimestamp: meta.Txn.MinTimestamp,
+			Timestamp:       meta.Txn.WriteTimestamp,
+		})
+	}
+	return nil
+}
+
+// Close implements the IntentScanner interface.
+func (s *SeparatedIntentScanner) Close() { s.iter.Close() }
+
+// LegacyIntentScanner is an IntentScanner that assumes intents might
+// not be separated.
+//
+// MVCCIterator Contract:
+// The provided MVCCIterator must observe all intents in the Processor's keyspan.
+// An important implication of this is that if the iterator is a
+// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
+// known resolved timestamp, if one has ever been recorded. If one has never
+// been recorded, the TimeBoundIterator cannot have any lower bound.
+//
+type LegacyIntentScanner struct {
+	iter storage.SimpleMVCCIterator
+}
+
+// NewLegacyIntentScanner returns an IntentScanner appropriate for use
+// when the separated intents migration has not yet completed.
+func NewLegacyIntentScanner(iter storage.SimpleMVCCIterator) IntentScanner {
+	return &LegacyIntentScanner{iter: iter}
+}
+
+// ConsumeIntents implements the IntentScanner interface.
+func (l *LegacyIntentScanner) ConsumeIntents(
+	ctx context.Context, start roachpb.Key, end roachpb.Key, consumer eventConsumer,
+) error {
+	startKey := storage.MakeMVCCMetadataKey(start)
+	endKey := storage.MakeMVCCMetadataKey(end)
 	// Iterate through all keys using NextKey. This will look at the first MVCC
 	// version for each key. We're only looking for MVCCMetadata versions, which
 	// will always be the first version of a key if it exists, so its fine that
 	// we skip over all other versions of keys.
 	var meta enginepb.MVCCMetadata
-	for s.it.SeekGE(startKey); ; s.it.NextKey() {
-		if ok, err := s.it.Valid(); err != nil {
+	for l.iter.SeekGE(startKey); ; l.iter.NextKey() {
+		if ok, err := l.iter.Valid(); err != nil {
 			return err
-		} else if !ok || !s.it.UnsafeKey().Less(endKey) {
+		} else if !ok || !l.iter.UnsafeKey().Less(endKey) {
 			break
 		}
 
 		// If the key is not a metadata key, ignore it.
-		unsafeKey := s.it.UnsafeKey()
+		unsafeKey := l.iter.UnsafeKey()
 		if unsafeKey.IsValue() {
 			continue
 		}
 
 		// Found a metadata key. Unmarshal.
-		if err := protoutil.Unmarshal(s.it.UnsafeValue(), &meta); err != nil {
+		if err := protoutil.Unmarshal(l.iter.UnsafeValue(), &meta); err != nil {
 			return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
 		}
 
 		// If this is an intent, inform the Processor.
 		if meta.Txn != nil {
-			var ops [1]enginepb.MVCCLogicalOp
-			ops[0].SetValue(&enginepb.MVCCWriteIntentOp{
+			consumer(enginepb.MVCCWriteIntentOp{
 				TxnID:           meta.Txn.ID,
 				TxnKey:          meta.Txn.Key,
 				TxnMinTimestamp: meta.Txn.MinTimestamp,
 				Timestamp:       meta.Txn.WriteTimestamp,
 			})
-			s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */)
 		}
 	}
 	return nil
 }
 
-func (s *initResolvedTSScan) Cancel() {
-	s.it.Close()
-}
+// Close implements the IntentScanner interface.
+func (l *LegacyIntentScanner) Close() { l.iter.Close() }
 
 // TxnPusher is capable of pushing transactions to a new timestamp and
 // cleaning up the intents of transactions that are found to be committed.
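The sketch below is not part of the patch; it is a minimal illustration, under stated assumptions, of how a caller outside the rangefeed package might drive the new IntentScanner API. The package and function names (rangefeedexample, scanSeparatedIntents) are made up, and an already-open storage.Engine is assumed; the call pattern mirrors what replica_rangefeed.go does further down in this patch.

package rangefeedexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
)

// scanSeparatedIntents (hypothetical helper) collects every intent in
// [start, end) by scanning only the lock table. The EngineIterator is
// bounded to the lock-table keyspace, so it never visits MVCC versioned
// data; that is where the rangefeed start-up savings come from.
func scanSeparatedIntents(
	ctx context.Context, eng storage.Engine, start, end roachpb.Key,
) ([]enginepb.MVCCWriteIntentOp, error) {
	lowerBound, _ := keys.LockTableSingleKey(start, nil)
	upperBound, _ := keys.LockTableSingleKey(end, nil)
	iter := eng.NewEngineIterator(storage.IterOptions{
		LowerBound: lowerBound,
		UpperBound: upperBound,
	})
	sc := rangefeed.NewSeparatedIntentScanner(iter)
	defer sc.Close()

	var ops []enginepb.MVCCWriteIntentOp
	err := sc.ConsumeIntents(ctx, start, end, func(op enginepb.MVCCWriteIntentOp) bool {
		// A real consumer would forward each op to the Processor's event
		// channel; here we only collect them.
		ops = append(ops, op)
		return true
	})
	return ops, err
}

A consumer wired into the Processor would instead wrap each op in an MVCCLogicalOp and hand it to sendEvent, exactly as initResolvedTSScan.iterateAndConsume does above.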
diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go
index 1717e8bbaca3..5f7a99d0ec2c 100644
--- a/pkg/kv/kvserver/rangefeed/task_test.go
+++ b/pkg/kv/kvserver/rangefeed/task_test.go
@@ -15,6 +15,7 @@ import (
 	"sort"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -22,6 +23,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -190,66 +192,158 @@ func (s *testIterator) curKV() storage.MVCCKeyValue {
 
 func TestInitResolvedTSScan(t *testing.T) {
 	defer leaktest.AfterTest(t)()
+	startKey := roachpb.RKey("d")
+	endKey := roachpb.RKey("w")
+
+	makeTxn := func(key string, id uuid.UUID, ts hlc.Timestamp) roachpb.Transaction {
+		txnMeta := enginepb.TxnMeta{
+			Key:            []byte(key),
+			ID:             id,
+			Epoch:          1,
+			WriteTimestamp: ts,
+			MinTimestamp:   ts,
+		}
+		return roachpb.Transaction{
+			TxnMeta:       txnMeta,
+			ReadTimestamp: ts,
+		}
+	}
 
-	// Mock processor. We just needs its eventC.
-	p := Processor{
-		Config: Config{
-			Span: roachpb.RSpan{
-				Key:    roachpb.RKey("d"),
-				EndKey: roachpb.RKey("w"),
-			},
-		},
-		eventC: make(chan *event, 100),
+	txn1ID := uuid.MakeV4()
+	txn1TS := hlc.Timestamp{WallTime: 15}
+	txn1Key := "txnKey1"
+	txn1 := makeTxn(txn1Key, txn1ID, txn1TS)
+
+	txn2ID := uuid.MakeV4()
+	txn2TS := hlc.Timestamp{WallTime: 21}
+	txn2Key := "txnKey2"
+	txn2 := makeTxn(txn2Key, txn2ID, txn2TS)
+
+	type op struct {
+		kv  storage.MVCCKeyValue
+		txn *roachpb.Transaction
 	}
-	// Run an init rts scan over a test iterator with the following keys.
-	txn1, txn2 := uuid.MakeV4(), uuid.MakeV4()
-	iter := newTestIterator([]storage.MVCCKeyValue{
-		makeKV("a", "val1", 10),
-		makeInline("b", "val2"),
-		makeIntent("c", txn1, "txnKey1", 15),
-		makeProvisionalKV("c", "txnKey1", 15),
-		makeKV("c", "val3", 11),
-		makeKV("c", "val4", 9),
-		makeIntent("d", txn2, "txnKey2", 21),
-		makeProvisionalKV("d", "txnKey2", 21),
-		makeKV("d", "val5", 20),
-		makeKV("d", "val6", 19),
-		makeInline("g", "val7"),
-		makeKV("m", "val8", 1),
-		makeIntent("n", txn1, "txnKey1", 12),
-		makeProvisionalKV("n", "txnKey1", 12),
-		makeIntent("r", txn1, "txnKey1", 19),
-		makeProvisionalKV("r", "txnKey1", 19),
-		makeKV("r", "val9", 4),
-		makeIntent("w", txn1, "txnKey1", 3),
-		makeProvisionalKV("w", "txnKey1", 3),
-		makeInline("x", "val10"),
-		makeIntent("z", txn2, "txnKey2", 21),
-		makeProvisionalKV("z", "txnKey2", 21),
-		makeKV("z", "val11", 4),
-	}, nil)
-
-	initScan := newInitResolvedTSScan(&p, iter)
-	initScan.Run(context.Background())
-	require.True(t, iter.closed)
+	makeEngine := func(enableSeparatedIntents bool) storage.Engine {
+		ctx := context.Background()
+		engine := storage.NewInMemForTesting(enableSeparatedIntents)
+		testData := []op{
+			{kv: makeKV("a", "val1", 10)},
+			{kv: makeInline("b", "val2")},
+			{kv: makeKV("c", "val4", 9)},
+			{kv: makeKV("c", "val3", 11)},
+			{
+				txn: &txn1,
+				kv:  makeProvisionalKV("c", "txnKey1", 15),
+			},
+			{kv: makeKV("d", "val6", 19)},
+			{kv: makeKV("d", "val5", 20)},
+			{
+				txn: &txn2,
+				kv:  makeProvisionalKV("d", "txnKey2", 21),
+			},
+			{kv: makeInline("g", "val7")},
+			{kv: makeKV("m", "val8", 1)},
+			{
+				txn: &txn1,
+				kv:  makeProvisionalKV("n", "txnKey1", 15),
+			},
+			{kv: makeKV("r", "val9", 4)},
+			{
+				txn: &txn1,
+				kv:  makeProvisionalKV("r", "txnKey1", 15),
+			},
+			{
+				txn: &txn1,
+				kv:  makeProvisionalKV("w", "txnKey1", 15),
+			},
+			{kv: makeInline("x", "val10")},
+			{kv: makeKV("z", "val11", 4)},
+			{
+				txn: &txn2,
+				kv:  makeProvisionalKV("z", "txnKey2", 21),
+			},
+		}
+		for _, op := range testData {
+			kv := op.kv
+			err := storage.MVCCPut(ctx, engine, nil, kv.Key.Key, kv.Key.Timestamp, roachpb.Value{RawBytes: kv.Value}, op.txn)
+			require.NoError(t, err)
+		}
+		return engine
+	}
 
-	// Compare the event channel to the expected events.
 	expEvents := []*event{
 		{ops: []enginepb.MVCCLogicalOp{
-			writeIntentOpWithKey(txn2, []byte("txnKey2"), hlc.Timestamp{WallTime: 21}),
+			writeIntentOpWithKey(txn2ID, []byte("txnKey2"), hlc.Timestamp{WallTime: 21}),
 		}},
 		{ops: []enginepb.MVCCLogicalOp{
-			writeIntentOpWithKey(txn1, []byte("txnKey1"), hlc.Timestamp{WallTime: 12}),
+			writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}),
 		}},
 		{ops: []enginepb.MVCCLogicalOp{
-			writeIntentOpWithKey(txn1, []byte("txnKey1"), hlc.Timestamp{WallTime: 19}),
+			writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}),
 		}},
 		{initRTS: true},
 	}
-	require.Equal(t, len(expEvents), len(p.eventC))
-	for _, expEvent := range expEvents {
-		require.Equal(t, expEvent, <-p.eventC)
+
+	testCases := map[string]struct {
+		intentScanner func() (IntentScanner, func())
+	}{
+		"legacy intent scanner": {
+			intentScanner: func() (IntentScanner, func()) {
+				engine := makeEngine(true)
+				iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+					UpperBound: endKey.AsRawKey(),
+				})
+				return NewLegacyIntentScanner(iter), func() { engine.Close() }
+			},
+		},
+		"legacy intent scanner with interleaved intents": {
+			intentScanner: func() (IntentScanner, func()) {
+				engine := makeEngine(false)
+				iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+					UpperBound: endKey.AsRawKey(),
+				})
+				return NewLegacyIntentScanner(iter), func() { engine.Close() }
+			},
+		},
+		"separated intent scanner": {
+			intentScanner: func() (IntentScanner, func()) {
+				engine := makeEngine(true)
+				require.True(t, engine.IsSeparatedIntentsEnabledForTesting(context.Background()))
+				lowerBound, _ := keys.LockTableSingleKey(startKey.AsRawKey(), nil)
+				upperBound, _ := keys.LockTableSingleKey(endKey.AsRawKey(), nil)
+				iter := engine.NewEngineIterator(storage.IterOptions{
+					LowerBound: lowerBound,
+					UpperBound: upperBound,
+				})
+				return NewSeparatedIntentScanner(iter), func() { engine.Close() }
+			},
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// Mock processor. We just need its eventC.
+			p := Processor{
+				Config: Config{
+					Span: roachpb.RSpan{
+						Key:    startKey,
+						EndKey: endKey,
+					},
+				},
+				eventC: make(chan *event, 100),
+			}
+			isc, cleanup := tc.intentScanner()
+			defer cleanup()
+			initScan := newInitResolvedTSScan(&p, isc)
+			initScan.Run(context.Background())
+			// Compare the event channel to the expected events.
+			assert.Equal(t, len(expEvents), len(p.eventC))
+			for _, expEvent := range expEvents {
+				assert.Equal(t, expEvent, <-p.eventC)
+			}
+		})
 	}
 }
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index e780afbc5708..c9b3d3cc22ba 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -378,23 +378,31 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
 	p = rangefeed.NewProcessor(cfg)
 
 	// Start it with an iterator to initialize the resolved timestamp.
-	rtsIter := func() storage.SimpleMVCCIterator {
+	rtsIter := func() rangefeed.IntentScanner {
 		// Assert that we still hold the raftMu when this is called to ensure
 		// that the rtsIter reads from the current snapshot. The replica
 		// synchronizes with the rangefeed Processor calling this function by
 		// waiting for the Register call below to return.
 		r.raftMu.AssertHeld()
-		return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+
+		onlySeparatedIntents := r.store.cfg.Settings.Version.IsActive(ctx, clusterversion.PostSeparatedIntentsMigration)
+
+		if onlySeparatedIntents {
+			lowerBound, _ := keys.LockTableSingleKey(desc.StartKey.AsRawKey(), nil)
+			upperBound, _ := keys.LockTableSingleKey(desc.EndKey.AsRawKey(), nil)
+			iter := r.Engine().NewEngineIterator(storage.IterOptions{
+				LowerBound: lowerBound,
+				UpperBound: upperBound,
+			})
+			return rangefeed.NewSeparatedIntentScanner(iter)
+		}
+		iter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
 			UpperBound: desc.EndKey.AsRawKey(),
-			// TODO(nvanbenschoten): To facilitate fast restarts of rangefeed
-			// we should periodically persist the resolved timestamp so that we
-			// can initialize the rangefeed using an iterator that only needs to
-			// observe timestamps back to the last recorded resolved timestamp.
-			// This is safe because we know that there are no unresolved intents
-			// at times before a resolved timestamp.
-			// MinTimestampHint: r.ResolvedTimestamp,
 		})
+		return rangefeed.NewLegacyIntentScanner(iter)
 	}
+
 	p.Start(r.store.Stopper(), rtsIter)
 
 	// Register with the processor *before* we attach its reference to the
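For context on why bounding the EngineIterator by LockTableSingleKey(StartKey) and LockTableSingleKey(EndKey) covers exactly the intents in the range: each user key maps to a single synthetic key in the lock-table keyspace via an encoding that is, as used here, order-preserving and reversible. The snippet below is only an illustration and is not part of the patch; the function name lockTableRoundTrip is made up.

package rangefeedexample

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// lockTableRoundTrip shows a user key being encoded into the lock-table
// keyspace (the space SeparatedIntentScanner iterates over) and decoded
// back, using the same pair of helpers this patch uses for its iterator
// bounds and in the scan loop.
func lockTableRoundTrip(userKey roachpb.Key) error {
	ltKey, _ := keys.LockTableSingleKey(userKey, nil)
	decoded, err := keys.DecodeLockTableSingleKey(ltKey)
	if err != nil {
		return err
	}
	fmt.Printf("user key %q -> lock table key %q -> decoded %q\n", userKey, ltKey, decoded)
	return nil
}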