Skip to content

Commit

Permalink
improve actqueue efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai committed Jul 13, 2022
1 parent 047c989 commit e022a51
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 167 deletions.
4 changes: 2 additions & 2 deletions actpool/actpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func TestActPool_removeConfirmedActs(t *testing.T) {
require.NoError(acct.AddBalance(big.NewInt(100000000000000000)))

return 0, nil
}).Times(8)
}).Times(5)
sf.EXPECT().Height().Return(uint64(1), nil).AnyTimes()
ctx := genesis.WithGenesisContext(context.Background(), genesis.Default)
require.NoError(ap.Add(ctx, tsf1))
Expand Down Expand Up @@ -1011,7 +1011,7 @@ func TestActPool_GetSize(t *testing.T) {
require.NoError(acct.AddBalance(big.NewInt(100000000000000000)))

return 0, nil
}).Times(8)
}).Times(5)
sf.EXPECT().Height().Return(uint64(1), nil).AnyTimes()
ctx := genesis.WithGenesisContext(context.Background(), genesis.Default)
require.NoError(ap.Add(ctx, tsf1))
Expand Down
225 changes: 122 additions & 103 deletions actpool/actqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ func (h *noncePriorityQueue) Pop() interface{} {
// ActQueue is the interface of actQueue
type ActQueue interface {
Put(action.SealedEnvelope) error
FilterNonce(uint64) []action.SealedEnvelope
UpdateQueue(uint64) []action.SealedEnvelope
CleanConfirmedAct() []action.SealedEnvelope
UpdateQueue() []action.SealedEnvelope
SetPendingNonce(uint64)
PendingNonce() uint64
SetPendingBalance(*big.Int)
AccountNonce() uint64
SetAccountBalance(*big.Int)
PendingBalance() *big.Int
AccountBalance() *big.Int
Len() int
Empty() bool
PendingActs(context.Context) []action.SealedEnvelope
Expand All @@ -85,22 +87,28 @@ type actQueue struct {
index noncePriorityQueue
// Current pending nonce tracking previous actions that can be committed to the next block for the account
pendingNonce uint64
// Current pending balance for the account
pendingBalance *big.Int
// Pending balance map
pendingBalance map[uint64]*big.Int
// Current account nonce
accountNonce uint64
// Current account balance
accountBalance *big.Int
clock clock.Clock
ttl time.Duration
mu sync.RWMutex
}

// NewActQueue create a new action queue
func NewActQueue(ap *actPool, address string, ops ...ActQueueOption) ActQueue {
func NewActQueue(ap *actPool, address string, pendingNonce uint64, balance *big.Int, ops ...ActQueueOption) ActQueue {
aq := &actQueue{
ap: ap,
address: address,
items: make(map[uint64]action.SealedEnvelope),
index: noncePriorityQueue{},
pendingNonce: uint64(1), // Taking coinbase Action into account, pendingNonce should start with 1
pendingBalance: big.NewInt(0),
pendingBalance: make(map[uint64]*big.Int),
pendingNonce: pendingNonce,
accountNonce: pendingNonce,
accountBalance: new(big.Int).Set(balance),
clock: clock.New(),
ttl: 0,
}
Expand All @@ -115,36 +123,79 @@ func (q *actQueue) Put(act action.SealedEnvelope) error {
q.mu.Lock()
defer q.mu.Unlock()
nonce := act.Nonce()

if cost, _ := act.Cost(); q.getPendingBalanceAtNonce(nonce).Cmp(cost) < 0 {
return action.ErrInsufficientFunds
}

if actInPool, exist := q.items[nonce]; exist {
// act of higher gas price cut in line
if act.GasPrice().Cmp(actInPool.GasPrice()) != 1 {
// act of higher gas price can cut in line
if nonce < q.pendingNonce && act.GasPrice().Cmp(actInPool.GasPrice()) != 1 {
return action.ErrReplaceUnderpriced
}
// update action in q.items and q.index
q.items[nonce] = act
for i, x := range q.index {
if x.nonce == nonce {
for i := range q.index {
if q.index[i].nonce == nonce {
q.index[i].deadline = q.clock.Now().Add(q.ttl)
break
}
}
q.updateFromNonce(nonce)
return nil
}
heap.Push(&q.index, &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)})
q.items[nonce] = act
if nonce == q.pendingNonce {
q.updateFromNonce(q.pendingNonce)
}
return nil
}

// FilterNonce removes all actions from the map with a nonce lower than the given threshold
func (q *actQueue) FilterNonce(threshold uint64) []action.SealedEnvelope {
func (q *actQueue) getPendingBalanceAtNonce(nonce uint64) *big.Int {
if nonce > q.pendingNonce {
return q.getPendingBalanceAtNonce(q.pendingNonce)
}
if _, exist := q.pendingBalance[nonce]; !exist {
return new(big.Int).Set(q.accountBalance)
}
return new(big.Int).Set(q.pendingBalance[nonce])
}

func (q *actQueue) updateFromNonce(start uint64) {
if start > q.pendingNonce {
return
}

for balance := q.getPendingBalanceAtNonce(start); ; start++ {
act, exist := q.items[start]
if !exist {
break
}

cost, _ := act.Cost()
if balance.Cmp(cost) < 0 {
break
}

balance = new(big.Int).Sub(balance, cost)
q.pendingBalance[start+1] = new(big.Int).Set(balance)
}

q.pendingNonce = start
}

// CleanConfirmedAct removes all actions from the map with a nonce lower than account's nonce
func (q *actQueue) CleanConfirmedAct() []action.SealedEnvelope {
q.mu.Lock()
defer q.mu.Unlock()
var removed []action.SealedEnvelope
// Pop off priority queue and delete corresponding entries from map until the threshold is reached
for q.index.Len() > 0 && (q.index)[0].nonce < threshold {
for q.index.Len() > 0 && (q.index)[0].nonce < q.accountNonce {
nonce := heap.Pop(&q.index).(*nonceWithTTL).nonce
removed = append(removed, q.items[nonce])
delete(q.items, nonce)
delete(q.pendingBalance, nonce)
}
return removed
}
Expand All @@ -160,8 +211,13 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope {
)
for i := 0; i < size; {
if timeNow.After(q.index[i].deadline) {
removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce])
delete(q.items, q.index[i].nonce)
nonce := q.index[i].nonce
if nonce < q.pendingNonce {
q.pendingNonce = nonce
}
removedFromQueue = append(removedFromQueue, q.items[nonce])
delete(q.items, nonce)
delete(q.pendingBalance, nonce)
q.index[i] = q.index[size-1]
size--
continue
Expand All @@ -175,52 +231,13 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope {
}

// UpdateQueue updates the pending nonce and balance of the queue
func (q *actQueue) UpdateQueue(nonce uint64) []action.SealedEnvelope {
func (q *actQueue) UpdateQueue() []action.SealedEnvelope {
q.mu.Lock()
defer q.mu.Unlock()
// First remove all timed out actions
removedFromQueue := q.cleanTimeout()

// Now, starting from the current pending nonce, incrementally find the next pending nonce
// while updating pending balance if actions are payable
for ; ; nonce++ {
_, exist := q.items[nonce]
if !exist {
break
}
if !q.enoughBalance(q.items[nonce], true) {
break
}
}
q.pendingNonce = nonce

// Find the index of new pending nonce within the queue
sort.Sort(q.index)
i := 0
for ; i < q.index.Len(); i++ {
if q.index[i].nonce >= nonce {
break
}
}
// Case I: An unpayable action has been found while updating pending nonce/balance
// Remove all the subsequent actions in the queue starting from the index of new pending nonce
if _, exist := q.items[nonce]; exist {
removedFromQueue = append(removedFromQueue, q.removeActs(i)...)
return removedFromQueue
}

// Case II: All actions are payable while updating pending nonce/balance
// Check all the subsequent actions in the queue starting from the index of new pending nonce
// Find the nonce index of the first unpayable action
// Remove all the subsequent actions in the queue starting from that index
for ; i < q.index.Len(); i++ {
nonce = q.index[i].nonce
act := q.items[nonce]
if !q.enoughBalance(act, false) {
break
}
}
removedFromQueue = append(removedFromQueue, q.removeActs(i)...)
q.updateFromNonce(q.pendingNonce)
return removedFromQueue
}

Expand All @@ -229,6 +246,7 @@ func (q *actQueue) SetPendingNonce(nonce uint64) {
q.mu.Lock()
defer q.mu.Unlock()
q.pendingNonce = nonce
q.accountNonce = nonce
}

// PendingNonce returns the current pending nonce of the queue
Expand All @@ -238,18 +256,33 @@ func (q *actQueue) PendingNonce() uint64 {
return q.pendingNonce
}

// SetPendingBalance sets pending balance for the queue
func (q *actQueue) SetPendingBalance(balance *big.Int) {
// AccountNonce returns the current account nonce
func (q *actQueue) AccountNonce() uint64 {
q.mu.RLock()
defer q.mu.RUnlock()
return q.accountNonce
}

// SetAccountBalance sets account balance for the queue
func (q *actQueue) SetAccountBalance(balance *big.Int) {
q.mu.Lock()
defer q.mu.Unlock()
q.pendingBalance = balance
q.accountBalance.Set(balance)
q.pendingBalance = make(map[uint64]*big.Int)
}

// PendingBalance returns the current pending balance of the queue
func (q *actQueue) PendingBalance() *big.Int {
q.mu.RLock()
defer q.mu.RUnlock()
return q.pendingBalance
return q.getPendingBalanceAtNonce(q.pendingNonce)
}

// AccountBalance returns the current account balance
func (q *actQueue) AccountBalance() *big.Int {
q.mu.RLock()
defer q.mu.RUnlock()
return new(big.Int).Set(q.accountBalance)
}

// Len returns the length of the action map
Expand All @@ -263,15 +296,24 @@ func (q *actQueue) Len() int {
func (q *actQueue) Empty() bool {
q.mu.RLock()
defer q.mu.RUnlock()
return q.Len() == 0
return len(q.items) == 0
}

// Reset makes the queue into a dummy queue
func (q *actQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.items = make(map[uint64]action.SealedEnvelope)
q.index = noncePriorityQueue{}
q.pendingNonce = 0
q.accountBalance = big.NewInt(0)
}

// PendingActs creates a consecutive nonce-sorted slice of actions
func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope {
if q.Len() == 0 {
return nil
}
acts := make([]action.SealedEnvelope, 0, len(q.items))
addr, err := address.FromString(q.address)
if err != nil {
log.L().Error("Error when getting the address", zap.String("address", q.address), zap.Error(err))
Expand All @@ -282,14 +324,27 @@ func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope {
log.L().Error("Error when getting the nonce", zap.String("address", q.address), zap.Error(err))
return nil
}
nonce := confirmedState.PendingNonce()

var (
nonce = confirmedState.PendingNonce()
balance = new(big.Int).Set(confirmedState.Balance)
acts = make([]action.SealedEnvelope, 0, len(q.items))
)
q.mu.RLock()
defer q.mu.RUnlock()
for ; ; nonce++ {
if _, exist := q.items[nonce]; !exist {
act, exist := q.items[nonce]
if !exist {
break
}
acts = append(acts, q.items[nonce])

cost, _ := act.Cost()
if balance.Cmp(cost) < 0 {
break
}

balance = new(big.Int).Sub(balance, cost)
acts = append(acts, act)
}
return acts
}
Expand All @@ -308,39 +363,3 @@ func (q *actQueue) AllActs() []action.SealedEnvelope {
}
return acts
}

// removeActs removes all the actions starting at idx from queue
func (q *actQueue) removeActs(idx int) []action.SealedEnvelope {
removedFromQueue := make([]action.SealedEnvelope, 0)
for i := idx; i < q.index.Len(); i++ {
removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce])
delete(q.items, q.index[i].nonce)
}
q.index = q.index[:idx]
heap.Init(&q.index)
return removedFromQueue
}

// enoughBalance helps check whether queue's pending balance is sufficient for the given action
func (q *actQueue) enoughBalance(act action.SealedEnvelope, updateBalance bool) bool {
cost, _ := act.Cost()
if q.pendingBalance.Cmp(cost) < 0 {
return false
}

if updateBalance {
q.pendingBalance.Sub(q.pendingBalance, cost)
}

return true
}

// Reset resets the queue
func (q *actQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.items = make(map[uint64]action.SealedEnvelope)
q.index = noncePriorityQueue{}
q.pendingNonce = 1
q.pendingBalance = big.NewInt(0)
}
Loading

0 comments on commit e022a51

Please sign in to comment.