Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade txpool module to increase tps #43

Merged
merged 1 commit into from
Mar 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ var (
reheapTimer = metrics.NewRegisteredTimer("txpool/reheap", nil)
)

var (
addrsPool = sync.Pool{
New: func() interface{} {
return make([]common.Address, 0, 8)
},
}
addrBeatPool = sync.Pool{
New: func() interface{} {
return make(addressesByHeartbeat, 0, 8)
},
}
)

// TxStatus is the current status of a transaction as seen by the pool.
type TxStatus uint

Expand Down Expand Up @@ -267,6 +280,8 @@ type TxPool struct {
wg sync.WaitGroup // tracks loop, scheduleReorgLoop
initDoneCh chan struct{} // is closed once the pool is initialized (for tests)

spammers *prque.Prque

changesSinceReorg int // A counter for how many drops we've performed in-between reorg.
}

Expand Down Expand Up @@ -298,6 +313,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
reorgShutdownCh: make(chan struct{}),
initDoneCh: make(chan struct{}),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
spammers: prque.New(nil),
}
pool.locals = newAccountSet(pool.signer)
for _, addr := range config.Locals {
Expand Down Expand Up @@ -1166,13 +1182,14 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
}
}
// Reset needs promote for all addresses
promoteAddrs = make([]common.Address, 0, len(pool.queue))
promoteAddrs = addrsPool.Get().([]common.Address)
for addr := range pool.queue {
promoteAddrs = append(promoteAddrs, addr)
}
}
// Check for pending transactions for every account that sent new ones
promoted := pool.promoteExecutables(promoteAddrs)
defer addrsPool.Put(promoteAddrs[:0])

// If a new block appeared, validate the pool of pending transactions. This will
// remove any transaction that has been included in the block or was invalidated
Expand Down Expand Up @@ -1387,18 +1404,19 @@ func (pool *TxPool) truncatePending() {

pendingBeforeCap := pending
// Assemble a spam order to penalize large transactors first
spammers := prque.New(nil)
pool.spammers.Reset()
for addr, list := range pool.pending {
// Only evict transactions from high rollers
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
spammers.Push(addr, int64(list.Len()))
pool.spammers.Push(addr, int64(list.Len()))
}
}
// Gradually drop transactions from offenders
offenders := []common.Address{}
for pending > pool.config.GlobalSlots && !spammers.Empty() {
offenders := addrsPool.Get().([]common.Address)
defer addrsPool.Put(offenders[:0])
for pending > pool.config.GlobalSlots && !pool.spammers.Empty() {
// Retrieve the next offender if not local address
offender, _ := spammers.Pop()
offender, _ := pool.spammers.Pop()
offenders = append(offenders, offender.(common.Address))

// Equalize balances until all the same or below threshold
Expand Down Expand Up @@ -1471,7 +1489,8 @@ func (pool *TxPool) truncateQueue() {
}

// Sort all accounts with queued transactions by heartbeat
addresses := make(addressesByHeartbeat, 0, len(pool.queue))
addresses := addrBeatPool.Get().(addressesByHeartbeat)
defer addrBeatPool.Put(addresses[:0])
for addr := range pool.queue {
if !pool.locals.contains(addr) { // don't drop locals
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
Expand Down Expand Up @@ -1620,7 +1639,10 @@ func (as *accountSet) containsTx(tx *types.Transaction) bool {
// add inserts a new address into the set to track.
func (as *accountSet) add(addr common.Address) {
as.accounts[addr] = struct{}{}
as.cache = nil
if as.cache != nil {
addrsPool.Put((*as.cache)[:0])
as.cache = nil
}
}

// addTx adds the sender of tx into the set.
Expand All @@ -1634,7 +1656,7 @@ func (as *accountSet) addTx(tx *types.Transaction) {
// reuse. The returned slice should not be changed!
func (as *accountSet) flatten() []common.Address {
if as.cache == nil {
accounts := make([]common.Address, 0, len(as.accounts))
accounts := addrsPool.Get().([]common.Address)
for account := range as.accounts {
accounts = append(accounts, account)
}
Expand All @@ -1648,7 +1670,10 @@ func (as *accountSet) merge(other *accountSet) {
for addr := range other.accounts {
as.accounts[addr] = struct{}{}
}
as.cache = nil
if as.cache != nil {
addrsPool.Put((*as.cache)[:0])
as.cache = nil
}
}

// txLookup is used internally by TxPool to track transactions while allowing
Expand Down