diff --git a/pkg/kv/kvserver/batch_spanset_test.go b/pkg/kv/kvserver/batch_spanset_test.go index f1bb127ec950..6709c1e241a7 100644 --- a/pkg/kv/kvserver/batch_spanset_test.go +++ b/pkg/kv/kvserver/batch_spanset_test.go @@ -543,7 +543,7 @@ func TestSpanSetMVCCResolveWriteIntentRange(t *testing.T) { Status: roachpb.PENDING, } if _, _, err := storage.MVCCResolveWriteIntentRange( - ctx, batch, nil /* ms */, intent, 0, + ctx, batch, nil /* ms */, intent, 0, false, ); err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index cfe507d7b448..f18854303717 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -492,8 +492,9 @@ func resolveLocalLocks( externalLocks = append(externalLocks, outSpans...) if inSpan != nil { update.Span = *inSpan + // TODO: don't hardwire onlySeparatedIntents=false num, resumeSpan, err := storage.MVCCResolveWriteIntentRange( - ctx, readWriter, ms, update, resolveAllowance) + ctx, readWriter, ms, update, resolveAllowance, false) if err != nil { return err } diff --git a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go index edaf08d387ae..786cb1ea597b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go @@ -44,8 +44,17 @@ func ResolveIntentRange( update := args.AsLockUpdate() + onlySeparatedIntents := false + stats := cArgs.EvalCtx.GetMVCCStats() + if stats.ContainsEstimates == 0 && stats.IntentCount == stats.SeparatedIntentCount { + // Stats incorrectness manifested as there being non-zero interleaved + // intents, can leave unresolved interleaved intents for a committed + // transaction whose transaction record is garbage collected (which would + // cause those intents to be incorrectly rolled back). + onlySeparatedIntents = true + } numKeys, resumeSpan, err := storage.MVCCResolveWriteIntentRange( - ctx, readWriter, ms, update, h.MaxSpanRequestKeys) + ctx, readWriter, ms, update, h.MaxSpanRequestKeys, onlySeparatedIntents) if err != nil { return result.Result{}, err } diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 1db285c3a0d1..d70677163a4c 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -144,19 +144,34 @@ func BenchmarkExportToSst(b *testing.B) { const numIntentKeys = 1000 func setupKeysWithIntent( - b *testing.B, eng Engine, numVersions int, numFlushedVersions int, resolveAll bool, + b *testing.B, + eng Engine, + numVersions int, + numFlushedVersions int, + resolveAll bool, + lockUpdateTxnHasLatestVersionStride int, + resolveIntentForLatestVersionWhenNonLockUpdateTxn bool, ) roachpb.LockUpdate { txnIDCount := 2 * numVersions - val := []byte("value") - var lockUpdate roachpb.LockUpdate - for i := 1; i <= numVersions; i++ { + adjustTxnID := func(txnID int) int { // Assign txn IDs in a deterministic way that will mimic the end result of // random assignment -- the live intent is centered between dead intents, // when we have separated intents. - txnID := i - if i%2 == 0 { + if txnID%2 == 0 { txnID = txnIDCount - txnID } + return txnID + } + txnIDWithLatestVersion := adjustTxnID(numVersions) + otherTxnWithLatestVersion := txnIDCount + 2 + otherTxnUUID := uuid.FromUint128(uint128.FromInts(0, uint64(otherTxnWithLatestVersion))) + val := []byte("value") + var rvLockUpdate roachpb.LockUpdate + for i := 1; i <= numVersions; i++ { + // Assign txn IDs in a deterministic way that will mimic the end result of + // random assignment -- the live intent is centered between dead intents, + // when we have separated intents. + txnID := adjustTxnID(i) txnUUID := uuid.FromUint128(uint128.FromInts(0, uint64(txnID))) ts := hlc.Timestamp{WallTime: int64(i)} txn := roachpb.Transaction{ @@ -170,24 +185,62 @@ func setupKeysWithIntent( ReadTimestamp: ts, GlobalUncertaintyLimit: ts, } + lockUpdate := roachpb.LockUpdate{ + Txn: txn.TxnMeta, + Status: roachpb.COMMITTED, + } + var otherTxn roachpb.Transaction + var otherLockUpdate roachpb.LockUpdate + if txnID == txnIDWithLatestVersion { + rvLockUpdate = lockUpdate + if lockUpdateTxnHasLatestVersionStride != 1 { + otherTxn = roachpb.Transaction{ + TxnMeta: enginepb.TxnMeta{ + ID: otherTxnUUID, + Key: []byte("foo"), + WriteTimestamp: ts, + MinTimestamp: ts, + }, + Status: roachpb.PENDING, + ReadTimestamp: ts, + GlobalUncertaintyLimit: ts, + } + otherLockUpdate = roachpb.LockUpdate{ + Txn: otherTxn.TxnMeta, + Status: roachpb.COMMITTED, + } + } + } value := roachpb.Value{RawBytes: val} batch := eng.NewBatch() for j := 0; j < numIntentKeys; j++ { + putTxn := &txn + if txnID == txnIDWithLatestVersion && j%lockUpdateTxnHasLatestVersionStride != 0 { + putTxn = &otherTxn + } key := makeKey(nil, j) - require.NoError(b, MVCCPut(context.Background(), batch, nil, key, ts, value, &txn)) + require.NoError(b, MVCCPut(context.Background(), batch, nil, key, ts, value, putTxn)) } require.NoError(b, batch.Commit(true)) batch.Close() - lockUpdate = roachpb.LockUpdate{ - Txn: txn.TxnMeta, - Status: roachpb.COMMITTED, - } - if i < numVersions || resolveAll { + if i < numVersions || resolveAll || resolveIntentForLatestVersionWhenNonLockUpdateTxn { batch := eng.NewBatch() for j := 0; j < numIntentKeys; j++ { key := makeKey(nil, j) - lockUpdate.Key = key - found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lockUpdate) + lu := lockUpdate + latestVersionNonLockUpdateTxn := false + if txnID == txnIDWithLatestVersion && j%lockUpdateTxnHasLatestVersionStride != 0 { + lu = otherLockUpdate + latestVersionNonLockUpdateTxn = true + } + lu.Key = key + if i == numVersions && !resolveAll && !latestVersionNonLockUpdateTxn { + // Only here because of + // resolveIntentForLatestVersionWhenNonLockUpdateTxn, and this key + // is not one that should be resolved. + continue + } + found, err := MVCCResolveWriteIntent(context.Background(), batch, nil, lu) require.Equal(b, true, found) require.NoError(b, err) } @@ -198,7 +251,7 @@ func setupKeysWithIntent( require.NoError(b, eng.Flush()) } } - return lockUpdate + return rvLockUpdate } // BenchmarkIntentScan compares separated and interleaved intents, when @@ -216,7 +269,8 @@ func BenchmarkIntentScan(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) + setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false, /* resolveAll */ + 1, false /* resolveIntentForLatestVersionWhenNotLockUpdate */) lower := makeKey(nil, 0) iter := eng.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{ LowerBound: lower, @@ -280,7 +334,8 @@ func BenchmarkScanAllIntentsResolved(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, true /* resolveAll */) + setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, true, /* resolveAll */ + 1, false /* resolveIntentForLatestVersionWhenNotLockUpdate */) lower := makeKey(nil, 0) var iter MVCCIterator var buf []byte @@ -351,7 +406,8 @@ func BenchmarkScanOneAllIntentsResolved(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, true /* resolveAll */) + setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, true, /* resolveAll */ + 1, false /* resolveIntentForLatestVersionWhenNotLockUpdate */) lower := makeKey(nil, 0) upper := makeKey(nil, numIntentKeys) buf := append([]byte(nil), lower...) @@ -403,7 +459,9 @@ func BenchmarkIntentResolution(b *testing.B) { eng := setupMVCCInMemPebbleWithSettings( b, makeSettingsForSeparatedIntents(false, sep)) numFlushedVersions := (percentFlushed * numVersions) / 100 - lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) + lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, + false /* resolveAll */, 1, + false /* resolveIntentForLatestVersionWhenNotLockUpdate */) keys := make([]roachpb.Key, numIntentKeys) for i := range keys { keys[i] = makeKey(nil, i) @@ -440,45 +498,63 @@ func BenchmarkIntentRangeResolution(b *testing.B) { for _, sep := range []bool{false, true} { b.Run(fmt.Sprintf("separated=%t", sep), func(b *testing.B) { - for _, numVersions := range []int{10, 100, 200, 400} { + for _, numVersions := range []int{10, 100, 400} { b.Run(fmt.Sprintf("versions=%d", numVersions), func(b *testing.B) { - for _, percentFlushed := range []int{0, 50, 80, 90, 100} { - b.Run(fmt.Sprintf("percent-flushed=%d", percentFlushed), func(b *testing.B) { - eng := setupMVCCInMemPebbleWithSettings( - b, makeSettingsForSeparatedIntents(false, sep)) - numFlushedVersions := (percentFlushed * numVersions) / 100 - lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, false /* resolveAll */) - keys := make([]roachpb.Key, numIntentKeys+1) - for i := range keys { - keys[i] = makeKey(nil, i) + for _, sparseness := range []int{1, 100, 1000} { + b.Run(fmt.Sprintf("sparseness=%d", sparseness), func(b *testing.B) { + otherTxnUnresolvedIntentsCases := []bool{false, true} + if sparseness == 1 { + // Every intent is owned by the main txn. + otherTxnUnresolvedIntentsCases = []bool{false} } - keys[numIntentKeys] = makeKey(nil, numIntentKeys) - batch := eng.NewBatch() - numKeysPerRange := 100 - numRanges := numIntentKeys / numKeysPerRange - b.ResetTimer() - for i := 0; i < b.N; i++ { - if i > 0 && i%numRanges == 0 { - // Wrapped around. - b.StopTimer() - batch.Close() - batch = eng.NewBatch() - b.StartTimer() - } - rangeNum := i % numRanges - lockUpdate.Key = keys[rangeNum*numKeysPerRange] - lockUpdate.EndKey = keys[(rangeNum+1)*numKeysPerRange] - resolved, span, err := MVCCResolveWriteIntentRange( - context.Background(), batch, nil, lockUpdate, 1000 /* max */) - if err != nil { - b.Fatal(err) - } - if resolved != int64(numKeysPerRange) { - b.Fatalf("expected to resolve %d, actual %d", numKeysPerRange, resolved) - } - if span != nil { - b.Fatal("unexpected resume span") - } + for _, haveOtherTxnUnresolvedIntents := range otherTxnUnresolvedIntentsCases { + b.Run(fmt.Sprintf("other-txn-intents=%t", haveOtherTxnUnresolvedIntents), func(b *testing.B) { + for _, percentFlushed := range []int{0, 50, 100} { + b.Run(fmt.Sprintf("percent-flushed=%d", percentFlushed), func(b *testing.B) { + eng := setupMVCCInMemPebbleWithSettings( + b, makeSettingsForSeparatedIntents(false, sep)) + numFlushedVersions := (percentFlushed * numVersions) / 100 + lockUpdate := setupKeysWithIntent(b, eng, numVersions, numFlushedVersions, + false /* resolveAll */, sparseness, !haveOtherTxnUnresolvedIntents) + keys := make([]roachpb.Key, numIntentKeys+1) + for i := range keys { + keys[i] = makeKey(nil, i) + } + batch := eng.NewBatch() + numKeysPerRange := 100 + numRanges := numIntentKeys / numKeysPerRange + var resolvedCount int64 + expectedResolvedCount := int64(numIntentKeys / sparseness) + b.ResetTimer() + for i := 0; i < b.N; i++ { + if i > 0 && i%numRanges == 0 { + // Wrapped around. + b.StopTimer() + if resolvedCount != expectedResolvedCount { + b.Fatalf("expected to resolve %d, actual %d", + expectedResolvedCount, resolvedCount) + } + resolvedCount = 0 + batch.Close() + batch = eng.NewBatch() + b.StartTimer() + } + rangeNum := i % numRanges + lockUpdate.Key = keys[rangeNum*numKeysPerRange] + lockUpdate.EndKey = keys[(rangeNum+1)*numKeysPerRange] + resolved, span, err := MVCCResolveWriteIntentRange( + context.Background(), batch, nil, lockUpdate, 1000 /* max */, sep) + if err != nil { + b.Fatal(err) + } + resolvedCount += resolved + if span != nil { + b.Fatal("unexpected resume span") + } + } + }) + } + }) } }) } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index d7a5f1396591..aa35eda0bd2d 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2759,13 +2759,155 @@ func unsafeNextVersion(iter MVCCIterator, latestKey MVCCKey) (MVCCKey, []byte, b return unsafeKey, iter.UnsafeValue(), true, nil } +// iterForKeyVersions provides a subset of the functionality of MVCCIterator. +// The expected use-case is when the iter is already positioned at the intent +// (if one exists) for a particular key, or some version, and positioning +// operators like SeekGE, Next are only being used to find other versions for +// that key, and never to find intents for other keys. A full-fledged +// MVCCIterator can be used here. The methods below have the same behavior as +// in MVCCIterator, but the caller should never call SeekGE with an empty +// MVCCKey.Timestamp. Additionally, Next must be preceded by at least one call +// to SeekGE. +type iterForKeyVersions interface { + Valid() (bool, error) + SeekGE(key MVCCKey) + Next() + UnsafeKey() MVCCKey + UnsafeValue() []byte + ValueProto(msg protoutil.Message) error + IsCurIntentSeparated() bool +} + +// separatedIntentAndVersionIter is an implementation of iterForKeyVersions +// used for ranged intent resolution. The MVCCIterator used by it is of +// MVCCKeyIterKind. The caller attempting to do ranged intent resolution uses +// seekEngineKey, nextEngineKey to iterate over the lock table, and for each +// lock/intent that needs to be resolved passes this iterator to +// mvccResolveWriteIntent. The MVCCIterator is positioned lazily, only if +// needed -- the fast path for intent resolution when a transaction is +// committing does not need to position the MVCCIterator. +type separatedIntentAndVersionIter struct { + engineIter EngineIterator + mvccIter MVCCIterator + + // Already parsed meta, when the starting position is at an intent. + meta *enginepb.MVCCMetadata + atMVCCIter bool + engineIterValid bool + engineIterErr error + intentKey roachpb.Key +} + +var _ iterForKeyVersions = &separatedIntentAndVersionIter{} + +func (s *separatedIntentAndVersionIter) seekEngineKeyGE(key EngineKey) { + s.atMVCCIter = false + s.meta = nil + s.engineIterValid, s.engineIterErr = s.engineIter.SeekEngineKeyGE(key) + if s.engineIterValid { + engineKey, err := s.engineIter.UnsafeEngineKey() + if err != nil { + s.engineIterErr = err + s.engineIterValid = false + return + } + if s.intentKey, err = keys.DecodeLockTableSingleKey(engineKey.Key); err != nil { + s.engineIterErr = err + s.engineIterValid = false + return + } + } +} + +func (s *separatedIntentAndVersionIter) nextEngineKey() { + s.atMVCCIter = false + s.meta = nil + s.engineIterValid, s.engineIterErr = s.engineIter.NextEngineKey() +} + +func (s *separatedIntentAndVersionIter) Valid() (bool, error) { + if s.atMVCCIter { + return s.mvccIter.Valid() + } + return s.engineIterValid, s.engineIterErr +} + +func (s *separatedIntentAndVersionIter) SeekGE(key MVCCKey) { + if !key.IsValue() { + panic(errors.AssertionFailedf("SeekGE only permitted for values")) + } + s.mvccIter.SeekGE(key) + s.atMVCCIter = true +} + +func (s *separatedIntentAndVersionIter) Next() { + if !s.atMVCCIter { + panic(errors.AssertionFailedf("Next not preceded by SeekGE")) + } + s.mvccIter.Next() +} + +func (s *separatedIntentAndVersionIter) UnsafeKey() MVCCKey { + if s.atMVCCIter { + return s.mvccIter.UnsafeKey() + } + return MVCCKey{Key: s.intentKey} +} + +func (s *separatedIntentAndVersionIter) UnsafeValue() []byte { + if s.atMVCCIter { + return s.mvccIter.UnsafeValue() + } + return s.engineIter.UnsafeValue() +} + +func (s *separatedIntentAndVersionIter) ValueProto(msg protoutil.Message) error { + if s.atMVCCIter { + return s.mvccIter.ValueProto(msg) + } + meta, ok := msg.(*enginepb.MVCCMetadata) + if ok && meta == s.meta { + // Already parsed. + return nil + } + v := s.engineIter.UnsafeValue() + return protoutil.Unmarshal(v, msg) +} + +func (s *separatedIntentAndVersionIter) IsCurIntentSeparated() bool { + if s.atMVCCIter { + panic(errors.AssertionFailedf("IsCurIntentSeparated called when not positioned at intent")) + } + return true +} + +func mvccGetIntent( + iter iterForKeyVersions, metaKey MVCCKey, meta *enginepb.MVCCMetadata, +) (ok bool, isSeparated bool, keyBytes, valBytes int64, err error) { + if ok, err := iter.Valid(); !ok { + return false, false, 0, 0, err + } + unsafeKey := iter.UnsafeKey() + if !unsafeKey.Key.Equal(metaKey.Key) { + return false, false, 0, 0, nil + } + if unsafeKey.IsValue() { + return false, false, 0, 0, nil + } + if err := iter.ValueProto(meta); err != nil { + return false, false, 0, 0, err + } + return true, iter.IsCurIntentSeparated(), int64(unsafeKey.EncodedSize()), + int64(len(iter.UnsafeValue())), nil +} + // mvccResolveWriteIntent is the core logic for resolving an intent. // REQUIRES: iter is already seeked to intent.Key. // Returns whether an intent was found and resolved, false otherwise. func mvccResolveWriteIntent( ctx context.Context, rw ReadWriter, - iter MVCCIterator, + iter iterForKeyVersions, ms *enginepb.MVCCStats, intent roachpb.LockUpdate, buf *putBuffer, @@ -2773,7 +2915,7 @@ func mvccResolveWriteIntent( metaKey := MakeMVCCMetadataKey(intent.Key) meta := &buf.meta ok, isIntentSeparated, origMetaKeySize, origMetaValSize, err := - mvccGetMetadata(iter, metaKey, true /* iterAlreadyPositioned */, meta) + mvccGetIntent(iter, metaKey, meta) if err != nil { return false, err } @@ -3005,11 +3147,28 @@ func mvccResolveWriteIntent( Key: intent.Key, }) - unsafeNextKey, unsafeNextValue, ok, err := unsafeNextVersion(iter, latestKey) - if err != nil { - return false, err + nextKey := latestKey.Next() + ok = false + var unsafeNextKey MVCCKey + var unsafeNextValue []byte + if nextKey.IsValue() { + iter.SeekGE(nextKey) + ok, err = iter.Valid() + if err != nil { + return false, err + } + if ok && iter.UnsafeKey().Key.Equal(latestKey.Key) { + unsafeNextKey = iter.UnsafeKey() + if !unsafeNextKey.IsValue() { + return false, errors.Errorf("expected an MVCC value key: %s", unsafeNextKey) + } + unsafeNextValue = iter.UnsafeValue() + } else { + ok = false + } + iter = nil // prevent accidental use below } - iter = nil // prevent accidental use below + // Else stepped to next key, so !ok if !ok { // If there is no other version, we should just clean up the key entirely. @@ -3134,43 +3293,101 @@ func (b IterAndBuf) Cleanup() { // intent span as the resume span. Returns the number of intents resolved and a // resume span if the max keys limit was exceeded. func MVCCResolveWriteIntentRange( - ctx context.Context, rw ReadWriter, ms *enginepb.MVCCStats, intent roachpb.LockUpdate, max int64, + ctx context.Context, + rw ReadWriter, + ms *enginepb.MVCCStats, + intent roachpb.LockUpdate, + max int64, + onlySeparatedIntents bool, ) (int64, *roachpb.Span, error) { if max < 0 { resumeSpan := intent.Span // don't inline or `intent` would escape to heap return 0, &resumeSpan, nil } - iterAndBuf := GetIterAndBuf(rw, IterOptions{UpperBound: intent.EndKey}) - defer iterAndBuf.Cleanup() - - encKey := MakeMVCCMetadataKey(intent.Key) - encEndKey := MakeMVCCMetadataKey(intent.EndKey) - nextKey := encKey + var putBuf *putBuffer + // Exactly one of sepIter and mvccIter is non-nil. + var sepIter *separatedIntentAndVersionIter + var mvccIter MVCCIterator + var iter iterForKeyVersions + if onlySeparatedIntents { + ltStart, _ := keys.LockTableSingleKey(intent.Key, nil) + ltEnd, _ := keys.LockTableSingleKey(intent.EndKey, nil) + engineIter := rw.NewEngineIterator(IterOptions{LowerBound: ltStart, UpperBound: ltEnd}) + iterAndBuf := + GetBufUsingIter(rw.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: intent.EndKey})) + defer func() { + engineIter.Close() + iterAndBuf.Cleanup() + }() + putBuf = iterAndBuf.buf + sepIter = &separatedIntentAndVersionIter{ + engineIter: engineIter, + mvccIter: iterAndBuf.iter, + } + iter = sepIter + sepIter.seekEngineKeyGE(EngineKey{Key: ltStart}) + } else { + iterAndBuf := GetIterAndBuf(rw, IterOptions{UpperBound: intent.EndKey}) + defer iterAndBuf.Cleanup() + putBuf = iterAndBuf.buf + mvccIter = iterAndBuf.iter + iter = mvccIter + } + nextKey := MakeMVCCMetadataKey(intent.Key) + intentEndKey := intent.EndKey + intent.EndKey = nil var keyBuf []byte num := int64(0) - intent.EndKey = nil - for { if max > 0 && num == max { - return num, &roachpb.Span{Key: nextKey.Key, EndKey: encEndKey.Key}, nil + return num, &roachpb.Span{Key: nextKey.Key, EndKey: intentEndKey}, nil } - - iterAndBuf.iter.SeekGE(nextKey) - if ok, err := iterAndBuf.iter.Valid(); err != nil { - return 0, nil, err - } else if !ok || !iterAndBuf.iter.UnsafeKey().Less(encEndKey) { - // No more keys exists in the given range. - break + var key MVCCKey + if sepIter != nil { + if valid, err := sepIter.Valid(); err != nil { + return 0, nil, err + } else if !valid { + // No more intents in the given range. + break + } + meta := &putBuf.meta + if err := sepIter.ValueProto(meta); err != nil { + return 0, nil, err + } + if meta.Txn == nil { + return 0, nil, errors.Errorf("intent with no txn") + } + if intent.Txn.ID == meta.Txn.ID { + // Stash the parsed meta so don't need to parse it again in + // mvccResolveWriteIntent. This parsing can be ~10% of the + // resolution cost in some benchmarks. + sepIter.meta = meta + // Manually copy the underlying bytes of the unsafe key. This + // construction reuses keyBuf across iterations. + key = sepIter.UnsafeKey() + keyBuf = append(keyBuf[:0], key.Key...) + key.Key = keyBuf + } else { + sepIter.nextEngineKey() + continue + } + } else { + mvccIter.SeekGE(nextKey) + if valid, err := mvccIter.Valid(); err != nil { + return 0, nil, err + } else if !valid { + // No more keys exists in the given range. + break + } + key = mvccIter.UnsafeKey() + // Manually copy the underlying bytes of the unsafe key. This + // construction reuses keyBuf across iterations. + keyBuf = append(keyBuf[:0], key.Key...) + key.Key = keyBuf } - // Manually copy the underlying bytes of the unsafe key. This construction - // reuses keyBuf across iterations. - key := iterAndBuf.iter.UnsafeKey() - keyBuf = append(keyBuf[:0], key.Key...) - key.Key = keyBuf - var err error var ok bool if !key.IsValue() { @@ -3179,7 +3396,7 @@ func MVCCResolveWriteIntentRange( // that is owned by this txn (if there is any such intent). The logic in // this function will need to be adjusted. intent.Key = key.Key - ok, err = mvccResolveWriteIntent(ctx, rw, iterAndBuf.iter, ms, intent, iterAndBuf.buf) + ok, err = mvccResolveWriteIntent(ctx, rw, iter, ms, intent, putBuf) } if err != nil { log.Warningf(ctx, "failed to resolve intent for key %q: %+v", key.Key, err) @@ -3187,15 +3404,18 @@ func MVCCResolveWriteIntentRange( num++ } + if sepIter != nil { + sepIter.nextEngineKey() + // We could also compute a tighter nextKey here if we wanted to. + } // nextKey is already a metadata key... nextKey.Key = key.Key.Next() - if !nextKey.Less(encEndKey) { + if nextKey.Key.Compare(intentEndKey) >= 0 { // ... but we don't want to Seek to a key outside of the range as we validate // those span accesses (see TestSpanSetMVCCResolveWriteIntentRangeUsingIter). break } } - return num, nil, nil } diff --git a/pkg/storage/mvcc_logical_ops_test.go b/pkg/storage/mvcc_logical_ops_test.go index 40f13d7a4ead..87cf66a81a55 100644 --- a/pkg/storage/mvcc_logical_ops_test.go +++ b/pkg/storage/mvcc_logical_ops_test.go @@ -79,14 +79,14 @@ func TestMVCCOpLogWriter(t *testing.T) { roachpb.MakeLockUpdate( &txn1CommitTS, roachpb.Span{Key: testKey1, EndKey: testKey2.Next()}), - math.MaxInt64); err != nil { + math.MaxInt64, false); err != nil { t.Fatal(err) } if _, _, err := MVCCResolveWriteIntentRange(ctx, ol, nil, roachpb.MakeLockUpdate( &txn1CommitTS, roachpb.Span{Key: localKey, EndKey: localKey.Next()}), - math.MaxInt64); err != nil { + math.MaxInt64, false); err != nil { t.Fatal(err) } diff --git a/pkg/storage/mvcc_stats_test.go b/pkg/storage/mvcc_stats_test.go index 147cacbafd25..ab4e147580ad 100644 --- a/pkg/storage/mvcc_stats_test.go +++ b/pkg/storage/mvcc_stats_test.go @@ -1614,7 +1614,7 @@ func TestMVCCStatsRandomized(t *testing.T) { } else { max := s.rng.Int63n(5) desc += fmt.Sprintf(", max=%d", max) - if _, _, err := MVCCResolveWriteIntentRange(ctx, s.eng, s.MS, s.intentRange(status), max); err != nil { + if _, _, err := MVCCResolveWriteIntentRange(ctx, s.eng, s.MS, s.intentRange(status), max, false); err != nil { return desc + ": " + err.Error() } } diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 224ad408d934..1ca7dd101bda 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -3693,7 +3693,7 @@ func TestMVCCResolveWithDiffEpochs(t *testing.T) { t.Fatal(err) } num, _, err := MVCCResolveWriteIntentRange(ctx, engine, nil, - roachpb.MakeLockUpdate(txn1e2Commit, roachpb.Span{Key: testKey1, EndKey: testKey2.Next()}), 2) + roachpb.MakeLockUpdate(txn1e2Commit, roachpb.Span{Key: testKey1, EndKey: testKey2.Next()}), 2, false) if err != nil { t.Fatal(err) } @@ -3894,7 +3894,7 @@ func TestMVCCResolveTxnRange(t *testing.T) { num, resumeSpan, err := MVCCResolveWriteIntentRange(ctx, engine, nil, roachpb.MakeLockUpdate(txn1Commit, roachpb.Span{Key: testKey1, EndKey: testKey4.Next()}), - math.MaxInt64) + math.MaxInt64, false) if err != nil { t.Fatal(err) } @@ -3979,7 +3979,7 @@ func TestMVCCResolveTxnRangeResume(t *testing.T) { // Resolve up to 5 intents. num, resumeSpan, err := MVCCResolveWriteIntentRange(ctx, engine, nil, roachpb.MakeLockUpdate(txn1Commit, roachpb.Span{Key: roachpb.Key("00"), EndKey: roachpb.Key("30")}), - 5) + 5, false) if err != nil { t.Fatal(err) }