From 4c9edbb24f39f1f03aa562e46b4cd4c5d84eb10a Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Wed, 13 Jul 2022 10:50:06 -0700 Subject: [PATCH] opt actpool.Add() --- actpool/actioniterator/actioniterator.go | 3 +- actpool/actpool.go | 408 ++++++++++------------- actpool/actpool_test.go | 230 ++++++------- actpool/actqueue.go | 42 ++- actpool/options.go | 5 + actpool/queueworker.go | 286 ++++++++++++++++ dispatcher/dispatcher.go | 40 +-- 7 files changed, 626 insertions(+), 388 deletions(-) create mode 100644 actpool/queueworker.go diff --git a/actpool/actioniterator/actioniterator.go b/actpool/actioniterator/actioniterator.go index 9d654eb71e..8c932b8dbc 100644 --- a/actpool/actioniterator/actioniterator.go +++ b/actpool/actioniterator/actioniterator.go @@ -69,8 +69,7 @@ func NewActionIterator(accountActs map[string][]action.SealedEnvelope) ActionIte // LoadNext load next action of account of top action func (ai *actionIterator) loadNextActionForTopAccount() { - sender := ai.heads[0].SrcPubkey() - callerAddrStr := sender.Address().String() + callerAddrStr := ai.heads[0].SenderAddress().String() if actions, ok := ai.accountActs[callerAddrStr]; ok && len(actions) > 0 { ai.heads[0], ai.accountActs[callerAddrStr] = actions[0], actions[1:] heap.Fix(&ai.heads, 0) diff --git a/actpool/actpool.go b/actpool/actpool.go index f9ab32da8d..3cfddf7ca5 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -9,13 +9,14 @@ import ( "context" "encoding/hex" "sort" - "strings" "sync" + "sync/atomic" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "github.com/iotexproject/go-pkgs/cache/ttl" "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-address/address" @@ -29,6 +30,11 @@ import ( "github.com/iotexproject/iotex-core/pkg/tracer" ) +const ( + // move to config + _numWorker = 16 +) + var ( _actpoolMtc = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "iotex_actpool_rejection_metrics", @@ -91,18 +97,23 @@ func EnableExperimentalActions() Option { // actPool implements ActPool interface type actPool struct { - mutex sync.RWMutex cfg Config g genesis.Genesis sf protocol.StateReader - accountActs map[string]ActQueue - accountDesActs map[string]map[hash.Hash256]action.SealedEnvelope - allActions map[hash.Hash256]action.SealedEnvelope + accountDesActs *desActs + allActions *ttl.Cache gasInPool uint64 actionEnvelopeValidators []action.SealedEnvelopeValidator timerFactory *prometheustimer.TimerFactory enableExperimentalActions bool senderBlackList map[string]bool + jobQueue []chan workerJob + worker []*queueWorker +} + +type desActs struct { + mu sync.Mutex + acts map[string]map[hash.Hash256]action.SealedEnvelope } // NewActPool constructs a new actpool @@ -116,14 +127,16 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... senderBlackList[bannedSender] = true } + actsMap, _ := ttl.NewCache() ap := &actPool{ cfg: cfg, g: g, sf: sf, senderBlackList: senderBlackList, - accountActs: make(map[string]ActQueue), - accountDesActs: make(map[string]map[hash.Hash256]action.SealedEnvelope), - allActions: make(map[hash.Hash256]action.SealedEnvelope), + accountDesActs: &desActs{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)}, + allActions: actsMap, + jobQueue: make([]chan workerJob, _numWorker), + worker: make([]*queueWorker, _numWorker), } for _, opt := range opts { if err := opt(ap); err != nil { @@ -140,9 +153,24 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... return nil, err } ap.timerFactory = timerFactory + + for i := 0; i < _numWorker; i++ { + ap.jobQueue[i] = make(chan workerJob, ap.cfg.MaxNumActsPerAcct) + ap.worker[i] = newQueueWorker(ap, ap.jobQueue[i]) + if err := ap.worker[i].Start(); err != nil { + return nil, err + } + } return ap, nil } +// TODO: add start() and stop() in actpool +// func (ap *actPool) Start() { +// } + +// func (ap *actPool) Stop() { +// } + func (ap *actPool) AddActionEnvelopeValidators(fs ...action.SealedEnvelopeValidator) { ap.actionEnvelopeValidators = append(ap.actionEnvelopeValidators, fs...) } @@ -156,100 +184,150 @@ func (ap *actPool) AddActionEnvelopeValidators(fs ...action.SealedEnvelopeValida // Then starting from the current confirmed nonce, iteratively update pending nonce if nonces are consecutive and pending // balance is sufficient, and remove all the subsequent actions once the pending balance becomes insufficient func (ap *actPool) Reset() { - ap.mutex.Lock() - defer ap.mutex.Unlock() - ap.reset() } -func (ap *actPool) ReceiveBlock(*block.Block) error { - ap.mutex.Lock() - defer ap.mutex.Unlock() +func (ap *actPool) reset() { + var ( + wg sync.WaitGroup + ctx = ap.context(context.Background()) + ) + for i := range ap.worker { + wg.Add(1) + go func(worker *queueWorker) { + defer wg.Done() + worker.Reset(ctx) + }(ap.worker[i]) + } + wg.Wait() +} +func (ap *actPool) ReceiveBlock(*block.Block) error { ap.reset() return nil } -// PendingActionIterator returns an action interator with all accepted actions +// PendingActionMap returns an action interator with all accepted actions func (ap *actPool) PendingActionMap() map[string][]action.SealedEnvelope { - ap.mutex.Lock() - defer ap.mutex.Unlock() - - // Remove the actions that are already timeout - ap.reset() - - ctx := ap.context(context.Background()) - actionMap := make(map[string][]action.SealedEnvelope) - for from, queue := range ap.accountActs { - actionMap[from] = append(actionMap[from], queue.PendingActs(ctx)...) + var ( + wg sync.WaitGroup + actsFromWorker = make([][]*pendingActions, _numWorker) + ctx = ap.context(context.Background()) + totalAccounts = uint64(0) + ) + for i := range ap.worker { + wg.Add(1) + go func(i int) { + defer wg.Done() + actsFromWorker[i] = ap.worker[i].PendingActions(ctx) + atomic.AddUint64(&totalAccounts, uint64(len(actsFromWorker[i]))) + }(i) + } + wg.Wait() + + ret := make(map[string][]action.SealedEnvelope, totalAccounts) + for _, v := range actsFromWorker { + for _, w := range v { + ret[w.sender] = w.acts + } } - return actionMap + return ret } func (ap *actPool) Add(ctx context.Context, act action.SealedEnvelope) error { - ap.mutex.Lock() - defer ap.mutex.Unlock() - ctx, span := tracer.NewSpan(ap.context(ctx), "actPool.Add") defer span.End() + ctx = ap.context(ctx) + + if err := checkSelpData(&act); err != nil { + return err + } + + if err := ap.checkSelpWithoutState(ctx, &act); err != nil { + return err + } // Reject action if pool space is full - if uint64(len(ap.allActions)) >= ap.cfg.MaxNumActsPerPool { + + if uint64(ap.allActions.Count()) >= ap.cfg.MaxNumActsPerPool { _actpoolMtc.WithLabelValues("overMaxNumActsPerPool").Inc() return action.ErrTxPoolOverflow } - span.AddEvent("act.IntrinsicGas") - intrinsicGas, err := act.IntrinsicGas() + + if intrinsicGas, _ := act.IntrinsicGas(); atomic.LoadUint64(&ap.gasInPool)+intrinsicGas > ap.cfg.MaxGasLimitPerPool { + _actpoolMtc.WithLabelValues("overMaxGasLimitPerPool").Inc() + return action.ErrGasLimit + } + + return ap.enqueue(ctx, act) +} + +func checkSelpData(act *action.SealedEnvelope) error { + _, err := act.IntrinsicGas() if err != nil { - _actpoolMtc.WithLabelValues("failedGetIntrinsicGas").Inc() return err } - if ap.gasInPool+intrinsicGas > ap.cfg.MaxGasLimitPerPool { - _actpoolMtc.WithLabelValues("overMaxGasLimitPerPool").Inc() - return action.ErrGasLimit + _, err = act.Hash() + if err != nil { + return err } - hash, err := act.Hash() + _, err = act.Cost() if err != nil { return err } + if act.SrcPubkey() == nil { + return action.ErrAddress + } + return nil +} + +func (ap *actPool) checkSelpWithoutState(ctx context.Context, selp *action.SealedEnvelope) error { + span := tracer.SpanFromContext(ctx) + span.AddEvent("actPool.checkSelpWithoutState") + defer span.End() + + hash, _ := selp.Hash() // Reject action if it already exists in pool - if _, exist := ap.allActions[hash]; exist { + if _, exist := ap.allActions.Get(hash); exist { _actpoolMtc.WithLabelValues("existedAction").Inc() return action.ErrExistedInPool } + // Reject action if the gas price is lower than the threshold - if act.GasPrice().Cmp(ap.cfg.MinGasPrice()) < 0 { + if selp.GasPrice().Cmp(ap.cfg.MinGasPrice()) < 0 { _actpoolMtc.WithLabelValues("gasPriceLower").Inc() + actHash, _ := selp.Hash() log.L().Info("action rejected due to low gas price", - zap.String("actionHash", hex.EncodeToString(hash[:])), - zap.String("gasPrice", act.GasPrice().String())) + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.String("gasPrice", selp.GasPrice().String())) return action.ErrUnderpriced } - if err := ap.validate(ctx, act); err != nil { - return err + + if _, ok := ap.senderBlackList[selp.SenderAddress().String()]; ok { + _actpoolMtc.WithLabelValues("blacklisted").Inc() + return errors.Wrap(action.ErrAddress, "action source address is blacklisted") } - caller := act.SenderAddress() - if caller == nil { - return action.ErrAddress + for _, ev := range ap.actionEnvelopeValidators { + span.AddEvent("ev.Validate") + if err := ev.Validate(ctx, *selp); err != nil { + return err + } } - return ap.enqueueAction(ctx, caller, act, hash, act.Nonce()) + return nil } // GetPendingNonce returns pending nonce in pool or confirmed nonce given an account address -func (ap *actPool) GetPendingNonce(addr string) (uint64, error) { - addrStr, err := address.FromString(addr) +func (ap *actPool) GetPendingNonce(addrStr string) (uint64, error) { + addr, err := address.FromString(addrStr) if err != nil { return 0, err } - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - if queue, ok := ap.accountActs[addr]; ok { + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { return queue.PendingNonce(), nil } ctx := ap.context(context.Background()) - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addrStr) + confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) if err != nil { return 0, err } @@ -257,14 +335,19 @@ func (ap *actPool) GetPendingNonce(addr string) (uint64, error) { } // GetUnconfirmedActs returns unconfirmed actions in pool given an account address -func (ap *actPool) GetUnconfirmedActs(addr string) []action.SealedEnvelope { - ap.mutex.RLock() - defer ap.mutex.RUnlock() +func (ap *actPool) GetUnconfirmedActs(addrStr string) []action.SealedEnvelope { + addr, err := address.FromString(addrStr) + if err != nil { + return []action.SealedEnvelope{} + } + var ret []action.SealedEnvelope - if queue, ok := ap.accountActs[addr]; ok { + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { ret = queue.AllActs() } - if desMap, ok := ap.accountDesActs[addr]; ok { + ap.accountDesActs.mu.Lock() + defer ap.accountDesActs.mu.Unlock() + if desMap, ok := ap.accountDesActs.acts[addrStr]; ok { if desMap != nil { sortActions := make(SortedActions, 0) for _, v := range desMap { @@ -279,22 +362,16 @@ func (ap *actPool) GetUnconfirmedActs(addr string) []action.SealedEnvelope { // GetActionByHash returns the pending action in pool given action's hash func (ap *actPool) GetActionByHash(hash hash.Hash256) (action.SealedEnvelope, error) { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - act, ok := ap.allActions[hash] + act, ok := ap.allActions.Get(hash) if !ok { return action.SealedEnvelope{}, errors.Wrapf(action.ErrNotFound, "action hash %x does not exist in pool", hash) } - return act, nil + return act.(action.SealedEnvelope), nil } // GetSize returns the act pool size func (ap *actPool) GetSize() uint64 { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - return uint64(len(ap.allActions)) + return uint64(ap.allActions.Count()) } // GetCapacity returns the act pool capacity @@ -304,10 +381,7 @@ func (ap *actPool) GetCapacity() uint64 { // GetGasSize returns the act pool gas size func (ap *actPool) GetGasSize() uint64 { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - - return ap.gasInPool + return atomic.LoadUint64(&ap.gasInPool) } // GetGasCapacity returns the act pool gas capacity @@ -316,17 +390,16 @@ func (ap *actPool) GetGasCapacity() uint64 { } func (ap *actPool) Validate(ctx context.Context, selp action.SealedEnvelope) error { - ap.mutex.RLock() - defer ap.mutex.RUnlock() return ap.validate(ctx, selp) } func (ap *actPool) DeleteAction(caller address.Address) { - ap.mutex.RLock() - defer ap.mutex.RUnlock() - pendingActs := ap.accountActs[caller.String()].AllActs() - ap.removeInvalidActs(pendingActs) - delete(ap.accountActs, caller.String()) + worker := ap.worker[ap.allocatedWorker(caller)] + if queue := worker.GetQueue(caller); queue != nil { + pendingActs := queue.AllActs() + ap.removeInvalidActs(pendingActs) + worker.ResetAccount(caller) + } } func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) error { @@ -347,7 +420,7 @@ func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) err if err != nil { return err } - if _, ok := ap.allActions[selpHash]; ok { + if _, ok := ap.allActions.Get(selpHash); ok { return nil } for _, ev := range ap.actionEnvelopeValidators { @@ -360,114 +433,6 @@ func (ap *actPool) validate(ctx context.Context, selp action.SealedEnvelope) err return nil } -// ====================================== -// private functions -// ====================================== -func (ap *actPool) enqueueAction(ctx context.Context, addr address.Address, act action.SealedEnvelope, actHash hash.Hash256, actNonce uint64) error { - span := tracer.SpanFromContext(ctx) - defer span.End() - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - _actpoolMtc.WithLabelValues("failedToGetNonce").Inc() - return errors.Wrapf(err, "failed to get sender's nonce for action %x", actHash) - } - pendingNonce := confirmedState.PendingNonce() - if actNonce < pendingNonce { - return action.ErrNonceTooLow - } - sender := addr.String() - queue := ap.accountActs[sender] - if queue == nil { - span.AddEvent("new queue") - queue = NewActQueue(ap, sender, WithTimeOut(ap.cfg.ActionExpiry)) - ap.accountActs[sender] = queue - // Initialize pending nonce and balance for new account - queue.SetPendingNonce(pendingNonce) - queue.SetPendingBalance(confirmedState.Balance) - } - - if actNonce-pendingNonce >= ap.cfg.MaxNumActsPerAcct { - // Nonce exceeds current range - log.L().Debug("Rejecting action because nonce is too large.", - log.Hex("hash", actHash[:]), - zap.Uint64("startNonce", pendingNonce), - zap.Uint64("actNonce", actNonce)) - _actpoolMtc.WithLabelValues("nonceTooLarge").Inc() - return action.ErrNonceTooHigh - } - - span.AddEvent("act cost") - cost, err := act.Cost() - if err != nil { - _actpoolMtc.WithLabelValues("failedToGetCost").Inc() - return errors.Wrapf(err, "failed to get cost of action %x", actHash) - } - if queue.PendingBalance().Cmp(cost) < 0 { - // Pending balance is insufficient - _actpoolMtc.WithLabelValues("insufficientBalance").Inc() - log.L().Info("insufficient balance for action", - zap.String("actionHash", hex.EncodeToString(actHash[:])), - zap.String("cost", cost.String()), - zap.String("pendingBalance", queue.PendingBalance().String()), - zap.String("sender", sender), - ) - return action.ErrInsufficientFunds - } - - span.AddEvent("queue put") - if err := queue.Put(act); err != nil { - _actpoolMtc.WithLabelValues("failedPutActQueue").Inc() - log.L().Info("failed put action into ActQueue", - zap.String("actionHash", hex.EncodeToString(actHash[:]))) - return err - } - ap.allActions[actHash] = act - - //add actions to destination map - desAddress, ok := act.Destination() - if ok && !strings.EqualFold(sender, desAddress) { - desQueue := ap.accountDesActs[desAddress] - if desQueue == nil { - ap.accountDesActs[desAddress] = make(map[hash.Hash256]action.SealedEnvelope) - } - ap.accountDesActs[desAddress][actHash] = act - } - - span.AddEvent("act.IntrinsicGas") - intrinsicGas, _ := act.IntrinsicGas() - ap.gasInPool += intrinsicGas - // If the pending nonce equals this nonce, update queue - span.AddEvent("queue.PendingNonce") - nonce := queue.PendingNonce() - if actNonce == nonce { - span.AddEvent("ap.updateAccount") - ap.updateAccount(sender) - } - return nil -} - -// removeConfirmedActs removes processed (committed to block) actions from pool -func (ap *actPool) removeConfirmedActs(ctx context.Context) { - for from, queue := range ap.accountActs { - addr, _ := address.FromString(from) - confirmedState, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - log.L().Error("Error when removing confirmed actions", zap.Error(err)) - return - } - pendingNonce := confirmedState.PendingNonce() - // Remove all actions that are committed to new block - acts := queue.FilterNonce(pendingNonce) - ap.removeInvalidActs(acts) - //del actions in destination map - ap.deleteAccountDestinationActions(acts...) - // Delete the queue entry if it becomes empty - if queue.Empty() { - delete(ap.accountActs, from) - } - } -} - func (ap *actPool) removeInvalidActs(acts []action.SealedEnvelope) { for _, act := range acts { hash, err := act.Hash() @@ -476,9 +441,9 @@ func (ap *actPool) removeInvalidActs(acts []action.SealedEnvelope) { continue } log.L().Debug("Removed invalidated action.", log.Hex("hash", hash[:])) - delete(ap.allActions, hash) + ap.allActions.Delete(hash) intrinsicGas, _ := act.IntrinsicGas() - ap.subGasFromPool(intrinsicGas) + atomic.AddUint64(&ap.gasInPool, ^uint64(intrinsicGas-1)) //del actions in destination map ap.deleteAccountDestinationActions(act) } @@ -486,66 +451,47 @@ func (ap *actPool) removeInvalidActs(acts []action.SealedEnvelope) { // deleteAccountDestinationActions just for destination map func (ap *actPool) deleteAccountDestinationActions(acts ...action.SealedEnvelope) { + ap.accountDesActs.mu.Lock() + defer ap.accountDesActs.mu.Unlock() for _, act := range acts { hash, err := act.Hash() if err != nil { log.L().Debug("Skipping action due to hash error", zap.Error(err)) continue } - desAddress, ok := act.Destination() - if ok { - dst := ap.accountDesActs[desAddress] + if desAddress, ok := act.Destination(); ok { + dst := ap.accountDesActs.acts[desAddress] if dst != nil { delete(dst, hash) } + if len(dst) == 0 { + delete(ap.accountDesActs.acts, desAddress) + } } } } -// updateAccount updates queue's status and remove invalidated actions from pool if necessary -func (ap *actPool) updateAccount(sender string) { - queue := ap.accountActs[sender] - acts := queue.UpdateQueue(queue.PendingNonce()) - if len(acts) > 0 { - ap.removeInvalidActs(acts) - } - // Delete the queue entry if it becomes empty - if queue.Empty() { - delete(ap.accountActs, sender) - } -} - func (ap *actPool) context(ctx context.Context) context.Context { return genesis.WithGenesisContext(ctx, ap.g) } -func (ap *actPool) reset() { - timer := ap.timerFactory.NewTimer("reset") - defer timer.End() - - ctx := ap.context(context.Background()) - // Remove confirmed actions in actpool - ap.removeConfirmedActs(ctx) - for from, queue := range ap.accountActs { - // Reset pending balance for each account - addr, _ := address.FromString(from) - state, err := accountutil.AccountState(ctx, ap.sf, addr) - if err != nil { - log.L().Error("Error when resetting actpool state.", zap.Error(err)) - return +func (ap *actPool) enqueue(ctx context.Context, act action.SealedEnvelope) error { + var errChan = make(chan error) // unused errChan will be garbage-collected + ap.jobQueue[ap.allocatedWorker(act.SenderAddress())] <- workerJob{ctx, act, errChan} + + for { + select { + case <-ctx.Done(): + log.L().Error("enqueue actpool fails", zap.Error(ctx.Err())) + return ctx.Err() + case ret := <-errChan: + return ret } - queue.SetPendingBalance(state.Balance) - - // Reset pending nonce and remove invalid actions for each account - queue.SetPendingNonce(state.PendingNonce()) - ap.updateAccount(from) } } -func (ap *actPool) subGasFromPool(gas uint64) { - if ap.gasInPool < gas { - ap.gasInPool = 0 - return - } - ap.gasInPool -= gas +func (ap *actPool) allocatedWorker(senderAddr address.Address) int { + senderBytes := senderAddr.Bytes() + var lastByte uint8 = senderBytes[len(senderBytes)-1] + return int(lastByte) % _numWorker } diff --git a/actpool/actpool_test.go b/actpool/actpool_test.go index 35b532ae04..3ec07e4086 100644 --- a/actpool/actpool_test.go +++ b/actpool/actpool_test.go @@ -12,25 +12,23 @@ import ( "testing" "time" - "github.com/iotexproject/iotex-core/actpool/actioniterator" - "github.com/iotexproject/iotex-core/blockchain/genesis" - "github.com/iotexproject/iotex-core/state" - "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" - "github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator" - "github.com/golang/mock/gomock" + "github.com/iotexproject/iotex-address/address" "github.com/pkg/errors" "github.com/stretchr/testify/require" - "github.com/iotexproject/iotex-address/address" - "github.com/iotexproject/iotex-core/action" "github.com/iotexproject/iotex-core/action/protocol" "github.com/iotexproject/iotex-core/action/protocol/account" accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" "github.com/iotexproject/iotex-core/action/protocol/rewarding" + "github.com/iotexproject/iotex-core/actpool/actioniterator" "github.com/iotexproject/iotex-core/blockchain" + "github.com/iotexproject/iotex-core/blockchain/genesis" + "github.com/iotexproject/iotex-core/state" "github.com/iotexproject/iotex-core/test/identityset" + "github.com/iotexproject/iotex-core/test/mock/mock_chainmanager" + "github.com/iotexproject/iotex-core/test/mock/mock_sealed_envelope_validator" "github.com/iotexproject/iotex-core/testutil" ) @@ -42,7 +40,6 @@ const ( var ( _addr1 = identityset.Address(28).String() - _pubKey1 = identityset.PrivateKey(28).PublicKey() _priKey1 = identityset.PrivateKey(28) _addr2 = identityset.Address(29).String() _priKey2 = identityset.PrivateKey(29) @@ -174,22 +171,22 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(ap.Add(ctx, tsf7)) require.NoError(ap.Add(ctx, tsf8)) - pBalance1, _ := ap.getPendingBalance(_addr1) + pBalance1, _ := getPendingBalance(ap, _addr1) require.Equal(uint64(10), pBalance1.Uint64()) - pNonce1, _ := ap.getPendingNonce(_addr1) + pNonce1, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(5), pNonce1) - pBalance2, _ := ap.getPendingBalance(_addr2) + pBalance2, _ := getPendingBalance(ap, _addr2) require.Equal(uint64(5), pBalance2.Uint64()) - pNonce2, _ := ap.getPendingNonce(_addr2) + pNonce2, _ := ap.GetPendingNonce(_addr2) require.Equal(uint64(2), pNonce2) tsf9, err := action.SignedTransfer(_addr2, _priKey2, uint64(2), big.NewInt(3), []byte{}, uint64(100000), big.NewInt(0)) require.NoError(err) require.NoError(ap.Add(ctx, tsf9)) - pBalance2, _ = ap.getPendingBalance(_addr2) + pBalance2, _ = getPendingBalance(ap, _addr2) require.Equal(uint64(1), pBalance2.Uint64()) - pNonce2, _ = ap.getPendingNonce(_addr2) + pNonce2, _ = ap.GetPendingNonce(_addr2) require.Equal(uint64(4), pNonce2) // Error Case Handling // Case I: Action source address is blacklisted @@ -210,7 +207,7 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(err) nTsfHash, err := nTsf.Hash() require.NoError(err) - ap2.allActions[nTsfHash] = nTsf + ap2.allActions.Set(nTsfHash, nTsf) } err = ap2.Add(ctx, tsf1) require.Equal(action.ErrTxPoolOverflow, errors.Cause(err)) @@ -226,7 +223,7 @@ func TestActPool_AddActs(t *testing.T) { require.NoError(err) nTsfHash, err := nTsf.Hash() require.NoError(err) - ap3.allActions[nTsfHash] = nTsf + ap3.allActions.Set(nTsfHash, nTsf) intrinsicGas, err := nTsf.IntrinsicGas() require.NoError(err) ap3.gasInPool += intrinsicGas @@ -404,8 +401,10 @@ func TestActPool_removeConfirmedActs(t *testing.T) { require.NoError(ap.Add(ctx, tsf3)) require.NoError(ap.Add(ctx, tsf4)) - require.Equal(4, len(ap.allActions)) - require.NotNil(ap.accountActs[_addr1]) + require.Equal(4, ap.allActions.Count()) + addr, err := address.FromString(_addr1) + require.NoError(err) + require.NotNil(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr)) sf.EXPECT().State(gomock.Any(), gomock.Any()).DoAndReturn(func(account interface{}, opts ...protocol.StateOption) (uint64, error) { acct, ok := account.(*state.Account) require.True(ok) @@ -416,9 +415,9 @@ func TestActPool_removeConfirmedActs(t *testing.T) { return 0, nil }).Times(1) - ap.removeConfirmedActs(ctx) - require.Equal(0, len(ap.allActions)) - require.Nil(ap.accountActs[_addr1]) + ap.Reset() + require.Equal(0, ap.allActions.Count()) + require.True(ap.worker[ap.allocatedWorker(addr)].GetQueue(addr).Empty()) } func TestActPool_Reset(t *testing.T) { @@ -542,35 +541,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding Tsfs above for each account // ap1 // Addr1 - ap1PNonce1, _ := ap1.getPendingNonce(_addr1) + ap1PNonce1, _ := ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ := ap1.getPendingBalance(_addr1) + ap1PBalance1, _ := getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(20).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ := ap1.getPendingNonce(_addr2) + ap1PNonce2, _ := ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ := ap1.getPendingBalance(_addr2) + ap1PBalance2, _ := getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(50).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ := ap1.getPendingNonce(_addr3) + ap1PNonce3, _ := ap1.GetPendingNonce(_addr3) require.Equal(uint64(3), ap1PNonce3) - ap1PBalance3, _ := ap1.getPendingBalance(_addr3) + ap1PBalance3, _ := getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(100).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ := ap2.getPendingNonce(_addr1) + ap2PNonce1, _ := ap2.GetPendingNonce(_addr1) require.Equal(uint64(4), ap2PNonce1) - ap2PBalance1, _ := ap2.getPendingBalance(_addr1) + ap2PBalance1, _ := getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(0).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ := ap2.getPendingNonce(_addr2) + ap2PNonce2, _ := ap2.GetPendingNonce(_addr2) require.Equal(uint64(3), ap2PNonce2) - ap2PBalance2, _ := ap2.getPendingBalance(_addr2) + ap2PBalance2, _ := getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(30).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ := ap2.getPendingNonce(_addr3) + ap2PNonce3, _ := ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ := ap2.getPendingBalance(_addr3) + ap2PBalance3, _ := getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(50).Uint64(), ap2PBalance3.Uint64()) // Let ap1 be BP's actpool balances[0] = big.NewInt(220) @@ -585,35 +584,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(220).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(200).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(3), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(180).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(4), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) + ap2PBalance1, _ = getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(200).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(3), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(200).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(180).Uint64(), ap2PBalance3.Uint64()) // Add more Tsfs after resetting // Tsfs To be added to ap1 only @@ -642,35 +641,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding Tsfs above for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(3), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(220).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(3), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(200).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(5), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(0).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(5), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) - require.Equal(big.NewInt(50).Uint64(), ap2PBalance1.Uint64()) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) + require.Equal(big.NewInt(10).Uint64(), ap2PBalance2.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(5), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(10).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(180).Uint64(), ap2PBalance3.Uint64()) // Let ap2 be BP's actpool balances[0] = big.NewInt(140) @@ -686,35 +685,35 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr1 - ap1PNonce1, _ = ap1.getPendingNonce(_addr1) + ap1PNonce1, _ = ap1.GetPendingNonce(_addr1) require.Equal(uint64(5), ap1PNonce1) - ap1PBalance1, _ = ap1.getPendingBalance(_addr1) + ap1PBalance1, _ = getPendingBalance(ap1, _addr1) require.Equal(big.NewInt(140).Uint64(), ap1PBalance1.Uint64()) // Addr2 - ap1PNonce2, _ = ap1.getPendingNonce(_addr2) + ap1PNonce2, _ = ap1.GetPendingNonce(_addr2) require.Equal(uint64(5), ap1PNonce2) - ap1PBalance2, _ = ap1.getPendingBalance(_addr2) + ap1PBalance2, _ = getPendingBalance(ap1, _addr2) require.Equal(big.NewInt(180).Uint64(), ap1PBalance2.Uint64()) // Addr3 - ap1PNonce3, _ = ap1.getPendingNonce(_addr3) + ap1PNonce3, _ = ap1.GetPendingNonce(_addr3) require.Equal(uint64(5), ap1PNonce3) - ap1PBalance3, _ = ap1.getPendingBalance(_addr3) + ap1PBalance3, _ = getPendingBalance(ap1, _addr3) require.Equal(big.NewInt(100).Uint64(), ap1PBalance3.Uint64()) // ap2 // Addr1 - ap2PNonce1, _ = ap2.getPendingNonce(_addr1) + ap2PNonce1, _ = ap2.GetPendingNonce(_addr1) require.Equal(uint64(5), ap2PNonce1) - ap2PBalance1, _ = ap2.getPendingBalance(_addr1) + ap2PBalance1, _ = getPendingBalance(ap2, _addr1) require.Equal(big.NewInt(140).Uint64(), ap2PBalance1.Uint64()) // Addr2 - ap2PNonce2, _ = ap2.getPendingNonce(_addr2) + ap2PNonce2, _ = ap2.GetPendingNonce(_addr2) require.Equal(uint64(5), ap2PNonce2) - ap2PBalance2, _ = ap2.getPendingBalance(_addr2) + ap2PBalance2, _ = getPendingBalance(ap2, _addr2) require.Equal(big.NewInt(180).Uint64(), ap2PBalance2.Uint64()) // Addr3 - ap2PNonce3, _ = ap2.getPendingNonce(_addr3) + ap2PNonce3, _ = ap2.GetPendingNonce(_addr3) require.Equal(uint64(3), ap2PNonce3) - ap2PBalance3, _ = ap2.getPendingBalance(_addr3) + ap2PBalance3, _ = getPendingBalance(ap2, _addr3) require.Equal(big.NewInt(280).Uint64(), ap2PBalance3.Uint64()) // Add two more players @@ -755,14 +754,14 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after adding actions above for account4 and account5 // ap1 // Addr4 - ap1PNonce4, _ := ap1.getPendingNonce(_addr4) + ap1PNonce4, _ := ap1.GetPendingNonce(_addr4) require.Equal(uint64(2), ap1PNonce4) - ap1PBalance4, _ := ap1.getPendingBalance(_addr4) + ap1PBalance4, _ := getPendingBalance(ap1, _addr4) require.Equal(big.NewInt(0).Uint64(), ap1PBalance4.Uint64()) // Addr5 - ap1PNonce5, _ := ap1.getPendingNonce(_addr5) + ap1PNonce5, _ := ap1.GetPendingNonce(_addr5) require.Equal(uint64(3), ap1PNonce5) - ap1PBalance5, _ := ap1.getPendingBalance(_addr5) + ap1PBalance5, _ := getPendingBalance(ap1, _addr5) require.Equal(big.NewInt(0).Uint64(), ap1PBalance5.Uint64()) // Let ap1 be BP's actpool balances[3] = big.NewInt(10) @@ -774,14 +773,14 @@ func TestActPool_Reset(t *testing.T) { // Check confirmed nonce, pending nonce, and pending balance after resetting actpool for each account // ap1 // Addr4 - ap1PNonce4, _ = ap1.getPendingNonce(_addr4) + ap1PNonce4, _ = ap1.GetPendingNonce(_addr4) require.Equal(uint64(2), ap1PNonce4) - ap1PBalance4, _ = ap1.getPendingBalance(_addr4) + ap1PBalance4, _ = getPendingBalance(ap1, _addr4) require.Equal(big.NewInt(10).Uint64(), ap1PBalance4.Uint64()) // Addr5 - ap1PNonce5, _ = ap1.getPendingNonce(_addr5) + ap1PNonce5, _ = ap1.GetPendingNonce(_addr5) require.Equal(uint64(3), ap1PNonce5) - ap1PBalance5, _ = ap1.getPendingBalance(_addr5) + ap1PBalance5, _ = getPendingBalance(ap1, _addr5) require.Equal(big.NewInt(20).Uint64(), ap1PBalance5.Uint64()) } @@ -824,11 +823,15 @@ func TestActPool_removeInvalidActs(t *testing.T) { hash2, err := tsf4.Hash() require.NoError(err) acts := []action.SealedEnvelope{tsf1, tsf4} - require.NotNil(ap.allActions[hash1]) - require.NotNil(ap.allActions[hash2]) + _, exist1 := ap.allActions.Get(hash1) + require.True(exist1) + _, exist2 := ap.allActions.Get(hash2) + require.True(exist2) ap.removeInvalidActs(acts) - require.Equal(action.SealedEnvelope{}, ap.allActions[hash1]) - require.Equal(action.SealedEnvelope{}, ap.allActions[hash2]) + _, exist1 = ap.allActions.Get(hash1) + require.False(exist1) + _, exist2 = ap.allActions.Get(hash2) + require.False(exist2) } func TestActPool_GetPendingNonce(t *testing.T) { @@ -950,7 +953,7 @@ func TestActPool_GetActionByHash(t *testing.T) { hash2, err := tsf2.Hash() require.NoError(err) - ap.allActions[hash1] = tsf1 + ap.allActions.Set(hash1, tsf1) act, err := ap.GetActionByHash(hash1) require.NoError(err) require.Equal(tsf1, act) @@ -958,7 +961,7 @@ func TestActPool_GetActionByHash(t *testing.T) { require.Equal(action.ErrNotFound, errors.Cause(err)) require.Equal(action.SealedEnvelope{}, act) - ap.allActions[hash2] = tsf2 + ap.allActions.Set(hash2, tsf2) act, err = ap.GetActionByHash(hash2) require.NoError(err) require.Equal(tsf2, act) @@ -1026,7 +1029,7 @@ func TestActPool_GetSize(t *testing.T) { return 0, nil }).Times(1) - ap.removeConfirmedActs(ctx) + ap.Reset() require.Equal(uint64(0), ap.GetSize()) require.Equal(uint64(0), ap.GetGasSize()) } @@ -1091,20 +1094,20 @@ func TestActPool_SpeedUpAction(t *testing.T) { require.NoError(ap.Add(ctx, tsf2)) // check account and actpool status - pBalance1, _ := ap.getPendingBalance(_addr1) + pBalance1, _ := getPendingBalance(ap, _addr1) require.Equal(uint64(10000000-10), pBalance1.Uint64()) - pNonce1, _ := ap.getPendingNonce(_addr1) + pNonce1, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(2), pNonce1) - pBalance2, _ := ap.getPendingBalance(_addr2) + pBalance2, _ := getPendingBalance(ap, _addr2) require.Equal(uint64(10000000-5-10000), pBalance2.Uint64()) - pNonce2, _ := ap.getPendingNonce(_addr2) + pNonce2, _ := ap.GetPendingNonce(_addr2) require.Equal(uint64(2), pNonce2) // A send action tsf3 with nonce 1 and higher gas price require.NoError(ap.Add(ctx, tsf3)) // check account and actpool status again after new action is inserted - pNonce3, _ := ap.getPendingNonce(_addr1) + pNonce3, _ := ap.GetPendingNonce(_addr1) require.Equal(uint64(2), pNonce3) ai := actioniterator.NewActionIterator(ap.PendingActionMap()) @@ -1128,44 +1131,19 @@ func TestActPool_SpeedUpAction(t *testing.T) { } } -// Helper function to return the correct pending nonce just in case of empty queue -func (ap *actPool) getPendingNonce(addr string) (uint64, error) { - if queue, ok := ap.accountActs[addr]; ok { - return queue.PendingNonce(), nil - } - _addr1, err := address.FromString(addr) - if err != nil { - return 0, err - } - committedState, err := accountutil.AccountState( - genesis.WithGenesisContext(context.Background(), genesis.Default), - ap.sf, - _addr1, - ) - if err != nil { - return 0, err - } - return committedState.PendingNonce(), nil -} - // Helper function to return the correct pending balance just in case of empty queue -func (ap *actPool) getPendingBalance(addr string) (*big.Int, error) { - if queue, ok := ap.accountActs[addr]; ok { - return queue.PendingBalance(), nil - } - _addr1, err := address.FromString(addr) +func getPendingBalance(ap *actPool, addrStr string) (*big.Int, error) { + addr, err := address.FromString(addrStr) if err != nil { return nil, err } - state, err := accountutil.AccountState( - genesis.WithGenesisContext(context.Background(), genesis.Default), - ap.sf, - _addr1, - ) + if queue := ap.worker[ap.allocatedWorker(addr)].GetQueue(addr); queue != nil { + return queue.PendingBalance(), nil + } + state, err := accountutil.AccountState(genesis.WithGenesisContext(context.Background(), genesis.Default), ap.sf, addr) if err != nil { return nil, err } - return state.Balance, nil } @@ -1179,14 +1157,6 @@ func getActPoolCfg() Config { } } -func actionMap2Slice(actMap map[string][]action.SealedEnvelope) []action.SealedEnvelope { - acts := make([]action.SealedEnvelope, 0) - for _, parts := range actMap { - acts = append(acts, parts...) - } - return acts -} - func lenPendingActionMap(acts map[string][]action.SealedEnvelope) int { l := 0 for _, part := range acts { diff --git a/actpool/actqueue.go b/actpool/actqueue.go index e870450843..9f36d578a3 100644 --- a/actpool/actqueue.go +++ b/actpool/actqueue.go @@ -10,6 +10,7 @@ import ( "context" "math/big" "sort" + "sync" "time" "github.com/facebookgo/clock" @@ -70,6 +71,7 @@ type ActQueue interface { Empty() bool PendingActs(context.Context) []action.SealedEnvelope AllActs() []action.SealedEnvelope + Reset() } // actQueue is a queue of actions from an account @@ -86,11 +88,7 @@ type actQueue struct { pendingBalance *big.Int clock clock.Clock ttl time.Duration -} - -// ActQueueOption is the option for actQueue. -type ActQueueOption interface { - SetActQueueOption(*actQueue) + mu sync.RWMutex } // NewActQueue create a new action queue @@ -113,6 +111,8 @@ func NewActQueue(ap *actPool, address string, ops ...ActQueueOption) ActQueue { // Put inserts a new action into the map, also updating the queue's nonce index func (q *actQueue) Put(act action.SealedEnvelope) error { + q.mu.Lock() + defer q.mu.Unlock() nonce := act.Nonce() if actInPool, exist := q.items[nonce]; exist { // act of higher gas price cut in line @@ -136,6 +136,8 @@ func (q *actQueue) Put(act action.SealedEnvelope) error { // FilterNonce removes all actions from the map with a nonce lower than the given threshold func (q *actQueue) FilterNonce(threshold uint64) []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() var removed []action.SealedEnvelope // Pop off priority queue and delete corresponding entries from map until the threshold is reached for q.index.Len() > 0 && (q.index)[0].nonce < threshold { @@ -173,6 +175,8 @@ func (q *actQueue) cleanTimeout() []action.SealedEnvelope { // UpdateQueue updates the pending nonce and balance of the queue func (q *actQueue) UpdateQueue(nonce uint64) []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() // First remove all timed out actions removedFromQueue := q.cleanTimeout() @@ -221,31 +225,43 @@ func (q *actQueue) UpdateQueue(nonce uint64) []action.SealedEnvelope { // SetPendingNonce sets pending nonce for the queue func (q *actQueue) SetPendingNonce(nonce uint64) { + q.mu.Lock() + defer q.mu.Unlock() q.pendingNonce = nonce } // PendingNonce returns the current pending nonce of the queue func (q *actQueue) PendingNonce() uint64 { + q.mu.RLock() + defer q.mu.RUnlock() return q.pendingNonce } // SetPendingBalance sets pending balance for the queue func (q *actQueue) SetPendingBalance(balance *big.Int) { + q.mu.Lock() + defer q.mu.Unlock() q.pendingBalance = balance } // PendingBalance returns the current pending balance of the queue func (q *actQueue) PendingBalance() *big.Int { + q.mu.RLock() + defer q.mu.RUnlock() return q.pendingBalance } // Len returns the length of the action map func (q *actQueue) Len() int { + q.mu.RLock() + defer q.mu.RUnlock() return len(q.items) } // Empty returns whether the queue of actions is empty or not func (q *actQueue) Empty() bool { + q.mu.RLock() + defer q.mu.RUnlock() return q.Len() == 0 } @@ -266,6 +282,8 @@ func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope { return nil } nonce := confirmedState.PendingNonce() + q.mu.RLock() + defer q.mu.RUnlock() for ; ; nonce++ { if _, exist := q.items[nonce]; !exist { break @@ -277,8 +295,10 @@ func (q *actQueue) PendingActs(ctx context.Context) []action.SealedEnvelope { // AllActs returns all the actions currently in queue func (q *actQueue) AllActs() []action.SealedEnvelope { + q.mu.Lock() + defer q.mu.Unlock() acts := make([]action.SealedEnvelope, 0, len(q.items)) - if q.Len() == 0 { + if len(q.items) == 0 { return acts } sort.Sort(q.index) @@ -313,3 +333,13 @@ func (q *actQueue) enoughBalance(act action.SealedEnvelope, updateBalance bool) return true } + +// Reset resets the queue +func (q *actQueue) Reset() { + q.mu.Lock() + defer q.mu.Unlock() + q.items = make(map[uint64]action.SealedEnvelope) + q.index = noncePriorityQueue{} + q.pendingNonce = 1 + q.pendingBalance = big.NewInt(0) +} diff --git a/actpool/options.go b/actpool/options.go index dafbd905ad..821f9523ab 100644 --- a/actpool/options.go +++ b/actpool/options.go @@ -11,6 +11,11 @@ import ( "github.com/facebookgo/clock" ) +// ActQueueOption is the option for actQueue. +type ActQueueOption interface { + SetActQueueOption(*actQueue) +} + type clockOption struct{ c clock.Clock } // WithClock returns an option to overwrite clock. diff --git a/actpool/queueworker.go b/actpool/queueworker.go new file mode 100644 index 0000000000..a517f73c65 --- /dev/null +++ b/actpool/queueworker.go @@ -0,0 +1,286 @@ +package actpool + +import ( + "context" + "encoding/hex" + "errors" + "math/big" + "strings" + "sync" + "sync/atomic" + + "github.com/iotexproject/go-pkgs/cache/ttl" + "github.com/iotexproject/go-pkgs/hash" + "github.com/iotexproject/iotex-address/address" + "go.uber.org/zap" + + "github.com/iotexproject/iotex-core/action" + accountutil "github.com/iotexproject/iotex-core/action/protocol/account/util" + "github.com/iotexproject/iotex-core/pkg/log" + "github.com/iotexproject/iotex-core/pkg/tracer" +) + +type ( + queueWorker struct { + queue chan workerJob + ap *actPool + mu sync.RWMutex + accountActs map[string]ActQueue + emptyAccounts *ttl.Cache + } + + workerJob struct { + ctx context.Context + act action.SealedEnvelope + err chan error + } + + pendingActions struct { + sender string + acts []action.SealedEnvelope + } +) + +func newQueueWorker(ap *actPool, jobQueue chan workerJob) *queueWorker { + acc, _ := ttl.NewCache() + return &queueWorker{ + queue: jobQueue, + ap: ap, + accountActs: make(map[string]ActQueue), + emptyAccounts: acc, + } +} + +func (worker *queueWorker) Start() error { + if worker.queue == nil || worker.ap == nil { + return errors.New("worker is invalid") + } + go func() { + for { + job, more := <-worker.queue + if !more { // worker chan is closed + return + } + job.err <- worker.Handle(job) + } + }() + return nil +} + +func (worker *queueWorker) Stop() error { + close(worker.queue) + return nil +} + +func (worker *queueWorker) Handle(job workerJob) error { + ctx := job.ctx + // ctx is canceled or timeout + if ctx.Err() != nil { + return ctx.Err() + } + + var ( + span = tracer.SpanFromContext(ctx) + act = job.act + sender = act.SenderAddress().String() + actHash, _ = act.Hash() + intrinsicGas, _ = act.IntrinsicGas() + ) + defer span.End() + + nonce, balance, err := worker.getConfirmedState(ctx, act.SenderAddress()) + if err != nil { + return err + } + + if err := worker.checkSelpWithState(&act, nonce, balance); err != nil { + return err + } + + if err := worker.putAction(sender, act, nonce, balance); err != nil { + return err + } + + worker.ap.allActions.Set(actHash, act) + + if desAddress, ok := act.Destination(); ok && !strings.EqualFold(sender, desAddress) { + worker.addDestinationMap(act) + } + + atomic.AddUint64(&worker.ap.gasInPool, intrinsicGas) + + worker.removeEmptyAccounts() + + return nil +} + +func (worker *queueWorker) getConfirmedState(ctx context.Context, sender address.Address) (uint64, *big.Int, error) { + // TODO: account Balance(confirmedBalance) will be returned in PR#3377 + confirmedState, err := accountutil.AccountState(ctx, worker.ap.sf, sender) + if err != nil { + return 0, nil, err + } + worker.mu.RLock() + queue := worker.accountActs[sender.String()] + worker.mu.RUnlock() + // account state isn't cached in the actpool + if queue == nil { + return confirmedState.PendingNonce(), confirmedState.Balance, nil + } + return confirmedState.PendingNonce(), queue.PendingBalance(), nil +} + +func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendingNonce uint64, balance *big.Int) error { + if act.Nonce() < pendingNonce { + _actpoolMtc.WithLabelValues("nonceTooSmall").Inc() + return action.ErrNonceTooLow + } + + // Nonce exceeds current range + if act.Nonce()-pendingNonce >= worker.ap.cfg.MaxNumActsPerAcct { + hash, _ := act.Hash() + log.L().Debug("Rejecting action because nonce is too large.", + log.Hex("hash", hash[:]), + zap.Uint64("startNonce", pendingNonce), + zap.Uint64("actNonce", act.Nonce())) + _actpoolMtc.WithLabelValues("nonceTooLarge").Inc() + return action.ErrNonceTooHigh + } + + if cost, _ := act.Cost(); balance.Cmp(cost) < 0 { + _actpoolMtc.WithLabelValues("insufficientBalance").Inc() + sender := act.SenderAddress().String() + actHash, _ := act.Hash() + log.L().Info("insufficient balance for action", + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.String("cost", cost.String()), + zap.String("balance", balance.String()), + zap.String("sender", sender), + ) + return action.ErrInsufficientFunds + } + return nil +} + +func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, pendingNonce uint64, confirmedBalance *big.Int) error { + worker.mu.RLock() + queue := worker.accountActs[sender] + worker.mu.RUnlock() + + if queue == nil { + queue = NewActQueue(worker.ap, sender, WithTimeOut(worker.ap.cfg.ActionExpiry)) + queue.SetPendingNonce(pendingNonce) + queue.SetPendingBalance(confirmedBalance) + worker.mu.Lock() + worker.accountActs[sender] = queue + worker.mu.Unlock() + } + + if err := queue.Put(act); err != nil { + actHash, _ := act.Hash() + _actpoolMtc.WithLabelValues("failedPutActQueue").Inc() + log.L().Info("failed put action into ActQueue", + zap.String("actionHash", hex.EncodeToString(actHash[:])), + zap.Error(err)) + return err + } + + queue.UpdateQueue(queue.PendingNonce()) // TODO: to be removed + + return nil +} + +func (worker *queueWorker) addDestinationMap(act action.SealedEnvelope) { + worker.ap.accountDesActs.mu.Lock() + defer worker.ap.accountDesActs.mu.Unlock() + destn, _ := act.Destination() + actHash, _ := act.Hash() + if desQueue := worker.ap.accountDesActs.acts[destn]; desQueue == nil { + worker.ap.accountDesActs.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope) + } + worker.ap.accountDesActs.acts[destn][actHash] = act +} + +func (worker *queueWorker) removeEmptyAccounts() { + if worker.emptyAccounts.Count() == 0 { + return + } + + worker.mu.Lock() + defer worker.mu.Unlock() + + worker.emptyAccounts.Range(func(key, _ interface{}) error { + sender := key.(string) + if worker.accountActs[sender].Empty() { + delete(worker.accountActs, sender) + } + return nil + }) + + worker.emptyAccounts.Reset() +} + +func (worker *queueWorker) Reset(ctx context.Context) { + worker.mu.RLock() + defer worker.mu.RUnlock() + + for from, queue := range worker.accountActs { + addr, _ := address.FromString(from) + confirmedState, err := accountutil.AccountState(ctx, worker.ap.sf, addr) + if err != nil { + log.L().Error("Error when removing confirmed actions", zap.Error(err)) + queue.Reset() + worker.emptyAccounts.Set(from, struct{}{}) + continue + } + queue.SetPendingNonce(confirmedState.PendingNonce()) + queue.SetPendingBalance(confirmedState.Balance) + // Remove all actions that are committed to new block + acts := queue.FilterNonce(queue.PendingNonce()) + acts2 := queue.UpdateQueue(queue.PendingNonce()) + worker.ap.removeInvalidActs(append(acts, acts2...)) + // Delete the queue entry if it becomes empty + if queue.Empty() { + worker.emptyAccounts.Set(from, struct{}{}) + } + } +} + +// PendingActions returns an action interator with all accepted actions +func (worker *queueWorker) PendingActions(ctx context.Context) []*pendingActions { + actionArr := make([]*pendingActions, 0) + + worker.mu.RLock() + defer worker.mu.RUnlock() + for from, queue := range worker.accountActs { + if queue.Empty() { + continue + } + // Remove the actions that are already timeout + acts := queue.UpdateQueue(queue.PendingNonce()) + worker.ap.removeInvalidActs(acts) + actionArr = append(actionArr, &pendingActions{ + sender: from, + acts: queue.PendingActs(ctx), + }) + } + return actionArr +} + +// GetQueue returns the actQueue of sender +func (worker *queueWorker) GetQueue(sender address.Address) ActQueue { + worker.mu.RLock() + defer worker.mu.RUnlock() + return worker.accountActs[sender.String()] +} + +// ResetAccount resets account in the accountActs of worker +func (worker *queueWorker) ResetAccount(sender address.Address) { + senderStr := sender.String() + worker.mu.RLock() + defer worker.mu.RUnlock() + if queue := worker.accountActs[senderStr]; queue != nil { + queue.Reset() + worker.emptyAccounts.Set(senderStr, struct{}{}) + } +} diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index 70c910e412..d2a3e455e5 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -9,11 +9,9 @@ import ( "context" "fmt" "sync" - "sync/atomic" "time" "github.com/libp2p/go-libp2p-core/peer" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -39,7 +37,7 @@ type ( var ( // DefaultConfig is the default config DefaultConfig = Config{ - ActionChanSize: 1000, + ActionChanSize: 5000, BlockChanSize: 1000, BlockSyncChanSize: 400, ProcessSyncRequestInterval: 0 * time.Second, @@ -118,8 +116,7 @@ func (m actionMsg) ChainID() uint32 { // IotxDispatcher is the request and event dispatcher for iotx node. type IotxDispatcher struct { - started int32 - shutdown int32 + lifecycle.Readiness actionChanLock sync.RWMutex blockChanLock sync.RWMutex syncChanLock sync.RWMutex @@ -163,23 +160,28 @@ func (d *IotxDispatcher) AddSubscriber( // Start starts the dispatcher. func (d *IotxDispatcher) Start(ctx context.Context) error { - if atomic.AddInt32(&d.started, 1) != 1 { - return errors.New("Dispatcher already started") - } log.L().Info("Starting dispatcher.") - d.wg.Add(3) - go d.actionHandler() + + // setup mutiple action consumers to enqueue actions into actpool + for i := 0; i < cap(d.actionChan)/5; i++ { + d.wg.Add(1) + go d.actionHandler() + } + + d.wg.Add(1) go d.blockHandler() + + d.wg.Add(1) go d.syncHandler() - return nil + return d.TurnOn() } // Stop gracefully shuts down the dispatcher by stopping all handlers and waiting for them to finish. func (d *IotxDispatcher) Stop(ctx context.Context) error { - if atomic.AddInt32(&d.shutdown, 1) != 1 { + if err := d.TurnOff(); err != nil { log.L().Warn("Dispatcher already in the process of shutting down.") - return nil + return err } log.L().Info("Dispatcher is shutting down.") close(d.quit) @@ -208,12 +210,12 @@ func (d *IotxDispatcher) EventAudit() map[iotexrpc.MessageType]int { } func (d *IotxDispatcher) actionHandler() { + d.wg.Done() for { select { case a := <-d.actionChan: d.handleActionMsg(a) case <-d.quit: - d.wg.Done() log.L().Info("action handler is terminated.") return } @@ -222,12 +224,12 @@ func (d *IotxDispatcher) actionHandler() { // blockHandler is the main handler for handling all news from peers. func (d *IotxDispatcher) blockHandler() { + defer d.wg.Done() for { select { case b := <-d.blockChan: d.handleBlockMsg(b) case <-d.quit: - d.wg.Done() log.L().Info("block handler is terminated.") return } @@ -236,12 +238,12 @@ func (d *IotxDispatcher) blockHandler() { // syncHandler handles incoming block sync requests func (d *IotxDispatcher) syncHandler() { + defer d.wg.Done() for { select { case m := <-d.syncChan: d.handleBlockSyncMsg(m) case <-d.quit: - d.wg.Done() log.L().Info("block sync handler done.") return } @@ -320,7 +322,7 @@ func (d *IotxDispatcher) handleBlockSyncMsg(m *blockSyncMsg) { // dispatchAction adds the passed action message to the news handling queue. func (d *IotxDispatcher) dispatchAction(ctx context.Context, chainID uint32, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID) @@ -347,7 +349,7 @@ func (d *IotxDispatcher) dispatchAction(ctx context.Context, chainID uint32, msg // dispatchBlock adds the passed block message to the news handling queue. func (d *IotxDispatcher) dispatchBlock(ctx context.Context, chainID uint32, peer string, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID) @@ -375,7 +377,7 @@ func (d *IotxDispatcher) dispatchBlock(ctx context.Context, chainID uint32, peer // dispatchBlockSyncReq adds the passed block sync request to the news handling queue. func (d *IotxDispatcher) dispatchBlockSyncReq(ctx context.Context, chainID uint32, peer peer.AddrInfo, msg proto.Message) { - if atomic.LoadInt32(&d.shutdown) != 0 { + if !d.IsReady() { return } subscriber := d.subscriber(chainID)