From f6be2f5ef385490721860d44709ab373834d88b3 Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Thu, 9 Jun 2022 12:17:35 -0700 Subject: [PATCH] improve actqueue efficiency --- actpool/actpool_test.go | 16 +-- actpool/actqueue.go | 242 +++++++++++++++++++++------------------ actpool/actqueue_test.go | 59 +++------- actpool/queueworker.go | 33 +++--- 4 files changed, 170 insertions(+), 180 deletions(-) diff --git a/actpool/actpool_test.go b/actpool/actpool_test.go index a0209482ab..b05b3496f2 100644 --- a/actpool/actpool_test.go +++ b/actpool/actpool_test.go @@ -396,7 +396,7 @@ func TestActPool_removeConfirmedActs(t *testing.T) { acct.Balance = big.NewInt(100000000000000000) return 0, nil - }).Times(8) + }).Times(5) require.NoError(ap.Add(context.Background(), tsf1)) require.NoError(ap.Add(context.Background(), tsf2)) require.NoError(ap.Add(context.Background(), tsf3)) @@ -416,7 +416,7 @@ func TestActPool_removeConfirmedActs(t *testing.T) { }).Times(1) ap.Reset() require.Equal(0, ap.allActions.Count()) - require.True(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr).Empty()) + require.True(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr).IsEmpty()) } func TestActPool_Reset(t *testing.T) { @@ -988,7 +988,7 @@ func TestActPool_GetSize(t *testing.T) { acct.Balance = big.NewInt(100000000000000000) return 0, nil - }).Times(8) + }).Times(5) require.NoError(ap.Add(context.Background(), tsf1)) require.NoError(ap.Add(context.Background(), tsf2)) require.NoError(ap.Add(context.Background(), tsf3)) @@ -1103,7 +1103,7 @@ func getPendingBalance(ap *actPool, addrStr string) (*big.Int, error) { return nil, err } if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { - return queue.PendingBalance(), nil + return queue.PendingBalanceAt(queue.PendingNonce()), nil } state, err := accountutil.AccountState(ap.sf, addr) if err != nil { @@ -1122,14 +1122,6 @@ func getActPoolCfg() config.ActPool { } } -func actionMap2Slice(actMap map[string][]action.SealedEnvelope) []action.SealedEnvelope { - acts := make([]action.SealedEnvelope, 0) - for _, parts := range actMap { - acts = append(acts, parts...) - } - return acts -} - func lenPendingActionMap(acts map[string][]action.SealedEnvelope) int { l := 0 for _, part := range acts { diff --git a/actpool/actqueue.go b/actpool/actqueue.go index 290555dc0f..e43f2c0c49 100644 --- a/actpool/actqueue.go +++ b/actpool/actqueue.go @@ -61,14 +61,16 @@ func (h *noncePriorityQueue) Pop() interface{} { // ActQueue is the interface of actQueue type ActQueue interface { Put(action.SealedEnvelope) error - FilterNonce(uint64) []action.SealedEnvelope - UpdateQueue(uint64) []action.SealedEnvelope - SetPendingNonce(uint64) + CleanConfirmedAct() []action.SealedEnvelope + UpdateQueue() []action.SealedEnvelope + ConfirmedNonce() uint64 PendingNonce() uint64 - SetPendingBalance(*big.Int) - PendingBalance() *big.Int + SetConfirmedNonce(uint64) + AccountBalance() *big.Int + PendingBalanceAt(uint64) *big.Int + SetAccountBalance(*big.Int) Len() int - Empty() bool + IsEmpty() bool PendingActs() []action.SealedEnvelope AllActs() []action.SealedEnvelope Reset() @@ -84,11 +86,15 @@ type actQueue struct { index noncePriorityQueue // Current pending nonce tracking previous actions that can be committed to the next block for the account pendingNonce uint64 - // Current pending balance for the account - pendingBalance *big.Int + // Current account nonce + confirmedNonce uint64 + // Current account balance + accountBalance *big.Int clock clock.Clock ttl time.Duration mu sync.RWMutex + + balance map[uint64]*big.Int } // NewActQueue create a new action queue @@ -98,8 +104,9 @@ func NewActQueue(ap *actPool, address string, ops ...ActQueueOption) ActQueue { address: address, items: make(map[uint64]action.SealedEnvelope), index: noncePriorityQueue{}, + balance: make(map[uint64]*big.Int), pendingNonce: uint64(1), // Taking coinbase Action into account, pendingNonce should start with 1 - pendingBalance: big.NewInt(0), + accountBalance: big.NewInt(0), clock: clock.New(), ttl: 0, } @@ -114,36 +121,83 @@ func (q *actQueue) Put(act action.SealedEnvelope) error { nonce := act.Nonce() q.mu.Lock() defer q.mu.Unlock() + + if cost, _ := act.Cost(); q.getPreBalance(nonce).Cmp(cost) < 0 { + return action.ErrInsufficientFunds + } + if actInPool, exist := q.items[nonce]; exist { - // act of higher gas price cut in line - if act.GasPrice().Cmp(actInPool.GasPrice()) != 1 { - return action.ErrReplaceUnderpriced + oldActCost, _ := actInPool.Cost() + if nonce != q.pendingNonce || (nonce == q.pendingNonce && q.balance[nonce].Cmp(oldActCost) >= 0) { + // act of higher gas price cut in line + if act.GasPrice().Cmp(actInPool.GasPrice()) != 1 { + return action.ErrReplaceUnderpriced + } } // update action in q.items and q.index q.items[nonce] = act - for i, x := range q.index { - if x.nonce == nonce { + for i := range q.index { + if q.index[i].nonce == nonce { q.index[i].deadline = q.clock.Now().Add(q.ttl) break } } + q.updateFrom(nonce) return nil } heap.Push(&q.index, &nonceWithTTL{nonce: nonce, deadline: q.clock.Now().Add(q.ttl)}) q.items[nonce] = act + if nonce == q.pendingNonce { + q.updateFrom(q.pendingNonce) + } return nil } -// FilterNonce removes all actions from the map with a nonce lower than the given threshold -func (q *actQueue) FilterNonce(threshold uint64) []action.SealedEnvelope { +func (q *actQueue) getPreBalance(nonce uint64) *big.Int { + if nonce == 0 { + return new(big.Int).Set(q.accountBalance) + } + if nonce > q.pendingNonce { + return q.getPreBalance(q.pendingNonce) + } + if _, exist := q.items[nonce-1]; !exist { + return new(big.Int).Set(q.accountBalance) + } + return new(big.Int).Set(q.balance[nonce-1]) +} + +func (q *actQueue) updateFrom(start uint64) { + balance := q.getPreBalance(start) + + for ; ; start++ { + act, exist := q.items[start] + if !exist { + break + } + + cost, _ := act.Cost() + if balance.Cmp(cost) < 0 { + break + } + + balance = new(big.Int).Sub(balance, cost) + q.balance[start] = new(big.Int).Set(balance) + } + + q.pendingNonce = start +} + +// CleanConfirmedAct removes all actions from the map with a nonce lower than the given threshold +func (q *actQueue) CleanConfirmedAct() []action.SealedEnvelope { var removed []action.SealedEnvelope q.mu.Lock() defer q.mu.Unlock() // Pop off priority queue and delete corresponding entries from map until the threshold is reached - for q.index.Len() > 0 && (q.index)[0].nonce < threshold { + for q.index.Len() > 0 && (q.index)[0].nonce <= q.confirmedNonce { nonce := heap.Pop(&q.index).(*nonceWithTTL).nonce removed = append(removed, q.items[nonce]) delete(q.items, nonce) + delete(q.balance, nonce) } return removed } @@ -159,8 +213,13 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope { ) for i := 0; i < size; { if timeNow.After(q.index[i].deadline) { - removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce]) - delete(q.items, q.index[i].nonce) + nonce := q.index[i].nonce + if nonce < q.pendingNonce { + q.pendingNonce = nonce + } + removedFromQueue = append(removedFromQueue, q.items[nonce]) + delete(q.items, nonce) + delete(q.balance, nonce) q.index[i] = q.index[size-1] size-- continue @@ -174,60 +233,22 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope { } // UpdateQueue updates the pending nonce and balance of the queue -func (q *actQueue) UpdateQueue(nonce uint64) []action.SealedEnvelope { +func (q *actQueue) UpdateQueue() []action.SealedEnvelope { q.mu.Lock() defer q.mu.Unlock() // First remove all timed out actions removedFromQueue := q.cleanTimeout() - // Now, starting from the current pending nonce, incrementally find the next pending nonce - // while updating pending balance if actions are payable - for ; ; nonce++ { - _, exist := q.items[nonce] - if !exist { - break - } - if !q.enoughBalance(q.items[nonce], true) { - break - } - } - q.pendingNonce = nonce - - // Find the index of new pending nonce within the queue - sort.Sort(q.index) - i := 0 - for ; i < q.index.Len(); i++ { - if q.index[i].nonce >= nonce { - break - } - } - // Case I: An unpayable action has been found while updating pending nonce/balance - // Remove all the subsequent actions in the queue starting from the index of new pending nonce - if _, exist := q.items[nonce]; exist { - removedFromQueue = append(removedFromQueue, q.removeActs(i)...) - return removedFromQueue - } - - // Case II: All actions are payable while updating pending nonce/balance - // Check all the subsequent actions in the queue starting from the index of new pending nonce - // Find the nonce index of the first unpayable action - // Remove all the subsequent actions in the queue starting from that index - for ; i < q.index.Len(); i++ { - nonce = q.index[i].nonce - act := q.items[nonce] - if !q.enoughBalance(act, false) { - break - } - } - removedFromQueue = append(removedFromQueue, q.removeActs(i)...) + q.updateFrom(q.pendingNonce) return removedFromQueue } -// SetPendingNonce sets pending nonce for the queue -func (q *actQueue) SetPendingNonce(nonce uint64) { +// SetConfirmedNonce sets pending nonce for the queue +func (q *actQueue) SetConfirmedNonce(nonce uint64) { q.mu.Lock() defer q.mu.Unlock() - q.pendingNonce = nonce + q.confirmedNonce = nonce + q.pendingNonce = nonce + 1 } // PendingNonce returns the current pending nonce of the queue @@ -237,18 +258,32 @@ func (q *actQueue) PendingNonce() uint64 { return q.pendingNonce } +// ConfirmedNonce returns the current confirmed nonce of the queue +func (q *actQueue) ConfirmedNonce() uint64 { + q.mu.RLock() + defer q.mu.RUnlock() + return q.confirmedNonce +} + // SetPendingBalance sets pending balance for the queue -func (q *actQueue) SetPendingBalance(balance *big.Int) { +func (q *actQueue) SetAccountBalance(balance *big.Int) { q.mu.Lock() defer q.mu.Unlock() - q.pendingBalance = balance + q.accountBalance.Set(balance) } // PendingBalance returns the current pending balance of the queue -func (q *actQueue) PendingBalance() *big.Int { +func (q *actQueue) AccountBalance() *big.Int { + q.mu.RLock() + defer q.mu.RUnlock() + return new(big.Int).Set(q.accountBalance) +} + +// PendingBalanceAt returns the pending balance at nonce +func (q *actQueue) PendingBalanceAt(nonce uint64) *big.Int { q.mu.RLock() defer q.mu.RUnlock() - return q.pendingBalance + return q.getPreBalance(nonce) } // Len returns the length of the action map @@ -258,11 +293,22 @@ func (q *actQueue) Len() int { return len(q.items) } -// Empty returns whether the queue of actions is empty or not -func (q *actQueue) Empty() bool { +// IsEmpty returns whether the queue of actions is empty or not +func (q *actQueue) IsEmpty() bool { q.mu.RLock() defer q.mu.RUnlock() - return q.Len() == 0 + return len(q.items) == 0 +} + +// Reset resets the queue +func (q *actQueue) Reset() { + q.mu.Lock() + defer q.mu.Unlock() + q.items = make(map[uint64]action.SealedEnvelope) + q.index = noncePriorityQueue{} + q.pendingNonce = 1 + q.confirmedNonce = 0 + q.accountBalance = big.NewInt(0) } // PendingActs creates a consecutive nonce-sorted slice of actions @@ -270,7 +316,6 @@ func (q *actQueue) PendingActs() []action.SealedEnvelope { if q.Len() == 0 { return nil } - acts := make([]action.SealedEnvelope, 0, len(q.items)) addr, err := address.FromString(q.address) if err != nil { log.L().Error("Error when getting the address", zap.String("address", q.address), zap.Error(err)) @@ -281,14 +326,27 @@ func (q *actQueue) PendingActs() []action.SealedEnvelope { log.L().Error("Error when getting the nonce", zap.String("address", q.address), zap.Error(err)) return nil } - nonce := confirmedState.Nonce + 1 + + var ( + nonce = confirmedState.Nonce + 1 + balance = confirmedState.Balance + acts = make([]action.SealedEnvelope, 0, len(q.items)) + ) q.mu.RLock() defer q.mu.RUnlock() for ; ; nonce++ { - if _, exist := q.items[nonce]; !exist { + act, exist := q.items[nonce] + if !exist { + break + } + + cost, _ := act.Cost() + if balance.Cmp(cost) < 0 { break } - acts = append(acts, q.items[nonce]) + + balance = new(big.Int).Sub(balance, cost) + acts = append(acts, act) } return acts } @@ -307,39 +365,3 @@ func (q *actQueue) AllActs() []action.SealedEnvelope { } return acts } - -// removeActs removes all the actions starting at idx from queue -func (q *actQueue) removeActs(idx int) []action.SealedEnvelope { - removedFromQueue := make([]action.SealedEnvelope, 0) - for i := idx; i < q.index.Len(); i++ { - removedFromQueue = append(removedFromQueue, q.items[q.index[i].nonce]) - delete(q.items, q.index[i].nonce) - } - q.index = q.index[:idx] - heap.Init(&q.index) - return removedFromQueue -} - -// enoughBalance helps check whether queue's pending balance is sufficient for the given action -func (q *actQueue) enoughBalance(act action.SealedEnvelope, updateBalance bool) bool { - cost, _ := act.Cost() - if q.pendingBalance.Cmp(cost) < 0 { - return false - } - - if updateBalance { - q.pendingBalance.Sub(q.pendingBalance, cost) - } - - return true -} - -// Reset resets the queue -func (q *actQueue) Reset() { - q.mu.Lock() - defer q.mu.Unlock() - q.items = make(map[uint64]action.SealedEnvelope) - q.index = noncePriorityQueue{} - q.pendingNonce = 1 - q.pendingBalance = big.NewInt(0) -} diff --git a/actpool/actqueue_test.go b/actpool/actqueue_test.go index 74e515826f..bc7cc63876 100644 --- a/actpool/actqueue_test.go +++ b/actpool/actqueue_test.go @@ -26,6 +26,10 @@ import ( "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" ) +const ( + maxBalance = 1e7 +) + func TestNoncePriorityQueue(t *testing.T) { require := require.New(t) pq := noncePriorityQueue{} @@ -59,6 +63,7 @@ func TestNoncePriorityQueue(t *testing.T) { func TestActQueuePut(t *testing.T) { require := require.New(t) q := NewActQueue(nil, "").(*actQueue) + q.SetAccountBalance(big.NewInt(maxBalance)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1)) require.NoError(err) require.NoError(q.Put(tsf1)) @@ -84,6 +89,7 @@ func TestActQueuePut(t *testing.T) { func TestActQueueFilterNonce(t *testing.T) { require := require.New(t) q := NewActQueue(nil, "").(*actQueue) + q.SetAccountBalance(big.NewInt(maxBalance)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1), nil, uint64(0), big.NewInt(0)) @@ -93,7 +99,8 @@ func TestActQueueFilterNonce(t *testing.T) { require.NoError(q.Put(tsf1)) require.NoError(q.Put(tsf2)) require.NoError(q.Put(tsf3)) - q.FilterNonce(uint64(3)) + q.SetConfirmedNonce(2) + q.CleanConfirmedAct() require.Equal(1, len(q.items)) require.Equal(uint64(3), q.index[0].nonce) require.Equal(tsf3, q.items[q.index[0].nonce]) @@ -102,25 +109,25 @@ func TestActQueueFilterNonce(t *testing.T) { func TestActQueueUpdateNonce(t *testing.T) { require := require.New(t) q := NewActQueue(nil, "").(*actQueue) + q.SetAccountBalance(big.NewInt(1010)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf3, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(10000), nil, uint64(0), big.NewInt(0)) + tsf3, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf4, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) + tsf4, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) - tsf5, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) + tsf5, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) require.NoError(q.Put(tsf1)) require.NoError(q.Put(tsf2)) require.NoError(q.Put(tsf3)) require.NoError(q.Put(tsf4)) - q.pendingBalance = big.NewInt(1000) require.NoError(q.Put(tsf5)) - removed := q.UpdateQueue(uint64(2)) - require.Equal(uint64(2), q.pendingNonce) - require.Equal([]action.SealedEnvelope{tsf5, tsf2, tsf3, tsf4}, removed) + removed := q.UpdateQueue() + require.Equal(uint64(3), q.pendingNonce) + require.Equal(0, len(removed)) } func TestActQueuePendingActs(t *testing.T) { @@ -130,10 +137,12 @@ func TestActQueuePendingActs(t *testing.T) { sf := mock_chainmanager.NewMockStateReader(ctrl) sf.EXPECT().State(gomock.Any(), gomock.Any()).Do(func(accountState *state.Account, _ protocol.StateOption) { accountState.Nonce = uint64(1) + accountState.Balance = big.NewInt(maxBalance) }).Return(uint64(0), nil).Times(1) ap, err := NewActPool(sf, cfg.ActPool, EnableExperimentalActions()) require.NoError(err) q := NewActQueue(ap.(*actPool), identityset.Address(0).String()).(*actQueue) + q.SetAccountBalance(big.NewInt(maxBalance)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) @@ -157,6 +166,7 @@ func TestActQueuePendingActs(t *testing.T) { func TestActQueueAllActs(t *testing.T) { require := require.New(t) q := NewActQueue(nil, "").(*actQueue) + q.SetAccountBalance(big.NewInt(maxBalance)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) @@ -167,41 +177,10 @@ func TestActQueueAllActs(t *testing.T) { require.Equal([]action.SealedEnvelope{tsf1, tsf3}, actions) } -func TestActQueueRemoveActs(t *testing.T) { - require := require.New(t) - q := NewActQueue(nil, "").(*actQueue) - tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - require.NoError(q.Put(tsf1)) - require.NoError(q.Put(tsf2)) - require.NoError(q.Put(tsf3)) - removed := q.removeActs(0) - require.Equal(0, len(q.index)) - require.Equal(0, len(q.items)) - require.Equal([]action.SealedEnvelope{tsf1, tsf2, tsf3}, removed) - - tsf4, err := action.SignedTransfer(_addr2, _priKey1, 4, big.NewInt(10000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf5, err := action.SignedTransfer(_addr2, _priKey1, 5, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - tsf6, err := action.SignedTransfer(_addr2, _priKey1, 6, big.NewInt(100000), nil, uint64(0), big.NewInt(0)) - require.NoError(err) - require.NoError(q.Put(tsf4)) - require.NoError(q.Put(tsf5)) - require.NoError(q.Put(tsf6)) - removed = q.removeActs(1) - require.Equal(1, len(q.index)) - require.Equal(1, len(q.items)) - require.Equal([]action.SealedEnvelope{tsf5, tsf6}, removed) -} - func TestActQueueTimeOutAction(t *testing.T) { c := clock.NewMock() q := NewActQueue(nil, "", WithClock(c), WithTimeOut(3*time.Minute)) + q.SetAccountBalance(big.NewInt(maxBalance)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(t, err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) diff --git a/actpool/queueworker.go b/actpool/queueworker.go index b425d65793..48aa131b43 100644 --- a/actpool/queueworker.go +++ b/actpool/queueworker.go @@ -117,17 +117,16 @@ func (worker *queueWorker) Handle(job workerJob) { } func (worker *queueWorker) getConfirmedState(sender address.Address) (uint64, *big.Int, error) { - // TODO: modifying in the next pr queue := worker.accountActs[sender.String()] - confirmedState, err := accountutil.AccountState(worker.ap.sf, sender) - if err != nil { - return 0, nil, err - } // account state isn't cached in the actpool if queue == nil { + confirmedState, err := accountutil.AccountState(worker.ap.sf, sender) + if err != nil { + return 0, nil, err + } return confirmedState.Nonce, confirmedState.Balance, nil } - return confirmedState.Nonce, queue.PendingBalance(), nil + return queue.ConfirmedNonce(), queue.AccountBalance(), nil } func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, confirmedNonce uint64, confirmedBalance *big.Int) error { @@ -167,8 +166,8 @@ func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, c if queue == nil { queue = NewActQueue(worker.ap, sender, WithTimeOut(worker.ap.cfg.ActionExpiry)) - queue.SetPendingNonce(confirmedNonce + 1) - queue.SetPendingBalance(confirmedBalance) + queue.SetConfirmedNonce(confirmedNonce) + queue.SetAccountBalance(confirmedBalance) worker.mu.Lock() worker.accountActs[sender] = queue worker.mu.Unlock() @@ -181,8 +180,6 @@ func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, c return err } - queue.UpdateQueue(queue.PendingNonce()) // TODO: to be removed - return nil } @@ -196,7 +193,7 @@ func (worker *queueWorker) removeEmptyAccounts() { worker.emptyAccounts.Range(func(key, _ interface{}) error { sender := key.(string) - if worker.accountActs[sender].Empty() { + if worker.accountActs[sender].IsEmpty() { delete(worker.accountActs, sender) } return nil @@ -218,14 +215,14 @@ func (worker *queueWorker) Reset() { worker.emptyAccounts.Set(from, struct{}{}) continue } - queue.SetPendingNonce(confirmedState.Nonce + 1) - queue.SetPendingBalance(confirmedState.Balance) + queue.SetConfirmedNonce(confirmedState.Nonce) + queue.SetAccountBalance(confirmedState.Balance) // Remove all actions that are committed to new block - acts := queue.FilterNonce(queue.PendingNonce()) - acts2 := queue.UpdateQueue(queue.PendingNonce()) + acts := queue.CleanConfirmedAct() + acts2 := queue.UpdateQueue() worker.ap.removeInvalidActs(append(acts, acts2...)) // Delete the queue entry if it becomes empty - if queue.Empty() { + if queue.IsEmpty() { worker.emptyAccounts.Set(from, struct{}{}) } } @@ -238,11 +235,11 @@ func (worker *queueWorker) PendingAction() []*pendingActions { worker.mu.RLock() defer worker.mu.RUnlock() for from, queue := range worker.accountActs { - if queue.Empty() { + if queue.IsEmpty() { continue } // Remove the actions that are already timeout - acts := queue.UpdateQueue(queue.PendingNonce()) + acts := queue.UpdateQueue() worker.ap.removeInvalidActs(acts) actionArr = append(actionArr, &pendingActions{ sender: from,