Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memdb: prevent iterator invalidation #1563

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
4 changes: 2 additions & 2 deletions examples/gcworker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/rawkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/1pc_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/async_commit/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/delete_range/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/pessimistic_txn/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
4 changes: 2 additions & 2 deletions examples/txnkv/unsafedestoryrange/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c // indirect
github.com/pingcap/failpoint v0.0.0-20220801062533-2eaa32854a6c // indirect
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect
github.com/pingcap/kvproto v0.0.0-20241120071417-b5b7843d9037 // indirect
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand All @@ -30,7 +30,7 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a // indirect
github.com/tikv/pd/client v0.0.0-20241220053006-461b86adc78d // indirect
github.com/tikv/pd/client v0.0.0-20250107032658-5c4ab57d68de // indirect
github.com/twmb/murmur3 v1.1.3 // indirect
go.etcd.io/etcd/api/v3 v3.5.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.10 // indirect
Expand Down
16 changes: 16 additions & 0 deletions internal/unionstore/arena/arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"encoding/binary"
"math"

"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/kv"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -223,6 +225,20 @@ func (cp *MemDBCheckpoint) IsSamePosition(other *MemDBCheckpoint) bool {
return cp.blocks == other.blocks && cp.offsetInBlock == other.offsetInBlock
}

// LessThan compares two checkpoints.
func (cp *MemDBCheckpoint) LessThan(cp2 *MemDBCheckpoint) bool {
ekexium marked this conversation as resolved.
Show resolved Hide resolved
if cp == nil || cp2 == nil {
logutil.BgLogger().Panic("unexpected nil checkpoint", zap.Any("cp", cp), zap.Any("cp2", cp2))
}
if cp.blocks < cp2.blocks {
return true
}
if cp.blocks == cp2.blocks && cp.offsetInBlock < cp2.offsetInBlock {
return true
}
return false
}

func (a *MemdbArena) Checkpoint() MemDBCheckpoint {
snap := MemDBCheckpoint{
blockSize: a.blockSize,
Expand Down
24 changes: 24 additions & 0 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ type ART struct {
lastTraversedNode atomic.Uint64
hitCount atomic.Uint64
missCount atomic.Uint64

// The counter of every write operation, used to invalidate iterators that were created before the write operation.
// The purpose of the counter is to check interleaving of write and read operations (via iterator).
// It does not protect against data races; if one occurs, there must be a bug in the caller code.
// invariant: no concurrent access to it
WriteSeqNo int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WriteSeqNo does not seem to have the same no-concurrent-access invariant as SnapshotSeqNo below; should an atomic variable be used for it?

Copy link
Contributor Author

@ekexium ekexium Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the sequence number is to detect interleaved read and write operations, not concurrent access. Even if we change it to an atomic variable, concurrent reads and writes can still corrupt the iterator.
If there is a data race on this variable, there must be a bug on the caller side.
I've updated the comment to explain the rationale.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trying to sort out the current issues:

  1. memdb has concurrent usage scenarios, such as pessimistic lock setting key flags while membuf read is performing a memdb snapshot read.
  2. The underlying implementation structure of memdb does not support concurrency and needs to be properly used by the upper layer with mutex protection. This is the issue being addressed by the PR at executor: in-txn statement read from MemBuffer's snapshot pingcap/tidb#59219.
  3. The memdb iterator can be invalidated by interleaving writes; detecting this is the purpose of the write-sequence mechanism introduced in this PR.

Implementation level:

  1. lastTraversedNode, hitCount, and missCount, several internal states of ART, are implemented as atomic variables. These states are modified by write operations, and the WriteSeqNo below will also be modified by write operations, requiring the upper layer to handle concurrency correctly.

This part of the implementation is confusing. Should we make the internal state of ART single-threaded and leave concurrent access and its protection to the upper MemDB layer? Then there would be no need to consider data races or introduce atomic variables in the ART code.
/cc @you06

Copy link
Contributor Author

@ekexium ekexium Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One difference between WriteSeqNo and lastTraversedNode is that the latter is updated during read operations. Concurrent read operations are allowed (by design?), so even if we acquire an RLock on memdb for every read operation, lastTraversedNode still needs to be an atomic variable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the traversal in snapshot-get does not cache the last-traversed node (maybe it's not worth caching for a snapshot-get), I think art.lastTraversedNode can also be a non-atomic variable, but hitCount and missCount should be atomic.

addr, lf := snap.tree.traverse(key, false)

// Increased by 1 when an operation that may affect the content returned by "snapshot" (i.e. stage[0]) happens.
// It's used to invalidate snapshot iterators.
// invariant: no concurrent access to it
SnapshotSeqNo int
}

func New() *ART {
Expand Down Expand Up @@ -115,6 +125,7 @@ func (t *ART) Set(key artKey, value []byte, ops ...kv.FlagsOp) error {
}
}

t.WriteSeqNo++
if len(t.stages) == 0 {
t.dirty = true
}
Expand Down Expand Up @@ -479,6 +490,10 @@ func (t *ART) RevertToCheckpoint(cp *arena.MemDBCheckpoint) {
t.allocator.vlogAllocator.RevertToCheckpoint(t, cp)
t.allocator.vlogAllocator.Truncate(cp)
t.allocator.vlogAllocator.OnMemChange()
t.WriteSeqNo++
if len(t.stages) == 0 || t.stages[0].LessThan(cp) {
t.SnapshotSeqNo++
}
}

func (t *ART) Stages() []arena.MemDBCheckpoint {
Expand All @@ -498,7 +513,9 @@ func (t *ART) Release(h int) {
if h != len(t.stages) {
panic("cannot release staging buffer")
}
t.WriteSeqNo++
if h == 1 {
t.SnapshotSeqNo++
tail := t.checkpoint()
if !t.stages[0].IsSamePosition(&tail) {
t.dirty = true
Expand All @@ -519,6 +536,11 @@ func (t *ART) Cleanup(h int) {
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(tree.stages)=%v", h, len(t.stages)))
}

t.WriteSeqNo++
if h == 1 {
t.SnapshotSeqNo++
}

cp := &t.stages[h-1]
if !t.vlogInvalid {
curr := t.checkpoint()
Expand All @@ -542,6 +564,8 @@ func (t *ART) Reset() {
t.allocator.nodeAllocator.Reset()
t.allocator.vlogAllocator.Reset()
t.lastTraversedNode.Store(arena.NullU64Addr)
t.SnapshotSeqNo++
t.WriteSeqNo++
}

// DiscardValues releases the memory used by all values.
Expand Down
40 changes: 37 additions & 3 deletions internal/unionstore/art/art_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (
"fmt"
"sort"

"github.com/tikv/client-go/v2/internal/logutil"
"go.uber.org/zap"

"github.com/pkg/errors"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
"github.com/tikv/client-go/v2/kv"
Expand Down Expand Up @@ -56,6 +59,7 @@ func (t *ART) iter(lowerBound, upperBound []byte, reverse, includeFlags bool) (*
// this avoids the initial value of currAddr equals to endAddr.
currAddr: arena.BadAddr,
endAddr: arena.NullAddr,
seqNo: t.WriteSeqNo,
}
it.init(lowerBound, upperBound)
if !it.valid {
Expand All @@ -76,12 +80,41 @@ type Iterator struct {
currLeaf *artLeaf
currAddr arena.MemdbArenaAddr
endAddr arena.MemdbArenaAddr

// The iterator is valid only while seqNo equals the tree's WriteSeqNo.
seqNo int
// ignoreSeqNo is used to ignore the seqNo check, used for snapshot iter before its full deprecation.
ignoreSeqNo bool
}

// checkSeqNo verifies that the sequence number recorded in the iterator still
// matches the tree's WriteSeqNo. On a mismatch it panics through the
// background logger, attaching both numbers and a stack trace, unless
// ignoreSeqNo is set on the iterator.
func (it *Iterator) checkSeqNo() {
	// Nothing to report when the numbers agree or the check is disabled.
	if it.seqNo == it.tree.WriteSeqNo || it.ignoreSeqNo {
		return
	}
	logutil.BgLogger().Panic(
		"seqNo mismatch",
		zap.Int("it seqNo", it.seqNo),
		zap.Int("art seqNo", it.tree.WriteSeqNo),
		zap.Stack("stack"),
	)
}

// Valid runs the sequence-number check and then returns the iterator's
// valid flag.
func (it *Iterator) Valid() bool {
	it.checkSeqNo()
	ok := it.valid
	return ok
}

// Key runs the sequence-number check and then returns the key stored in the
// iterator's current leaf.
func (it *Iterator) Key() []byte {
	it.checkSeqNo()
	leaf := it.currLeaf
	return leaf.GetKey()
}

// Flags runs the sequence-number check and then returns the key flags stored
// in the iterator's current leaf.
func (it *Iterator) Flags() kv.KeyFlags {
	it.checkSeqNo()
	leaf := it.currLeaf
	return leaf.GetKeyFlags()
}

func (it *Iterator) Valid() bool { return it.valid }
func (it *Iterator) Key() []byte { return it.currLeaf.GetKey() }
func (it *Iterator) Flags() kv.KeyFlags { return it.currLeaf.GetKeyFlags() }
func (it *Iterator) Value() []byte {
it.checkSeqNo()
if it.currLeaf.vLogAddr.IsNull() {
return nil
}
Expand All @@ -102,6 +135,7 @@ func (it *Iterator) Next() error {
// iterate is finished
return errors.New("Art: iterator is finished")
}
it.checkSeqNo()
if it.currAddr == it.endAddr {
it.valid = false
return nil
Expand Down
3 changes: 3 additions & 0 deletions internal/unionstore/art/art_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)

// getSnapshot returns the "snapshot" for snapshotGetter or snapshotIterator, which is usually the snapshot
// of stage[0]
func (t *ART) getSnapshot() arena.MemDBCheckpoint {
if len(t.stages) > 0 {
return t.stages[0]
Expand Down Expand Up @@ -49,6 +51,7 @@ func (t *ART) newSnapshotIterator(start, end []byte, desc bool) *SnapIter {
if err != nil {
panic(err)
}
inner.ignoreSeqNo = true
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
Expand Down
Loading
Loading