Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TxPool] Reduce lock time in resetAccounts #459

Merged
merged 14 commits into from
Mar 30, 2022
102 changes: 94 additions & 8 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,46 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (
return
}

// resetWithNonce updates all accounts with the new nonce and clears any stale transactions.
// May signal a promotion request if the account is eligible after pruning.
func (m *accountsMap) resetWithNonce(newNonces map[types.Address]uint64, promoteCh chan<- promoteRequest) (
allPrunedPromoted,
allPrunedEnqueued []*types.Transaction,
) {
// prune each account with the new nonce
m.Range(func(key, value interface{}) bool {
addr, _ := key.(types.Address) //nolint:forcetypeassert, nolintlint
account, _ := value.(*account) //nolint:forcetypeassert, nolintlint

newNonce, ok := newNonces[addr]
if !ok {
// no updates for this account
return true
}

// prune stale txs
prunedPromoted, prunedEnqueued := account.reset(newNonce, promoteCh)

// update result
allPrunedPromoted = append(
allPrunedPromoted,
prunedPromoted...,
)

allPrunedEnqueued = append(
allPrunedEnqueued,
prunedEnqueued...,
)

return true
})
Kourin1996 marked this conversation as resolved.
Show resolved Hide resolved

return
}

// An account is the core structure for processing
// transactions from a specific address. The nextNonce
// field is what separetes the enqueued from promoted:
// field is what separates the enqueued from promoted transactions:
//
// 1. enqueued - transactions higher than the nextNonce
// 2. promoted - transactions lower than the nextNonce
Expand All @@ -162,6 +199,52 @@ func (a *account) setNonce(nonce uint64) {
atomic.StoreUint64(&a.nextNonce, nonce)
}

// reset aligns the account with the new nonce
// by pruning all transactions with nonce lesser than new.
// After pruning, a promotion may be signaled if the first
// enqueued transaction matches the new nonce.
func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
prunedPromoted,
prunedEnqueued []*types.Transaction,
) {
a.promoted.lock(true)
defer a.promoted.unlock()

// prune the promoted txs
prunedPromoted = append(
prunedPromoted,
a.promoted.prune(nonce)...,
)

if nonce <= a.getNonce() {
// only the promoted queue needed pruning
return
}

a.enqueued.lock(true)
defer a.enqueued.unlock()

// prune the enqueued txs
prunedEnqueued = append(
prunedEnqueued,
a.enqueued.prune(nonce)...,
)

// update nonce expected for this account
a.setNonce(nonce)

// it is important to signal promotion while
// the locks are held to ensure no other
// handler will mutate the account
if first := a.enqueued.peek(); first != nil &&
first.Nonce == nonce {
// first enqueued tx is expected -> signal promotion
promoteCh <- promoteRequest{account: first.From}
}

return
}

// enqueue attempts tp push the transaction onto the enqueued queue.
func (a *account) enqueue(tx *types.Transaction) error {
a.enqueued.lock(true)
Expand All @@ -183,7 +266,7 @@ func (a *account) enqueue(tx *types.Transaction) error {
// Eligible transactions are all sequential in order of nonce
// and the first one has to have nonce less (or equal) to the account's
// nextNonce.
func (a *account) promote() (uint64, []types.Hash) {
func (a *account) promote() []*types.Transaction {
a.promoted.lock(true)
a.enqueued.lock(true)

Expand All @@ -192,17 +275,19 @@ func (a *account) promote() (uint64, []types.Hash) {
a.promoted.unlock()
}()

// sanity check
currentNonce := a.getNonce()
if a.enqueued.length() == 0 ||
a.enqueued.peek().Nonce > currentNonce {
// nothing to promote
return 0, nil
return nil
}

promoted := uint64(0)
promotedTxnHashes := make([]types.Hash, 0)
promoted := make([]*types.Transaction, 0)
nextNonce := a.enqueued.peek().Nonce

// move all promotable txs (enqueued txs that are sequential in nonce)
// to the account's promoted queue
for {
tx := a.enqueued.peek()
if tx == nil ||
Expand All @@ -215,11 +300,12 @@ func (a *account) promote() (uint64, []types.Hash) {

// push to promoted
a.promoted.push(tx)
promotedTxnHashes = append(promotedTxnHashes, tx.Hash)

// update counters
nextNonce += 1
promoted += 1

// update return result
promoted = append(promoted, tx)
}

// only update the nonce map if the new nonce
Expand All @@ -228,5 +314,5 @@ func (a *account) promote() (uint64, []types.Hash) {
a.setNonce(nextNonce)
}

return promoted, promotedTxnHashes
return promoted
}
2 changes: 0 additions & 2 deletions txpool/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (q *accountQueue) unlock() {
// with nonce lower than given.
func (q *accountQueue) prune(nonce uint64) (
pruned []*types.Transaction,
prunedHashes []types.Hash,
) {
for {
tx := q.peek()
Expand All @@ -60,7 +59,6 @@ func (q *accountQueue) prune(nonce uint64) (

tx = q.pop()
pruned = append(pruned, tx)
prunedHashes = append(prunedHashes, tx.Hash)
}

return
Expand Down
93 changes: 30 additions & 63 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,12 +633,12 @@ func (p *TxPool) handlePromoteRequest(req promoteRequest) {
account := p.accounts.get(addr)

// promote enqueued txs
promoted, promotedHashes := account.promote()
promoted := account.promote()
p.logger.Debug("promote request", "promoted", promoted, "addr", addr.String())

// update metrics
p.metrics.PendingTxs.Add(float64(promoted))
p.eventManager.signalEvent(proto.EventType_PROMOTED, promotedHashes...)
p.metrics.PendingTxs.Add(float64(len(promoted)))
p.eventManager.signalEvent(proto.EventType_PROMOTED, toHash(promoted...)...)
}

// addGossipTx handles receiving transactions
Expand All @@ -664,71 +664,29 @@ func (p *TxPool) addGossipTx(obj interface{}) {
}
}

// resetAccounts updates existing accounts with the new nonce.
// resetAccounts updates existing accounts with the new nonce and prunes stale transactions.
func (p *TxPool) resetAccounts(stateNonces map[types.Address]uint64) {
for addr, nonce := range stateNonces {
if !p.accounts.exists(addr) {
// unknown account
continue
}

p.resetAccount(addr, nonce)
}
}

// resetAccount aligns the account's state with the given nonce,
// pruning any present stale transaction. If, afterwards, the account
// is eligible for promotion, a promoteRequest is signaled.
func (p *TxPool) resetAccount(addr types.Address, nonce uint64) {
account := p.accounts.get(addr)

// lock promoted
account.promoted.lock(true)
defer account.promoted.unlock()

// prune promoted
pruned, prunedHashes := account.promoted.prune(nonce)

// update pool state
p.index.remove(pruned...)
p.gauge.decrease(slotsRequired(pruned...))

p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
prunedHashes...,
)
allPrunedPromoted, allPrunedEnqueued := p.accounts.resetWithNonce(stateNonces, p.promoteReqCh)

// update metrics
p.metrics.PendingTxs.Add(float64(-1 * len(pruned)))
// update state
if len(allPrunedPromoted) > 0 {
p.index.remove(allPrunedPromoted...)
p.gauge.decrease(slotsRequired(allPrunedPromoted...))
p.eventManager.signalEvent(
proto.EventType_PRUNED_PROMOTED,
toHash(allPrunedPromoted...)...,
)

if nonce <= account.getNonce() {
// only the promoted queue needed pruning
return
p.metrics.PendingTxs.Add(float64(-1 * len(allPrunedPromoted)))
}

// lock enqueued
account.enqueued.lock(true)
defer account.enqueued.unlock()

// prune enqueued
pruned, prunedHashes = account.enqueued.prune(nonce)

// update pool state
p.index.remove(pruned...)
p.gauge.decrease(slotsRequired(pruned...))

// update next nonce
account.setNonce(nonce)

p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
prunedHashes...,
)

if first := account.enqueued.peek(); first != nil &&
first.Nonce == nonce {
// first enqueued tx is expected -> signal promotion
p.promoteReqCh <- promoteRequest{account: addr}
if len(allPrunedEnqueued) > 0 {
p.index.remove(allPrunedEnqueued...)
p.gauge.decrease(slotsRequired(allPrunedEnqueued...))
p.eventManager.signalEvent(
proto.EventType_PRUNED_ENQUEUED,
toHash(allPrunedEnqueued...)...,
)
}
}

Expand All @@ -749,3 +707,12 @@ func (p *TxPool) createAccountOnce(newAddr types.Address) *account {
func (p *TxPool) Length() uint64 {
return p.accounts.promoted()
}

// toHash returns the hash(es) of given transaction(s)
func toHash(txs ...*types.Transaction) (hashes []types.Hash) {
for _, tx := range txs {
hashes = append(hashes, tx.Hash)
}

return
}
43 changes: 31 additions & 12 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,9 @@ func TestResetAccount(t *testing.T) {
assert.Equal(t, uint64(0), pool.accounts.get(addr1).enqueued.length())
assert.Equal(t, uint64(len(test.txs)), pool.accounts.get(addr1).promoted.length())

pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})

assert.Equal(t, test.expected.slots, pool.gauge.read())
assert.Equal(t, // enqueued
Expand Down Expand Up @@ -785,10 +787,14 @@ func TestResetAccount(t *testing.T) {
assert.Equal(t, uint64(0), pool.accounts.get(addr1).promoted.length())

if test.signal {
go pool.resetAccount(addr1, test.newNonce)
go pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
pool.handlePromoteRequest(<-pool.promoteReqCh)
} else {
pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
}

assert.Equal(t, test.expected.slots, pool.gauge.read())
Expand Down Expand Up @@ -940,10 +946,14 @@ func TestResetAccount(t *testing.T) {
pool.handlePromoteRequest(req)

if test.signal {
go pool.resetAccount(addr1, test.newNonce)
go pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
pool.handlePromoteRequest(<-pool.promoteReqCh)
} else {
pool.resetAccount(addr1, test.newNonce)
pool.resetAccounts(map[types.Address]uint64{
addr1: test.newNonce,
})
}

assert.Equal(t, test.expected.slots, pool.gauge.read())
Expand Down Expand Up @@ -1109,13 +1119,13 @@ func TestResetAccounts_Promoted(t *testing.T) {
allTxs :=
map[types.Address][]*types.Transaction{
addr1: {
newTx(addr1, 0, 1),
newTx(addr1, 1, 1),
newTx(addr1, 0, 1), // will be pruned
newTx(addr1, 1, 1), // will be pruned
newTx(addr1, 2, 1),
newTx(addr1, 3, 1),
},
addr2: {
newTx(addr2, 0, 1),
newTx(addr2, 0, 1), // will be pruned
newTx(addr2, 1, 1),
},
addr3: {
Expand All @@ -1124,6 +1134,7 @@ func TestResetAccounts_Promoted(t *testing.T) {
newTx(addr3, 2, 1),
},
addr4: {
// all txs will be pruned
newTx(addr4, 0, 1),
newTx(addr4, 1, 1),
newTx(addr4, 2, 1),
Expand Down Expand Up @@ -1177,10 +1188,7 @@ func TestResetAccounts_Promoted(t *testing.T) {
for _, tx := range txs {
totalTx++

go func(tx *types.Transaction) {
err := pool.addTx(local, tx)
assert.NoError(t, err)
}(tx)
assert.NoError(t, pool.addTx(local, tx))
}
}

Expand All @@ -1191,8 +1199,19 @@ func TestResetAccounts_Promoted(t *testing.T) {
assert.Len(t, waitForEvents(ctx, promotedSubscription, totalTx), totalTx)
pool.eventManager.cancelSubscription(promotedSubscription.subscriptionID)

prunedSubscription := pool.eventManager.subscribe(
[]proto.EventType{
proto.EventType_PRUNED_PROMOTED,
})

pool.resetAccounts(newNonces)

ctx, cancelFn = context.WithTimeout(context.Background(), time.Second*10)
defer cancelFn()

assert.Len(t, waitForEvents(ctx, prunedSubscription, 8), 8)
pool.eventManager.cancelSubscription(prunedSubscription.subscriptionID)

assert.Equal(t, expected.slots, pool.gauge.read())

for addr := range expected.accounts {
Expand Down