Skip to content

Commit

Permalink
EVM-716 Check if the error is produced for duplicate TXs
Browse files Browse the repository at this point in the history
Co-authored-by: Igor Crevar <crewce@gmail.com>
  • Loading branch information
dusan-maksimovic and igorcrevar committed Jul 3, 2023
1 parent 655d15d commit 1084d99
Show file tree
Hide file tree
Showing 7 changed files with 975 additions and 365 deletions.
99 changes: 78 additions & 21 deletions txpool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import (
type accountsMap struct {
sync.Map

count uint64

count uint64
maxEnqueuedLimit uint64
}

Expand All @@ -22,6 +21,7 @@ func (m *accountsMap) initOnce(addr types.Address, nonce uint64) *account {
a, loaded := m.LoadOrStore(addr, &account{
enqueued: newAccountQueue(),
promoted: newAccountQueue(),
nonceToTx: newNonceToTxLookup(),
maxEnqueued: m.maxEnqueuedLimit,
nextNonce: nonce,
})
Expand Down Expand Up @@ -136,6 +136,43 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (
return
}

type nonceToTxLookup struct {
mapping map[uint64]*types.Transaction
mutex sync.Mutex
}

func newNonceToTxLookup() *nonceToTxLookup {
return &nonceToTxLookup{
mapping: make(map[uint64]*types.Transaction),
}
}

func (m *nonceToTxLookup) lock() {
m.mutex.Lock()
}

func (m *nonceToTxLookup) unlock() {
m.mutex.Unlock()
}

func (m *nonceToTxLookup) get(nonce uint64) *types.Transaction {
return m.mapping[nonce]
}

func (m *nonceToTxLookup) set(tx *types.Transaction) {
m.mapping[tx.Nonce] = tx
}

func (m *nonceToTxLookup) reset() {
m.mapping = make(map[uint64]*types.Transaction)
}

func (m *nonceToTxLookup) remove(txs ...*types.Transaction) {
for _, tx := range txs {
delete(m.mapping, tx.Nonce)
}
}

// An account is the core structure for processing
// transactions from a specific address. The nextNonce
// field is what separates the enqueued from promoted transactions:
Expand All @@ -147,10 +184,13 @@ func (m *accountsMap) allTxs(includeEnqueued bool) (
// a promoteRequest is signaled for this account
// indicating the account's enqueued transaction(s)
// are ready to be moved to the promoted queue.
// lock order is important! promoted.lock(true), enqueued.lock(true), nonceToTx.lock()
type account struct {
enqueued, promoted *accountQueue
nextNonce uint64
demotions uint64
nonceToTx *nonceToTxLookup

nextNonce uint64
demotions uint64
// the number of consecutive blocks that don't contain account's transaction
skips uint64

Expand Down Expand Up @@ -192,21 +232,27 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
prunedEnqueued []*types.Transaction,
) {
a.promoted.lock(true)
defer a.promoted.unlock()
a.enqueued.lock(true)
a.nonceToTx.lock()

defer func() {
a.nonceToTx.unlock()
a.enqueued.unlock()
a.promoted.unlock()
}()

// prune the promoted txs
prunedPromoted = a.promoted.prune(nonce)
a.nonceToTx.remove(prunedPromoted...)

if nonce <= a.getNonce() {
// only the promoted queue needed pruning
return
}

a.enqueued.lock(true)
defer a.enqueued.unlock()

// prune the enqueued txs
prunedEnqueued = a.enqueued.prune(nonce)
a.nonceToTx.remove(prunedEnqueued...)

// update nonce expected for this account
a.setNonce(nonce)
Expand All @@ -222,24 +268,31 @@ func (a *account) reset(nonce uint64, promoteCh chan<- promoteRequest) (
return
}

// enqueue attempts tp push the transaction onto the enqueued queue.
func (a *account) enqueue(tx *types.Transaction) error {
a.enqueued.lock(true)
defer a.enqueued.unlock()
// enqueue push the transaction onto the enqueued queue or replace it
func (a *account) enqueue(tx *types.Transaction, replace bool) {
replaceInQueue := func(queue minNonceQueue) bool {
for i, x := range queue {
if x.Nonce == tx.Nonce {
queue[i] = tx // replace

if a.enqueued.length() == a.maxEnqueued {
return ErrMaxEnqueuedLimitReached
}
return true
}
}

// reject low nonce tx
if tx.Nonce < a.getNonce() {
return ErrNonceTooLow
return false
}

// enqueue tx
a.enqueued.push(tx)
a.nonceToTx.set(tx)

return nil
if !replace {
a.enqueued.push(tx)
} else {
// first -> try to replace in enqueued
if !replaceInQueue(a.enqueued.queue) {
// .. then try to replace in promoted
replaceInQueue(a.promoted.queue)
}
}
}

// Promote moves eligible transactions from enqueued to promoted.
Expand Down Expand Up @@ -295,6 +348,10 @@ func (a *account) promote() (promoted []*types.Transaction, pruned []*types.Tran
a.setNonce(nextNonce)
}

a.nonceToTx.lock()
a.nonceToTx.remove(pruned...)
a.nonceToTx.unlock()

return
}

Expand Down
2 changes: 1 addition & 1 deletion txpool/event_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (em *eventManager) subscribe(eventTypes []proto.EventType) *subscribeResult
eventTypes: eventTypes,
outputCh: make(chan *proto.TxPoolEvent),
doneCh: make(chan struct{}),
notifyCh: make(chan struct{}, 1),
notifyCh: make(chan struct{}, 10),
eventStore: &eventQueue{
events: make([]*proto.TxPoolEvent, 0),
},
Expand Down
4 changes: 2 additions & 2 deletions txpool/event_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ func (es *eventSubscription) eventSupported(eventType proto.EventType) bool {
// close stops the event subscription
func (es *eventSubscription) close() {
close(es.doneCh)
close(es.outputCh)
close(es.notifyCh)
}

// runLoop is the main loop that listens for notifications and handles the event / close signals
func (es *eventSubscription) runLoop() {
defer close(es.outputCh)

for {
select {
case <-es.doneCh: // Break if a close signal has been received
Expand Down
11 changes: 6 additions & 5 deletions txpool/queue_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,11 @@ func (q *minNonceQueue) Push(x interface{}) {
}

func (q *minNonceQueue) Pop() interface{} {
old := q
n := len(*old)
x := (*old)[n-1]
*q = (*old)[0 : n-1]
old := *q
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
*q = old[0 : n-1]

return x
return item
}
5 changes: 5 additions & 0 deletions txpool/slot_gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func (g *slotGauge) highPressure() bool {
return g.read() > (highPressureMark*g.max)/100
}

// free slots returns how many slots are currently available
func (g *slotGauge) freeSlots() uint64 {
return g.max - g.read()
}

// slotsRequired calculates the number of slots required for given transaction(s).
func slotsRequired(txs ...*types.Transaction) uint64 {
slots := uint64(0)
Expand Down
Loading

0 comments on commit 1084d99

Please sign in to comment.