Skip to content

Commit

Permalink
Merge #111123
Browse files Browse the repository at this point in the history
111123: rangefeed: remove obsolete LegacyIntentScanner r=aliher1911 a=aliher1911

LegacyIntentScanner was only used in tests to prep data. This PR removes it and replaces its use with properly populated engine instead.

Epic: none
Fixes: #108278

Release note: None

Co-authored-by: Oleg Afanasyev <oleg@cockroachlabs.com>
  • Loading branch information
craig[bot] and aliher1911 committed Sep 25, 2023
2 parents 43326b2 + 083fa93 commit cb7c66f
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 195 deletions.
137 changes: 102 additions & 35 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,25 @@ func rangeFeedCheckpoint(span roachpb.Span, ts hlc.Timestamp) *kvpb.RangeFeedEve
})
}

type storeOp struct {
kv storage.MVCCKeyValue
txn *roachpb.Transaction
}

func makeTestEngineWithData(ops []storeOp) (storage.Engine, error) {
ctx := context.Background()
engine := storage.NewDefaultInMemForTesting()
for _, op := range ops {
kv := op.kv
err := storage.MVCCPut(ctx, engine, kv.Key.Key, kv.Key.Timestamp, roachpb.Value{RawBytes: kv.Value}, storage.MVCCWriteOptions{Txn: op.txn})
if err != nil {
engine.Close()
return nil, err
}
}
return engine, nil
}

const testProcessorEventCCap = 16
const testProcessorEventCTimeout = 10 * time.Millisecond

Expand Down Expand Up @@ -238,9 +257,13 @@ func withMetrics(m *Metrics) option {
}
}

func withRtsIter(rtsIter storage.SimpleMVCCIterator) option {
func withRtsScanner(scanner IntentScanner) option {
return func(config *testConfig) {
config.isc = makeIntentScannerConstructor(rtsIter)
if scanner != nil {
config.isc = func() IntentScanner {
return scanner
}
}
}
}

Expand Down Expand Up @@ -268,11 +291,52 @@ func withSpan(span roachpb.RSpan) option {
}
}

func makeIntentScannerConstructor(rtsIter storage.SimpleMVCCIterator) IntentScannerConstructor {
if rtsIter == nil {
return nil
// blockingScanner is a test intent scanner that allows test to track lifecycle
// of tasks.
// 1. it will always block on startup and will wait for block to be closed to
// proceed
// 2. when closed it will close done channel to signal completion
type blockingScanner struct {
wrapped IntentScanner

block chan interface{}
done chan interface{}
}

func (s *blockingScanner) ConsumeIntents(
ctx context.Context, startKey roachpb.Key, endKey roachpb.Key, consumer eventConsumer,
) error {
if s.block != nil {
select {
case <-s.block:
case <-ctx.Done():
return ctx.Err()
}
}
return func() IntentScanner { return NewLegacyIntentScanner(rtsIter) }
return s.wrapped.ConsumeIntents(ctx, startKey, endKey, consumer)
}

func (s *blockingScanner) Close() {
s.wrapped.Close()
close(s.done)
}

func makeIntentScanner(data []storeOp, span roachpb.RSpan) (*blockingScanner, func(), error) {
engine, err := makeTestEngineWithData(data)
if err != nil {
return nil, nil, err
}
scanner, err := NewSeparatedIntentScanner(engine, span)
if err != nil {
return nil, nil, err
}
return &blockingScanner{
wrapped: scanner,
block: make(chan interface{}),
done: make(chan interface{}),
}, func() {
engine.Close()
}, nil
}

func newTestProcessor(
Expand Down Expand Up @@ -828,32 +892,36 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()

testutils.RunValues(t, "proc type", testTypes, func(t *testing.T, pt procType) {
txn1, txn2 := uuid.MakeV4(), uuid.MakeV4()
rtsIter := newTestIterator([]storage.MVCCKeyValue{
makeKV("a", "val1", 10),
makeIntent("c", txn1, "txnKey1", 15),
makeProvisionalKV("c", "txnKey1", 15),
makeKV("c", "val3", 11),
makeKV("c", "val4", 9),
makeIntent("d", txn2, "txnKey2", 21),
makeProvisionalKV("d", "txnKey2", 21),
makeKV("d", "val5", 20),
makeKV("d", "val6", 19),
makeKV("m", "val8", 1),
makeIntent("n", txn1, "txnKey1", 12),
makeProvisionalKV("n", "txnKey1", 12),
makeIntent("r", txn1, "txnKey1", 19),
makeProvisionalKV("r", "txnKey1", 19),
makeKV("r", "val9", 4),
makeIntent("w", txn1, "txnKey1", 3),
makeProvisionalKV("w", "txnKey1", 3),
makeIntent("z", txn2, "txnKey2", 21),
makeProvisionalKV("z", "txnKey2", 21),
makeKV("z", "val11", 4),
}, nil)
rtsIter.block = make(chan struct{})

p, h, stopper := newTestProcessor(t, withRtsIter(rtsIter), withProcType(pt))
txn1 := makeTxn("txn1", uuid.MakeV4(), isolation.Serializable, hlc.Timestamp{})
txn2 := makeTxn("txn2", uuid.MakeV4(), isolation.Serializable, hlc.Timestamp{})
txnWithTs := func(txn roachpb.Transaction, ts int64) *roachpb.Transaction {
txnTs := hlc.Timestamp{WallTime: ts}
txn.TxnMeta.MinTimestamp = txnTs
txn.TxnMeta.WriteTimestamp = txnTs
txn.ReadTimestamp = txnTs
return &txn
}
data := []storeOp{
{kv: makeKV("a", "val1", 10)},
{kv: makeKV("c", "val4", 9)},
{kv: makeKV("c", "val3", 11)},
{kv: makeProvisionalKV("c", "txnKey1", 15), txn: txnWithTs(txn1, 15)},
{kv: makeKV("d", "val6", 19)},
{kv: makeKV("d", "val5", 20)},
{kv: makeProvisionalKV("d", "txnKey2", 21), txn: txnWithTs(txn2, 21)},
{kv: makeKV("m", "val8", 1)},
{kv: makeProvisionalKV("n", "txnKey1", 12), txn: txnWithTs(txn1, 12)},
{kv: makeKV("r", "val9", 4)},
{kv: makeProvisionalKV("r", "txnKey1", 19), txn: txnWithTs(txn1, 19)},
{kv: makeProvisionalKV("w", "txnKey1", 3), txn: txnWithTs(txn1, 3)},
{kv: makeKV("z", "val11", 4)},
{kv: makeProvisionalKV("z", "txnKey2", 21), txn: txnWithTs(txn2, 21)},
}
scanner, cleanup, err := makeIntentScanner(data, roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("zz")})
require.NoError(t, err, "failed to prepare test data")
defer cleanup()

p, h, stopper := newTestProcessor(t, withRtsScanner(scanner), withProcType(pt))
ctx := context.Background()
defer stopper.Stop(ctx)

Expand Down Expand Up @@ -897,9 +965,8 @@ func TestProcessorInitializeResolvedTimestamp(t *testing.T) {
require.Equal(t, hlc.Timestamp{}, h.rts.Get())

// Let the scan proceed.
close(rtsIter.block)
<-rtsIter.done
require.True(t, rtsIter.closed)
close(scanner.block)
<-scanner.done

// Synchronize the event channel then verify that the resolved timestamp is
// initialized and that it's blocked on the oldest unresolved intent's txn
Expand Down
73 changes: 0 additions & 73 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,79 +170,6 @@ func (s *SeparatedIntentScanner) ConsumeIntents(
// Close implements the IntentScanner interface.
func (s *SeparatedIntentScanner) Close() { s.iter.Close() }

// LegacyIntentScanner is an IntentScanner that assumes intents might
// not be separated.
//
// MVCCIterator Contract:
//
// The provided MVCCIterator must observe all intents in the Processor's keyspan.
// An important implication of this is that if the iterator is a
// TimeBoundIterator, its MinTimestamp cannot be above the keyspan's largest
// known resolved timestamp, if one has ever been recorded. If one has never
// been recorded, the TimeBoundIterator cannot have any lower bound.
//
// The LegacyIntentScanner is unused outside of tests and will be removed as
// part of #108278.
type LegacyIntentScanner struct {
iter storage.SimpleMVCCIterator
}

// NewLegacyIntentScanner returns an IntentScanner appropriate for use
// when the separated intents migration has not yet completed.
func NewLegacyIntentScanner(iter storage.SimpleMVCCIterator) IntentScanner {
return &LegacyIntentScanner{iter: iter}
}

// ConsumeIntents implements the IntentScanner interface.
func (l *LegacyIntentScanner) ConsumeIntents(
ctx context.Context, start roachpb.Key, end roachpb.Key, consumer eventConsumer,
) error {
startKey := storage.MakeMVCCMetadataKey(start)
endKey := storage.MakeMVCCMetadataKey(end)
// Iterate through all keys using NextKey. This will look at the first MVCC
// version for each key. We're only looking for MVCCMetadata versions, which
// will always be the first version of a key if it exists, so its fine that
// we skip over all other versions of keys.
var meta enginepb.MVCCMetadata
for l.iter.SeekGE(startKey); ; l.iter.NextKey() {
if ok, err := l.iter.Valid(); err != nil {
return err
} else if !ok || !l.iter.UnsafeKey().Less(endKey) {
break
}

// If the key is not a metadata key, ignore it.
unsafeKey := l.iter.UnsafeKey()
if unsafeKey.IsValue() {
continue
}

// Found a metadata key. Unmarshal.
v, err := l.iter.UnsafeValue()
if err != nil {
return err
}
if err := protoutil.Unmarshal(v, &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}

// If this is an intent, inform the Processor.
if meta.Txn != nil {
consumer(enginepb.MVCCWriteIntentOp{
TxnID: meta.Txn.ID,
TxnKey: meta.Txn.Key,
TxnIsoLevel: meta.Txn.IsoLevel,
TxnMinTimestamp: meta.Txn.MinTimestamp,
Timestamp: meta.Txn.WriteTimestamp,
})
}
}
return nil
}

// Close implements the IntentScanner interface.
func (l *LegacyIntentScanner) Close() { l.iter.Close() }

// TxnPusher is capable of pushing transactions to a new timestamp and
// cleaning up the intents of transactions that are found to be committed.
type TxnPusher interface {
Expand Down
Loading

0 comments on commit cb7c66f

Please sign in to comment.