Skip to content

Commit

Permalink
[TxPool] Reduce lock time in resetAccounts (#459)
Browse files Browse the repository at this point in the history
* implement reset() in account and refactor resetAccounts()

* txpool_test.go: send txs in current goroutine
  • Loading branch information
dbrajovic authored Mar 30, 2022
1 parent 6c8db85 commit cc56e29
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 79 deletions.
65 changes: 57 additions & 8 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (

// An account is the core structure for processing
// transactions from a specific address. The nextNonce
// field is what separetes the enqueued from promoted:
// field is what separates the enqueued from promoted transactions:
//
// 1. enqueued - transactions higher than the nextNonce
// 2. promoted - transactions lower than the nextNonce
Expand All @@ -162,6 +162,52 @@ func (a *account) setNonce(nonce uint64) {
atomic.StoreUint64(&a.nextNonce, nonce)
}

// reset aligns the account with the new nonce
// by pruning all transactions with nonce lesser than new.
// After pruning, a promotion may be signaled if the first
// enqueued transaction matches the new nonce.
func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
prunedPromoted,
prunedEnqueued []*types.Transaction,
) {
a.promoted.lock(true)
defer a.promoted.unlock()

// prune the promoted txs
prunedPromoted = append(
prunedPromoted,
a.promoted.prune(nonce)...,
)

if nonce <= a.getNonce() {
// only the promoted queue needed pruning
return
}

a.enqueued.lock(true)
defer a.enqueued.unlock()

// prune the enqueued txs
prunedEnqueued = append(
prunedEnqueued,
a.enqueued.prune(nonce)...,
)

// update nonce expected for this account
a.setNonce(nonce)

// it is important to signal promotion while
// the locks are held to ensure no other
// handler will mutate the account
if first := a.enqueued.peek(); first != nil &&
first.Nonce == nonce {
// first enqueued tx is expected -> signal promotion
promoteCh <- promoteRequest{account: first.From}
}

return
}

// enqueue attempts tp push the transaction onto the enqueued queue.
func (a *account) enqueue(tx *types.Transaction) error {
a.enqueued.lock(true)
Expand All @@ -183,7 +229,7 @@ func (a *account) enqueue(tx *types.Transaction) error {
// Eligible transactions are all sequential in order of nonce
// and the first one has to have nonce less (or equal) to the account's
// nextNonce.
func (a *account) promote() (uint64, []types.Hash) {
func (a *account) promote() []*types.Transaction {
a.promoted.lock(true)
a.enqueued.lock(true)

Expand All @@ -192,17 +238,19 @@ func (a *account) promote() (uint64, []types.Hash) {
a.promoted.unlock()
}()

// sanity check
currentNonce := a.getNonce()
if a.enqueued.length() == 0 ||
a.enqueued.peek().Nonce > currentNonce {
// nothing to promote
return 0, nil
return nil
}

promoted := uint64(0)
promotedTxnHashes := make([]types.Hash, 0)
promoted := make([]*types.Transaction, 0)
nextNonce := a.enqueued.peek().Nonce

// move all promotable txs (enqueued txs that are sequential in nonce)
// to the account's promoted queue
for {
tx := a.enqueued.peek()
if tx == nil ||
Expand All @@ -215,11 +263,12 @@ func (a *account) promote() (uint64, []types.Hash) {

// push to promoted
a.promoted.push(tx)
promotedTxnHashes = append(promotedTxnHashes, tx.Hash)

// update counters
nextNonce += 1
promoted += 1

// update return result
promoted = append(promoted, tx)
}

// only update the nonce map if the new nonce
Expand All @@ -228,5 +277,5 @@ func (a *account) promote() (uint64, []types.Hash) {
a.setNonce(nextNonce)
}

return promoted, promotedTxnHashes
return promoted
}
2 changes: 0 additions & 2 deletions txpool/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (q *accountQueue) unlock() {
// with nonce lower than given.
func (q *accountQueue) prune(nonce uint64) (
pruned []*types.Transaction,
prunedHashes []types.Hash,
) {
for {
tx := q.peek()
Expand All @@ -60,7 +59,6 @@ func (q *accountQueue) prune(nonce uint64) (

tx = q.pop()
pruned = append(pruned, tx)
prunedHashes = append(prunedHashes, tx.Hash)
}

return
Expand Down
103 changes: 46 additions & 57 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,12 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) {
account := p.accounts.get(addr)

// promote enqueued txs
promoted, promotedHashes := account.promote()
promoted := account.promote()
p.logger.Debug("promote request", "promoted", promoted, "addr", addr.String())

// update metrics
p.metrics.PendingTxs.Add(float64(promoted))
p.eventManager.signalEvent(proto.EventType_PROMOTED, promotedHashes...)
p.metrics.PendingTxs.Add(float64(len(promoted)))
p.eventManager.signalEvent(proto.EventType_PROMOTED, toHash(promoted...)...)
}

// addGossipTx handles receiving transactions
Expand All @@ -664,71 +664,51 @@ func (p *TxPool) addGossipTx(obj interface{}) {
}
}

// resetAccounts updates existing accounts with the new nonce.
// resetAccounts updates existing accounts with the new nonce and prunes stale transactions.
func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {
for addr, nonce := range stateNonces {
var (
allPrunedPromoted []*types.Transaction
allPrunedEnqueued []*types.Transaction
)

// clear all accounts of stale txs
for addr, newNonce := range stateNonces {
if !p.accounts.exists(addr) {
// unknown account
// no updates for this account
continue
}

p.resetAccount(addr, nonce)
}
}

// resetAccount aligns the account's state with the given nonce,
// pruning any present stale transaction. If, afterwards, the account
// is eligible for promotion, a promoteRequest is signaled.
func (p *TxPool) resetAccount(addr types.Address, nonce uint64) {
account := p.accounts.get(addr)

// lock promoted
account.promoted.lock(true)
defer account.promoted.unlock()

// prune promoted
pruned, prunedHashes := account.promoted.prune(nonce)

// update pool state
p.index.remove(pruned...)
p.gauge.decrease(slotsRequired(pruned...))
account := p.accounts.get(addr)
prunedPromoted, prunedEnqueued := account.reset(newNonce, p.promoteReqCh)

p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
prunedHashes...,
)

// update metrics
p.metrics.PendingTxs.Add(float64(-1 * len(pruned)))

if nonce <= account.getNonce() {
// only the promoted queue needed pruning
return
// append pruned
allPrunedPromoted = append(allPrunedPromoted, prunedPromoted...)
allPrunedEnqueued = append(allPrunedEnqueued, prunedEnqueued...)
}

// lock enqueued
account.enqueued.lock(true)
defer account.enqueued.unlock()

// prune enqueued
pruned, prunedHashes = account.enqueued.prune(nonce)

// update pool state
p.index.remove(pruned...)
p.gauge.decrease(slotsRequired(pruned...))
// pool cleanup callback
cleanup := func(stale ...*types.Transaction) {
p.index.remove(stale...)
p.gauge.decrease(slotsRequired(stale...))
}

// update next nonce
account.setNonce(nonce)
// prune pool state
if len(allPrunedPromoted) > 0 {
cleanup(allPrunedPromoted...)
p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
toHash(allPrunedPromoted...)...,
)

p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
prunedHashes...,
)
p.metrics.PendingTxs.Add(float64(-1 * len(allPrunedPromoted)))
}

if first := account.enqueued.peek(); first != nil &&
first.Nonce == nonce {
// first enqueued tx is expected -> signal promotion
p.promoteReqCh <- promoteRequest{account: addr}
if len(allPrunedEnqueued) > 0 {
cleanup(allPrunedEnqueued...)
p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
toHash(allPrunedEnqueued...)...,
)
}
}

Expand All @@ -749,3 +729,12 @@ func (p *TxPool) createAccountOnce(newAddr types.Address) *account {
func (p *TxPool) Length() uint64 {
return p.accounts.promoted()
}

// toHash returns the hash(es) of given transaction(s)
func toHash(txs ...*types.Transaction) (hashes []types.Hash) {
for _, tx := range txs {
hashes = append(hashes, tx.Hash)
}

return
}
43 changes: 31 additions & 12 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,9 @@ func TestResetAccount(t *testing.T) {
assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length())
assert.Equal(t, uint64(len(test.txs)), pool.accounts.get(addr1).promoted.length())

pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})

assert.Equal(t, test.expected.slots, pool.gauge.read())
assert.Equal(t, // enqueued
Expand Down Expand Up @@ -785,10 +787,14 @@ func TestResetAccount(t *testing.T) {
assert.Equal(t, uint64(0), pool.accounts.get(addr1).promoted.length())

if test.signal {
go pool.resetAccount(addr1, test.newNonce)
go pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
pool.handlePromoteRequest(<-pool.promoteReqCh)
} else {
pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
}

assert.Equal(t, test.expected.slots, pool.gauge.read())
Expand Down Expand Up @@ -940,10 +946,14 @@ func TestResetAccount(t *testing.T) {
pool.handlePromoteRequest(req)

if test.signal {
go pool.resetAccount(addr1, test.newNonce)
go pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
pool.handlePromoteRequest(<-pool.promoteReqCh)
} else {
pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
}

assert.Equal(t, test.expected.slots, pool.gauge.read())
Expand Down Expand Up @@ -1109,13 +1119,13 @@ func TestResetAccounts_Promoted(t *testing.T) {
allTxs :=
map[types.Address][]*types.Transaction{
addr1: {
newTx(addr1, 0, 1),
newTx(addr1, 1, 1),
newTx(addr1, 0, 1), // will be pruned
newTx(addr1, 1, 1), // will be pruned
newTx(addr1, 2, 1),
newTx(addr1, 3, 1),
},
addr2: {
newTx(addr2, 0, 1),
newTx(addr2, 0, 1), // will be pruned
newTx(addr2, 1, 1),
},
addr3: {
Expand All @@ -1124,6 +1134,7 @@ func TestResetAccounts_Promoted(t *testing.T) {
newTx(addr3, 2, 1),
},
addr4: {
// all txs will be pruned
newTx(addr4, 0, 1),
newTx(addr4, 1, 1),
newTx(addr4, 2, 1),
Expand Down Expand Up @@ -1177,10 +1188,7 @@ func TestResetAccounts_Promoted(t *testing.T) {
for _, tx := range txs {
totalTx++

go func(tx *types.Transaction) {
err := pool.addTx(local, tx)
assert.NoError(t, err)
}(tx)
assert.NoError(t, pool.addTx(local, tx))
}
}

Expand All @@ -1191,8 +1199,19 @@ func TestResetAccounts_Promoted(t *testing.T) {
assert.Len(t, waitForEvents(ctx, promotedSubscription, totalTx), totalTx)
pool.eventManager.cancelSubscription(promotedSubscription.subscriptionID)

prunedSubscription := pool.eventManager.subscribe(
[]proto.EventType{
proto.EventType_PRUNED_PROMOTED,
})

pool.resetAccounts(newNonces)

ctx, cancelFn = context.WithTimeout(context.Background(), time.Second*10)
defer cancelFn()

assert.Len(t, waitForEvents(ctx, prunedSubscription, 8), 8)
pool.eventManager.cancelSubscription(prunedSubscription.subscriptionID)

assert.Equal(t, expected.slots, pool.gauge.read())

for addr := range expected.accounts {
Expand Down

0 comments on commit cc56e29

Please sign in to comment.