From 018092c443145c8d2b8556b26959b8945fd30885 Mon Sep 17 00:00:00 2001
From: battlmonstr
Date: Tue, 16 Jan 2024 13:06:16 +0100
Subject: [PATCH] mdbx: race conditions in MdbxKV.Close (#8409)

In the previous code, the WaitGroup calls db.wg.Add() and db.wg.Wait()
were not kept in sync with the db.closed flag. In particular, it was
theoretically possible for one goroutine to check closed, for another
goroutine to then set closed and call Wait(), and for the first
goroutine to call wg.Add() while Wait() was still in progress, which
leads to a WaitGroup panic.

In theory it was also possible for db.env.BeginTxn() to be called on a
closed or nil db.env, because db.wg.Add() was called only after
BeginTxn(), so db.wg.Wait() could already have returned.
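For illustration, the panicking interleaving looked roughly like this
(a schematic of the old code paths, not literal call sites):

    // goroutine A: BeginRo/beginRw   // goroutine B: Close
    db.closed.Load() == false
                                      db.closed.CompareAndSwap(false, true)
                                      db.wg.Wait()  // starts waiting
    db.wg.Add(1)                      // Add while Wait runs -> panic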
The WaitGroup is replaced with a condition variable (sync.Cond)
guarding an explicit count of active transactions. Now it is not
possible to increase the active transaction count on a closed database,
and it is not possible to call BeginTxn on a closed database.
---
 erigon-lib/kv/mdbx/kv_mdbx.go | 74 +++++++++++++++++++++++++++++------
 1 file changed, 61 insertions(+), 13 deletions(-)

diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go
index cd465b593b9..62d4a940935 100644
--- a/erigon-lib/kv/mdbx/kv_mdbx.go
+++ b/erigon-lib/kv/mdbx/kv_mdbx.go
@@ -34,7 +34,6 @@ import (
 	"github.com/c2h5oh/datasize"
 	"github.com/erigontech/mdbx-go/mdbx"
 	stack2 "github.com/go-stack/stack"
-	"github.com/ledgerwatch/erigon-lib/mmap"
 	"github.com/ledgerwatch/log/v3"
 	"golang.org/x/exp/maps"
 	"golang.org/x/sync/semaphore"
@@ -44,6 +43,7 @@ import (
 	"github.com/ledgerwatch/erigon-lib/kv"
 	"github.com/ledgerwatch/erigon-lib/kv/iter"
 	"github.com/ledgerwatch/erigon-lib/kv/order"
+	"github.com/ledgerwatch/erigon-lib/mmap"
 )
 
 const NonExistingDBI kv.DBI = 999_999_999
@@ -385,15 +385,20 @@ func (opts MdbxOpts) Open(ctx context.Context) (kv.RwDB, error) {
 		targetSemCount := int64(runtime.GOMAXPROCS(-1) * 16)
 		opts.roTxsLimiter = semaphore.NewWeighted(targetSemCount) // 1 less than max to allow unlocking to happen
 	}
+
+	txsCountMutex := &sync.Mutex{}
+
 	db := &MdbxKV{
 		opts:         opts,
 		env:          env,
 		log:          opts.log,
-		wg:           &sync.WaitGroup{},
 		buckets:      kv.TableCfg{},
 		txSize:       dirtyPagesLimit * opts.pageSize,
 		roTxsLimiter: opts.roTxsLimiter,
+
+		txsCountMutex:         txsCountMutex,
+		txsAllDoneOnCloseCond: sync.NewCond(txsCountMutex),
+
 		leakDetector: dbg.NewLeakDetector("db."+opts.label.String(), dbg.SlowTx()),
 	}
 
@@ -457,7 +462,6 @@ func (opts MdbxOpts) MustOpen() kv.RwDB {
 type MdbxKV struct {
 	log          log.Logger
 	env          *mdbx.Env
-	wg           *sync.WaitGroup
 	buckets      kv.TableCfg
 	roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
 	opts         MdbxOpts
@@ -465,6 +469,10 @@ type MdbxKV struct {
 	closed       atomic.Bool
 	path         string
 
+	txsCount              uint
+	txsCountMutex         *sync.Mutex
+	txsAllDoneOnCloseCond *sync.Cond
+
 	leakDetector *dbg.LeakDetector
 }
 
@@ -507,13 +515,49 @@ func (db *MdbxKV) openDBIs(buckets []string) error {
 	})
 }
 
+func (db *MdbxKV) trackTxBegin() bool {
+	db.txsCountMutex.Lock()
+	defer db.txsCountMutex.Unlock()
+
+	isOpen := !db.closed.Load()
+	if isOpen {
+		db.txsCount++
+	}
+	return isOpen
+}
+
+func (db *MdbxKV) trackTxEnd() {
+	db.txsCountMutex.Lock()
+	defer db.txsCountMutex.Unlock()
+
+	if db.txsCount > 0 {
+		db.txsCount--
+	} else {
+		panic("MdbxKV: unmatched trackTxEnd")
+	}
+
+	if (db.txsCount == 0) && db.closed.Load() {
+		db.txsAllDoneOnCloseCond.Signal()
+	}
+}
+
+func (db *MdbxKV) waitTxsAllDoneOnClose() {
+	db.txsCountMutex.Lock()
+	defer db.txsCountMutex.Unlock()
+
+	for (db.txsCount > 0) || !db.closed.Load() {
+		db.txsAllDoneOnCloseCond.Wait()
+	}
+}
+
 // Close closes db
 // All transactions must be closed before closing the database.
 func (db *MdbxKV) Close() {
 	if ok := db.closed.CompareAndSwap(false, true); !ok {
 		return
 	}
-	db.wg.Wait()
+	db.waitTxsAllDoneOnClose()
+
 	db.env.Close()
 	db.env = nil
 
@@ -526,10 +570,6 @@ func (db *MdbxKV) Close() {
 }
 
 func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
-	if db.closed.Load() {
-		return nil, fmt.Errorf("db closed")
-	}
-
 	// don't try to acquire if the context is already done
 	select {
 	case <-ctx.Done():
@@ -538,8 +578,13 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
 		// otherwise carry on
 	}
 
+	if !db.trackTxBegin() {
+		return nil, fmt.Errorf("db closed")
+	}
+
 	// will return nil err if context is cancelled (may appear to acquire the semaphore)
 	if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
+		db.trackTxEnd()
 		return nil, semErr
 	}
 
@@ -548,6 +593,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
 			// on error, or if there is whatever reason that we don't return a tx,
 			// we need to free up the limiter slot, otherwise it could lead to deadlocks
 			db.roTxsLimiter.Release(1)
+			db.trackTxEnd()
 		}
 	}()
 
@@ -555,7 +601,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
 	if err != nil {
 		return nil, fmt.Errorf("%w, label: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
 	}
-	db.wg.Add(1)
+
 	return &MdbxTx{
 		ctx: ctx,
 		db:  db,
@@ -579,16 +625,18 @@ func (db *MdbxKV) beginRw(ctx context.Context, flags uint) (txn kv.RwTx, err error) {
 	default:
 	}
 
-	if db.closed.Load() {
+	if !db.trackTxBegin() {
 		return nil, fmt.Errorf("db closed")
 	}
+
 	runtime.LockOSThread()
 	tx, err := db.env.BeginTxn(nil, flags)
 	if err != nil {
 		runtime.UnlockOSThread() // unlock only in case of error. normal flow is "defer .Rollback()"
+		db.trackTxEnd()
 		return nil, fmt.Errorf("%w, lable: %s, trace: %s", err, db.opts.label.String(), stack2.Trace().String())
 	}
-	db.wg.Add(1)
+
 	return &MdbxTx{
 		db: db,
 		tx: tx,
@@ -830,7 +878,7 @@ func (tx *MdbxTx) Commit() error {
 	}
 	defer func() {
 		tx.tx = nil
-		tx.db.wg.Done()
+		tx.db.trackTxEnd()
 		if tx.readOnly {
 			tx.db.roTxsLimiter.Release(1)
 		} else {
@@ -881,7 +929,7 @@ func (tx *MdbxTx) Rollback() {
 	}
 	defer func() {
 		tx.tx = nil
-		tx.db.wg.Done()
+		tx.db.trackTxEnd()
 		if tx.readOnly {
 			tx.db.roTxsLimiter.Release(1)
 		} else {
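As a reference for the pattern above, here is the counter-plus-Cond
scheme reduced to a self-contained Go sketch. The txTracker type and
its begin/end/closeAndWait methods are simplified stand-ins invented
for this example (they correspond to trackTxBegin, trackTxEnd and
Close plus waitTxsAllDoneOnClose); unlike the patch, closed is a plain
bool under the mutex rather than an atomic.Bool, and the semaphore and
MDBX details are omitted:

    package main

    import (
        "fmt"
        "sync"
    )

    // txTracker mirrors the patch's scheme: a count of active
    // transactions guarded by a mutex, plus a Cond to wake a closer
    // that is waiting for the count to drop to zero.
    type txTracker struct {
        mu     sync.Mutex
        cond   *sync.Cond
        count  uint
        closed bool
    }

    func newTxTracker() *txTracker {
        t := &txTracker{}
        t.cond = sync.NewCond(&t.mu)
        return t
    }

    // begin plays the role of trackTxBegin: it refuses to start a
    // transaction once the tracker is closed, so the count can never
    // grow after close.
    func (t *txTracker) begin() bool {
        t.mu.Lock()
        defer t.mu.Unlock()
        if t.closed {
            return false
        }
        t.count++
        return true
    }

    // end plays the role of trackTxEnd: it decrements the count and
    // wakes the closer when the last transaction finishes.
    func (t *txTracker) end() {
        t.mu.Lock()
        defer t.mu.Unlock()
        if t.count == 0 {
            panic("unmatched end")
        }
        t.count--
        if t.count == 0 && t.closed {
            t.cond.Signal()
        }
    }

    // closeAndWait plays the role of Close + waitTxsAllDoneOnClose:
    // it marks the tracker closed, then blocks until every active
    // transaction has ended. Cond.Wait releases the mutex while
    // waiting, so end() can still run.
    func (t *txTracker) closeAndWait() {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.closed = true
        for t.count > 0 {
            t.cond.Wait()
        }
    }

    func main() {
        t := newTxTracker()
        var workers sync.WaitGroup
        for i := 0; i < 4; i++ {
            workers.Add(1)
            go func() {
                defer workers.Done()
                if !t.begin() {
                    return // already closed; no transaction started
                }
                defer t.end()
                // ... transaction work would happen here ...
            }()
        }
        t.closeAndWait()
        workers.Wait()
        fmt.Println("closed with all transactions finished")
    }

The key property is that begin() and closeAndWait() take the same
mutex, so a begin can never slip in between the closed check and the
wait — exactly the window the WaitGroup-based version left open.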