diff --git a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go index 6b0aa088ea1d..07ca21fb2155 100644 --- a/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go +++ b/pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go @@ -117,7 +117,7 @@ func computeMinIntentTimestamp( } engineKey, err := iter.EngineKey() if err != nil { - continue + return hlc.Timestamp{}, nil, err } lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key) if err != nil { diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 8b8f98449ebf..a019042dd6ce 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -14,6 +14,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/roachpb:with-mocks", "//pkg/storage", "//pkg/storage/enginepb", diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 20e34ca8f446..ba6526bf2e44 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -17,7 +17,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -178,9 +177,10 @@ func NewProcessor(cfg Config) *Processor { } } -// IteratorConstructor is used to construct an iterator. It should be called -// from underneath a stopper task to ensure that the engine has not been closed. -type IteratorConstructor func() storage.SimpleMVCCIterator +// IntentScannerConstructor is used to construct an IntentScanner. It +// should be called from underneath a stopper task to ensure that the +// engine has not been closed. +type IntentScannerConstructor func() IntentScanner // CatchUpIteratorConstructor is used to construct an iterator that // can be used for catchup-scans. It should be called from underneath @@ -198,7 +198,7 @@ type CatchUpIteratorConstructor func() *CatchUpIterator // calling its Close method when it is finished. If the iterator is nil then // no initialization scan will be performed and the resolved timestamp will // immediately be considered initialized. -func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor) { +func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IntentScannerConstructor) { ctx := p.AnnotateCtx(context.Background()) if err := stopper.RunAsyncTask(ctx, "rangefeed.Processor", func(ctx context.Context) { p.run(ctx, p.RangeID, rtsIterFunc, stopper) @@ -213,7 +213,7 @@ func (p *Processor) Start(stopper *stop.Stopper, rtsIterFunc IteratorConstructor func (p *Processor) run( ctx context.Context, _forStacks roachpb.RangeID, - rtsIterFunc IteratorConstructor, + rtsIterFunc IntentScannerConstructor, stopper *stop.Stopper, ) { defer close(p.stoppedC) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 8a76e54165dc..327a7b2d6219 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -154,15 +154,15 @@ func newTestProcessorWithTxnPusher( EventChanCap: testProcessorEventCCap, CheckStreamsInterval: 10 * time.Millisecond, }) - p.Start(stopper, makeIteratorConstructor(rtsIter)) + p.Start(stopper, makeIntentScannerConstructor(rtsIter)) return p, stopper } -func makeIteratorConstructor(rtsIter storage.SimpleMVCCIterator) IteratorConstructor { +func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor { if rtsIter == nil { return nil } - return func() storage.SimpleMVCCIterator { return rtsIter } + return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) } } func newTestProcessor(rtsIter storage.SimpleMVCCIterator) (*Processor, *stop.Stopper) { diff --git a/pkg/kv/kvserver/rangefeed/task.go b/pkg/kv/kvserver/rangefeed/task.go index 5981894bb9f9..a0a505342a58 100644 --- a/pkg/kv/kvserver/rangefeed/task.go +++ b/pkg/kv/kvserver/rangefeed/task.go @@ -13,6 +13,7 @@ package rangefeed import ( "context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -37,20 +38,13 @@ type runnable interface { // The Processor can initialize its resolvedTimestamp once the scan completes // because it knows it is now tracking all intents in its key range. // -// MVCCIterator Contract: -// The provided MVCCIterator must observe all intents in the Processor's keyspan. -// An important implication of this is that if the iterator is a -// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest -// known resolved timestamp, if one has ever been recorded. If one has never -// been recorded, the TimeBoundIterator cannot have any lower bound. -// type initResolvedTSScan struct { p *Processor - it storage.SimpleMVCCIterator + is IntentScanner } -func newInitResolvedTSScan(p *Processor, it storage.SimpleMVCCIterator) runnable { - return &initResolvedTSScan{p: p, it: it} +func newInitResolvedTSScan(p *Processor, c IntentScanner) runnable { + return &initResolvedTSScan{p: p, is: c} } func (s *initResolvedTSScan) Run(ctx context.Context) { @@ -66,50 +60,157 @@ func (s *initResolvedTSScan) Run(ctx context.Context) { } func (s *initResolvedTSScan) iterateAndConsume(ctx context.Context) error { - startKey := storage.MakeMVCCMetadataKey(s.p.Span.Key.AsRawKey()) - endKey := storage.MakeMVCCMetadataKey(s.p.Span.EndKey.AsRawKey()) + startKey := s.p.Span.Key.AsRawKey() + endKey := s.p.Span.EndKey.AsRawKey() + return s.is.ConsumeIntents(ctx, startKey, endKey, func(op enginepb.MVCCWriteIntentOp) bool { + var ops [1]enginepb.MVCCLogicalOp + ops[0].SetValue(&op) + return s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */) + }) +} + +func (s *initResolvedTSScan) Cancel() { + s.is.Close() +} + +type eventConsumer func(enginepb.MVCCWriteIntentOp) bool + +// IntentScanner is used by the ResolvedTSScan to find all intents on +// a range. +type IntentScanner interface { + // ConsumeIntents calls consumer on any intents found on keys between startKey and endKey. + ConsumeIntents(ctx context.Context, startKey roachpb.Key, endKey roachpb.Key, consumer eventConsumer) error + // Close closes the IntentScanner. + Close() +} + +// SeparatedIntentScanner is an IntentScanner that assumes that +// separated intents are in use. +// +// EngineIterator Contract: +// +// - The EngineIterator must have an UpperBound set. +// - The range must be using separated intents. +type SeparatedIntentScanner struct { + iter storage.EngineIterator +} + +// NewSeparatedIntentScanner returns an IntentScanner appropriate for +// use when the separated intents migration has completed. +func NewSeparatedIntentScanner(iter storage.EngineIterator) IntentScanner { + return &SeparatedIntentScanner{iter: iter} +} + +// ConsumeIntents implements the IntentScanner interface. +func (s *SeparatedIntentScanner) ConsumeIntents( + ctx context.Context, startKey roachpb.Key, _ roachpb.Key, consumer eventConsumer, +) error { + ltStart, _ := keys.LockTableSingleKey(startKey, nil) + var meta enginepb.MVCCMetadata + for valid, err := s.iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStart}); ; valid, err = s.iter.NextEngineKey() { + if err != nil { + return err + } else if !valid { + // We depend on the iterator having an + // UpperBound set and becoming invalid when it + // hits the UpperBound. + break + } + + engineKey, err := s.iter.EngineKey() + if err != nil { + return err + } + lockedKey, err := keys.DecodeLockTableSingleKey(engineKey.Key) + if err != nil { + return errors.Wrapf(err, "decoding LockTable key: %s", lockedKey) + } + + if err := protoutil.Unmarshal(s.iter.UnsafeValue(), &meta); err != nil { + return errors.Wrapf(err, "unmarshaling mvcc meta for locked key %s", lockedKey) + } + if meta.Txn == nil { + return errors.Newf("expected transaction metadata but found none for %s", lockedKey) + } + + consumer(enginepb.MVCCWriteIntentOp{ + TxnID: meta.Txn.ID, + TxnKey: meta.Txn.Key, + TxnMinTimestamp: meta.Txn.MinTimestamp, + Timestamp: meta.Txn.WriteTimestamp, + }) + } + return nil +} + +// Close implements the IntentScanner interface. +func (s *SeparatedIntentScanner) Close() { s.iter.Close() } + +// LegacyIntentScanner is an IntentScanner that assumers intents might +// not be separated. +// +// MVCCIterator Contract: +// +// The provided MVCCIterator must observe all intents in the Processor's keyspan. +// An important implication of this is that if the iterator is a +// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest +// known resolved timestamp, if one has ever been recorded. If one has never +// been recorded, the TimeBoundIterator cannot have any lower bound. +// +type LegacyIntentScanner struct { + iter storage.SimpleMVCCIterator +} + +// NewLegacyIntentScanner returns an IntentScanner appropriate for use +// when the separated intents migration has not yet completed. +func NewLegacyIntentScanner(iter storage.SimpleMVCCIterator) IntentScanner { + return &LegacyIntentScanner{iter: iter} +} +// ConsumeIntents implements the IntentScanner interface. +func (l *LegacyIntentScanner) ConsumeIntents( + ctx context.Context, start roachpb.Key, end roachpb.Key, consumer eventConsumer, +) error { + startKey := storage.MakeMVCCMetadataKey(start) + endKey := storage.MakeMVCCMetadataKey(end) // Iterate through all keys using NextKey. This will look at the first MVCC // version for each key. We're only looking for MVCCMetadata versions, which // will always be the first version of a key if it exists, so its fine that // we skip over all other versions of keys. var meta enginepb.MVCCMetadata - for s.it.SeekGE(startKey); ; s.it.NextKey() { - if ok, err := s.it.Valid(); err != nil { + for l.iter.SeekGE(startKey); ; l.iter.NextKey() { + if ok, err := l.iter.Valid(); err != nil { return err - } else if !ok || !s.it.UnsafeKey().Less(endKey) { + } else if !ok || !l.iter.UnsafeKey().Less(endKey) { break } // If the key is not a metadata key, ignore it. - unsafeKey := s.it.UnsafeKey() + unsafeKey := l.iter.UnsafeKey() if unsafeKey.IsValue() { continue } // Found a metadata key. Unmarshal. - if err := protoutil.Unmarshal(s.it.UnsafeValue(), &meta); err != nil { + if err := protoutil.Unmarshal(l.iter.UnsafeValue(), &meta); err != nil { return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey) } // If this is an intent, inform the Processor. if meta.Txn != nil { - var ops [1]enginepb.MVCCLogicalOp - ops[0].SetValue(&enginepb.MVCCWriteIntentOp{ + consumer(enginepb.MVCCWriteIntentOp{ TxnID: meta.Txn.ID, TxnKey: meta.Txn.Key, TxnMinTimestamp: meta.Txn.MinTimestamp, Timestamp: meta.Txn.WriteTimestamp, }) - s.p.sendEvent(event{ops: ops[:]}, 0 /* timeout */) } } return nil } -func (s *initResolvedTSScan) Cancel() { - s.it.Close() -} +// Close implements the IntentScanner interface. +func (l *LegacyIntentScanner) Close() { l.iter.Close() } // TxnPusher is capable of pushing transactions to a new timestamp and // cleaning up the intents of transactions that are found to be committed. diff --git a/pkg/kv/kvserver/rangefeed/task_test.go b/pkg/kv/kvserver/rangefeed/task_test.go index 1717e8bbaca3..5f7a99d0ec2c 100644 --- a/pkg/kv/kvserver/rangefeed/task_test.go +++ b/pkg/kv/kvserver/rangefeed/task_test.go @@ -15,6 +15,7 @@ import ( "sort" "testing" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -190,66 +192,158 @@ func (s *testIterator) curKV() storage.MVCCKeyValue { func TestInitResolvedTSScan(t *testing.T) { defer leaktest.AfterTest(t)() + startKey := roachpb.RKey("d") + endKey := roachpb.RKey("w") + + makeTxn := func(key string, id uuid.UUID, ts hlc.Timestamp) roachpb.Transaction { + txnMeta := enginepb.TxnMeta{ + Key: []byte(key), + ID: id, + Epoch: 1, + WriteTimestamp: ts, + MinTimestamp: ts, + } + return roachpb.Transaction{ + TxnMeta: txnMeta, + ReadTimestamp: ts, + } + } - // Mock processor. We just needs its eventC. - p := Processor{ - Config: Config{ - Span: roachpb.RSpan{ - Key: roachpb.RKey("d"), - EndKey: roachpb.RKey("w"), - }, - }, - eventC: make(chan *event, 100), + txn1ID := uuid.MakeV4() + txn1TS := hlc.Timestamp{WallTime: 15} + txn1Key := "txnKey1" + txn1 := makeTxn(txn1Key, txn1ID, txn1TS) + + txn2ID := uuid.MakeV4() + txn2TS := hlc.Timestamp{WallTime: 21} + txn2Key := "txnKey2" + txn2 := makeTxn(txn2Key, txn2ID, txn2TS) + + type op struct { + kv storage.MVCCKeyValue + txn *roachpb.Transaction } - // Run an init rts scan over a test iterator with the following keys. - txn1, txn2 := uuid.MakeV4(), uuid.MakeV4() - iter := newTestIterator([]storage.MVCCKeyValue{ - makeKV("a", "val1", 10), - makeInline("b", "val2"), - makeIntent("c", txn1, "txnKey1", 15), - makeProvisionalKV("c", "txnKey1", 15), - makeKV("c", "val3", 11), - makeKV("c", "val4", 9), - makeIntent("d", txn2, "txnKey2", 21), - makeProvisionalKV("d", "txnKey2", 21), - makeKV("d", "val5", 20), - makeKV("d", "val6", 19), - makeInline("g", "val7"), - makeKV("m", "val8", 1), - makeIntent("n", txn1, "txnKey1", 12), - makeProvisionalKV("n", "txnKey1", 12), - makeIntent("r", txn1, "txnKey1", 19), - makeProvisionalKV("r", "txnKey1", 19), - makeKV("r", "val9", 4), - makeIntent("w", txn1, "txnKey1", 3), - makeProvisionalKV("w", "txnKey1", 3), - makeInline("x", "val10"), - makeIntent("z", txn2, "txnKey2", 21), - makeProvisionalKV("z", "txnKey2", 21), - makeKV("z", "val11", 4), - }, nil) - - initScan := newInitResolvedTSScan(&p, iter) - initScan.Run(context.Background()) - require.True(t, iter.closed) + makeEngine := func(enableSeparatedIntents bool) storage.Engine { + ctx := context.Background() + engine := storage.NewInMemForTesting(enableSeparatedIntents) + testData := []op{ + {kv: makeKV("a", "val1", 10)}, + {kv: makeInline("b", "val2")}, + {kv: makeKV("c", "val4", 9)}, + {kv: makeKV("c", "val3", 11)}, + { + txn: &txn1, + kv: makeProvisionalKV("c", "txnKey1", 15), + }, + {kv: makeKV("d", "val6", 19)}, + {kv: makeKV("d", "val5", 20)}, + { + txn: &txn2, + kv: makeProvisionalKV("d", "txnKey2", 21), + }, + {kv: makeInline("g", "val7")}, + {kv: makeKV("m", "val8", 1)}, + { + txn: &txn1, + kv: makeProvisionalKV("n", "txnKey1", 15), + }, + {kv: makeKV("r", "val9", 4)}, + { + txn: &txn1, + kv: makeProvisionalKV("r", "txnKey1", 15), + }, + { + txn: &txn1, + kv: makeProvisionalKV("w", "txnKey1", 15), + }, + {kv: makeInline("x", "val10")}, + {kv: makeKV("z", "val11", 4)}, + { + txn: &txn2, + kv: makeProvisionalKV("z", "txnKey2", 21), + }, + } + for _, op := range testData { + kv := op.kv + err := storage.MVCCPut(ctx, engine, nil, kv.Key.Key, kv.Key.Timestamp, roachpb.Value{RawBytes: kv.Value}, op.txn) + require.NoError(t, err) + } + return engine + } - // Compare the event channel to the expected events. expEvents := []*event{ {ops: []enginepb.MVCCLogicalOp{ - writeIntentOpWithKey(txn2, []byte("txnKey2"), hlc.Timestamp{WallTime: 21}), + writeIntentOpWithKey(txn2ID, []byte("txnKey2"), hlc.Timestamp{WallTime: 21}), }}, {ops: []enginepb.MVCCLogicalOp{ - writeIntentOpWithKey(txn1, []byte("txnKey1"), hlc.Timestamp{WallTime: 12}), + writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}), }}, {ops: []enginepb.MVCCLogicalOp{ - writeIntentOpWithKey(txn1, []byte("txnKey1"), hlc.Timestamp{WallTime: 19}), + writeIntentOpWithKey(txn1ID, []byte("txnKey1"), hlc.Timestamp{WallTime: 15}), }}, {initRTS: true}, } - require.Equal(t, len(expEvents), len(p.eventC)) - for _, expEvent := range expEvents { - require.Equal(t, expEvent, <-p.eventC) + + testCases := map[string]struct { + intentScanner func() (IntentScanner, func()) + }{ + "legacy intent scanner": { + intentScanner: func() (IntentScanner, func()) { + engine := makeEngine(true) + iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + UpperBound: endKey.AsRawKey(), + }) + return NewLegacyIntentScanner(iter), func() { engine.Close() } + }, + }, + "legacy intent scanner with interleaved intents": { + intentScanner: func() (IntentScanner, func()) { + engine := makeEngine(false) + iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + UpperBound: endKey.AsRawKey(), + }) + return NewLegacyIntentScanner(iter), func() { engine.Close() } + }, + }, + "separated intent scanner": { + intentScanner: func() (IntentScanner, func()) { + engine := makeEngine(true) + require.True(t, engine.IsSeparatedIntentsEnabledForTesting(context.Background())) + lowerBound, _ := keys.LockTableSingleKey(startKey.AsRawKey(), nil) + upperBound, _ := keys.LockTableSingleKey(endKey.AsRawKey(), nil) + iter := engine.NewEngineIterator(storage.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + }) + return NewSeparatedIntentScanner(iter), func() { engine.Close() } + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + // Mock processor. We just needs its eventC. + p := Processor{ + Config: Config{ + Span: roachpb.RSpan{ + Key: startKey, + EndKey: endKey, + }, + }, + eventC: make(chan *event, 100), + } + isc, cleanup := tc.intentScanner() + defer cleanup() + initScan := newInitResolvedTSScan(&p, isc) + initScan.Run(context.Background()) + // Compare the event channel to the expected events. + assert.Equal(t, len(expEvents), len(p.eventC)) + for _, expEvent := range expEvents { + assert.Equal(t, expEvent, <-p.eventC) + } + + }) } } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index e780afbc5708..c9b3d3cc22ba 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -378,23 +378,31 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( p = rangefeed.NewProcessor(cfg) // Start it with an iterator to initialize the resolved timestamp. - rtsIter := func() storage.SimpleMVCCIterator { + rtsIter := func() rangefeed.IntentScanner { // Assert that we still hold the raftMu when this is called to ensure // that the rtsIter reads from the current snapshot. The replica // synchronizes with the rangefeed Processor calling this function by // waiting for the Register call below to return. r.raftMu.AssertHeld() - return r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + + onlySeparatedIntents := r.store.cfg.Settings.Version.IsActive(ctx, clusterversion.PostSeparatedIntentsMigration) + + if onlySeparatedIntents { + lowerBound, _ := keys.LockTableSingleKey(desc.StartKey.AsRawKey(), nil) + upperBound, _ := keys.LockTableSingleKey(desc.EndKey.AsRawKey(), nil) + iter := r.Engine().NewEngineIterator(storage.IterOptions{ + LowerBound: lowerBound, + UpperBound: upperBound, + }) + return rangefeed.NewSeparatedIntentScanner(iter) + + } + iter := r.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: desc.EndKey.AsRawKey(), - // TODO(nvanbenschoten): To facilitate fast restarts of rangefeed - // we should periodically persist the resolved timestamp so that we - // can initialize the rangefeed using an iterator that only needs to - // observe timestamps back to the last recorded resolved timestamp. - // This is safe because we know that there are no unresolved intents - // at times before a resolved timestamp. - // MinTimestampHint: r.ResolvedTimestamp, }) + return rangefeed.NewLegacyIntentScanner(iter) } + p.Start(r.store.Stopper(), rtsIter) // Register with the processor *before* we attach its reference to the