Skip to content

Commit

Permalink
mdbx: race conditions in MdbxKV.Close (#8409)
Browse files Browse the repository at this point in the history
In the previous code the WaitGroup operations db.wg.Add() and db.wg.Wait() were not synchronized with the db.closed flag.
In particular, one goroutine could check closed, another could then set closed and call Wait(), and the first could still call wg.Add() while Wait() was in progress
(leading to a WaitGroup panic).
In theory it was also possible for db.env.BeginTxn() to be called on a closed or nil db.env,
because db.wg.Add() was called only after BeginTxn (so db.wg.Wait() could already have returned).

WaitGroup is replaced with a Cond variable.
Now it is not possible to increase the active transactions count on a closed database.
It is also not possible to call BeginTxn on a closed database.
  • Loading branch information
battlmonstr committed Jan 16, 2024
1 parent e979d79 commit 018092c
Showing 1 changed file with 61 additions and 13 deletions.
74 changes: 61 additions & 13 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/c2h5oh/datasize"
"github.com/erigontech/mdbx-go/mdbx"
stack2 "github.com/go-stack/stack"
"github.com/ledgerwatch/erigon-lib/mmap"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/maps"
"golang.org/x/sync/semaphore"
Expand All @@ -44,6 +43,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/iter"
"github.com/ledgerwatch/erigon-lib/kv/order"
"github.com/ledgerwatch/erigon-lib/mmap"
)

const NonExistingDBI kv.DBI = 999_999_999
Expand Down Expand Up @@ -385,15 +385,20 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
targetSemCount := int64(runtime.GOMAXPROCS(-1) * 16)
opts.roTxsLimiter = semaphore.NewWeighted(targetSemCount) // 1 less than max to allow unlocking to happen
}

txsCountMutex := &sync.Mutex{}

db := &MdbxKV{
opts: opts,
env: env,
log: opts.log,
wg: &sync.WaitGroup{},
buckets: kv.TableCfg{},
txSize: dirtyPagesLimit * opts.pageSize,
roTxsLimiter: opts.roTxsLimiter,

txsCountMutex: txsCountMutex,
txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex),

leakDetector: dbg.NewLeakDetector("db."+opts.label.String(), dbg.SlowTx()),
}

Expand Down Expand Up @@ -457,14 +462,17 @@ func (opts MdbxOpts) MustOpen() kv.RwDB {
// MdbxKV wraps an MDBX environment together with the bookkeeping needed
// to open/close it safely while transactions may be in flight.
type MdbxKV struct {
	log          log.Logger
	env          *mdbx.Env
	buckets      kv.TableCfg
	roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most cases runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
	opts         MdbxOpts
	txSize       uint64
	closed       atomic.Bool // set once via CompareAndSwap in Close; re-checked under txsCountMutex before a tx may start
	path         string

	// txsCount is the number of in-flight transactions; guarded by txsCountMutex.
	txsCount      uint
	txsCountMutex *sync.Mutex
	// txsAllDoneOnCloseCond (bound to txsCountMutex) is signalled when the
	// last transaction ends after Close has marked the database closed.
	txsAllDoneOnCloseCond *sync.Cond

	leakDetector *dbg.LeakDetector
}

Expand Down Expand Up @@ -507,13 +515,49 @@ func (db *MdbxKV) openDBIs(buckets []string) error {
})
}

// trackTxBegin registers a transaction that is about to start.
// It returns false — without touching the counter — when the database has
// already been closed, so no new transaction can begin on a closed database.
func (db *MdbxKV) trackTxBegin() bool {
	db.txsCountMutex.Lock()
	defer db.txsCountMutex.Unlock()

	if db.closed.Load() {
		return false
	}
	db.txsCount++
	return true
}

// trackTxEnd unregisters a finished transaction. It panics on an unmatched
// call (more ends than begins), and wakes the Close() waiter once the last
// in-flight transaction completes after the database was marked closed.
func (db *MdbxKV) trackTxEnd() {
	db.txsCountMutex.Lock()
	defer db.txsCountMutex.Unlock()

	if db.txsCount == 0 {
		panic("MdbxKV: unmatched trackTxEnd")
	}
	db.txsCount--

	if db.closed.Load() && (db.txsCount == 0) {
		db.txsAllDoneOnCloseCond.Signal()
	}
}

// waitTxsAllDoneOnClose blocks until the database is marked closed and every
// in-flight transaction has finished. It relies on trackTxEnd signalling
// txsAllDoneOnCloseCond when the last transaction ends after Close.
func (db *MdbxKV) waitTxsAllDoneOnClose() {
	db.txsCountMutex.Lock()
	defer db.txsCountMutex.Unlock()

	for !(db.txsCount == 0 && db.closed.Load()) {
		db.txsAllDoneOnCloseCond.Wait()
	}
}

// Close closes db
// All transactions must be closed before closing the database.
func (db *MdbxKV) Close() {
if ok := db.closed.CompareAndSwap(false, true); !ok {
return
}
db.wg.Wait()
db.waitTxsAllDoneOnClose()

db.env.Close()
db.env = nil

Expand All @@ -526,10 +570,6 @@ func (db *MdbxKV) Close() {
}

func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if db.closed.Load() {
return nil, fmt.Errorf("db closed")
}

// don't try to acquire if the context is already done
select {
case <-ctx.Done():
Expand All @@ -538,8 +578,13 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
// otherwise carry on
}

if !db.trackTxBegin() {
return nil, fmt.Errorf("db closed")
}

// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
db.trackTxEnd()
return nil, semErr
}

Expand All @@ -548,14 +593,15 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
// on error, or if there is whatever reason that we don't return a tx,
// we need to free up the limiter slot, otherwise it could lead to deadlocks
db.roTxsLimiter.Release(1)
db.trackTxEnd()
}
}()

tx, err := db.env.BeginTxn(nil, mdbx.Readonly)
if err != nil {
return nil, fmt.Errorf("%w, label: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
}
db.wg.Add(1)

return &MdbxTx{
ctx: ctx,
db: db,
Expand All @@ -579,16 +625,18 @@ func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err err
default:
}

if db.closed.Load() {
if !db.trackTxBegin() {
return nil, fmt.Errorf("db closed")
}

runtime.LockOSThread()
tx, err := db.env.BeginTxn(nil, flags)
if err != nil {
runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
db.trackTxEnd()
return nil, fmt.Errorf("%w, lable: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
}
db.wg.Add(1)

return &MdbxTx{
db: db,
tx: tx,
Expand Down Expand Up @@ -830,7 +878,7 @@ func (tx *MdbxTx) Commit() error {
}
defer func() {
tx.tx = nil
tx.db.wg.Done()
tx.db.trackTxEnd()
if tx.readOnly {
tx.db.roTxsLimiter.Release(1)
} else {
Expand Down Expand Up @@ -881,7 +929,7 @@ func (tx *MdbxTx) Rollback() {
}
defer func() {
tx.tx = nil
tx.db.wg.Done()
tx.db.trackTxEnd()
if tx.readOnly {
tx.db.roTxsLimiter.Release(1)
} else {
Expand Down

0 comments on commit 018092c

Please sign in to comment.