From 840ebe8fba1f4a6787312c15a06b0be323ecec8a Mon Sep 17 00:00:00 2001 From: Liuhaai Date: Wed, 21 Sep 2022 17:29:42 -0700 Subject: [PATCH] fix comments --- actpool/actpool.go | 29 +++++++++++++++++++---------- actpool/actqueue.go | 39 ++++++++++++--------------------------- actpool/actqueue_test.go | 25 +++++++++---------------- actpool/queueworker.go | 31 +++++++++++-------------------- 4 files changed, 51 insertions(+), 73 deletions(-) diff --git a/actpool/actpool.go b/actpool/actpool.go index 3cfddf7ca5..c86417afbd 100644 --- a/actpool/actpool.go +++ b/actpool/actpool.go @@ -100,7 +100,7 @@ type actPool struct { cfg Config g genesis.Genesis sf protocol.StateReader - accountDesActs *desActs + accountDesActs *destinationMap allActions *ttl.Cache gasInPool uint64 actionEnvelopeValidators []action.SealedEnvelopeValidator @@ -111,11 +111,6 @@ type actPool struct { worker []*queueWorker } -type desActs struct { - mu sync.Mutex - acts map[string]map[hash.Hash256]action.SealedEnvelope -} - // NewActPool constructs a new actpool func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ...Option) (ActPool, error) { if sf == nil { @@ -133,7 +128,7 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ... g: g, sf: sf, senderBlackList: senderBlackList, - accountDesActs: &desActs{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)}, + accountDesActs: &destinationMap{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)}, allActions: actsMap, jobQueue: make([]chan workerJob, _numWorker), worker: make([]*queueWorker, _numWorker), @@ -395,10 +390,8 @@ func (ap *actPool) Validate(ctx context.Context, selp action.SealedEnvelope) err func (ap *actPool) DeleteAction(caller address.Address) { worker := ap.worker[ap.allocatedWorker(caller)] - if queue := worker.GetQueue(caller); queue != nil { - pendingActs := queue.AllActs() + if pendingActs := worker.ResetAccount(caller); len(pendingActs) != 0 { ap.removeInvalidActs(pendingActs) - worker.ResetAccount(caller) } } @@ -495,3 +488,19 @@ func (ap *actPool) allocatedWorker(senderAddr address.Address) int { var lastByte uint8 = senderBytes[len(senderBytes)-1] return int(lastByte) % _numWorker } + +type destinationMap struct { + mu sync.Mutex + acts map[string]map[hash.Hash256]action.SealedEnvelope +} + +func (des *destinationMap) addAction(act action.SealedEnvelope) { + des.mu.Lock() + defer des.mu.Unlock() + destn, _ := act.Destination() + actHash, _ := act.Hash() + if desMap := des.acts[destn]; desMap == nil { + des.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope) + } + des.acts[destn][actHash] = act +} diff --git a/actpool/actqueue.go b/actpool/actqueue.go index c97aac6e11..318b0a64ac 100644 --- a/actpool/actqueue.go +++ b/actpool/actqueue.go @@ -63,12 +63,10 @@ type ActQueue interface { Put(action.SealedEnvelope) error CleanConfirmedAct() []action.SealedEnvelope UpdateQueue() []action.SealedEnvelope - SetPendingNonce(uint64) + SetAccountState(uint64, *big.Int) + AccountState() (uint64, *big.Int) PendingNonce() uint64 - AccountNonce() uint64 - SetAccountBalance(*big.Int) PendingBalance() *big.Int - AccountBalance() *big.Int Len() int Empty() bool PendingActs(context.Context) []action.SealedEnvelope @@ -240,34 +238,28 @@ func (q *actQueue) UpdateQueue() []action.SealedEnvelope { return removedFromQueue } -// SetPendingNonce sets pending nonce for the queue -func (q *actQueue) SetPendingNonce(nonce uint64) { +// SetAccountState sets the account's nonce and balance for the queue +func (q *actQueue) SetAccountState(nonce uint64, balance *big.Int) { q.mu.Lock() defer q.mu.Unlock() q.pendingNonce = nonce q.accountNonce = nonce + q.accountBalance.Set(balance) + q.pendingBalance = make(map[uint64]*big.Int) } -// PendingNonce returns the current pending nonce of the queue -func (q *actQueue) PendingNonce() uint64 { +// AccountState returns the current account's nonce and balance +func (q *actQueue) AccountState() (uint64, *big.Int) { q.mu.RLock() defer q.mu.RUnlock() - return q.pendingNonce + return q.accountNonce, new(big.Int).Set(q.accountBalance) } -// AccountNonce returns the current account nonce -func (q *actQueue) AccountNonce() uint64 { +// PendingNonce returns the current pending nonce of the queue +func (q *actQueue) PendingNonce() uint64 { q.mu.RLock() defer q.mu.RUnlock() - return q.accountNonce -} - -// SetAccountBalance sets account balance for the queue -func (q *actQueue) SetAccountBalance(balance *big.Int) { - q.mu.Lock() - defer q.mu.Unlock() - q.accountBalance.Set(balance) - q.pendingBalance = make(map[uint64]*big.Int) + return q.pendingNonce } // PendingBalance returns the current pending balance of the queue @@ -277,13 +269,6 @@ func (q *actQueue) PendingBalance() *big.Int { return q.getPendingBalanceAtNonce(q.pendingNonce) } -// AccountBalance returns the current account balance -func (q *actQueue) AccountBalance() *big.Int { - q.mu.RLock() - defer q.mu.RUnlock() - return new(big.Int).Set(q.accountBalance) -} - // Len returns the length of the action map func (q *actQueue) Len() int { q.mu.RLock() diff --git a/actpool/actqueue_test.go b/actpool/actqueue_test.go index 5934743f05..9db36e6f55 100644 --- a/actpool/actqueue_test.go +++ b/actpool/actqueue_test.go @@ -15,7 +15,6 @@ import ( "github.com/facebookgo/clock" "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/iotexproject/iotex-core/action" @@ -62,8 +61,7 @@ func TestNoncePriorityQueue(t *testing.T) { func TestActQueuePut(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue) - q.SetAccountBalance(big.NewInt(maxBalance)) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1)) require.NoError(err) require.NoError(q.Put(tsf1)) @@ -88,8 +86,7 @@ func TestActQueuePut(t *testing.T) { func TestActQueueFilterNonce(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue) - q.SetAccountBalance(big.NewInt(maxBalance)) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1), nil, uint64(0), big.NewInt(0)) @@ -99,7 +96,7 @@ func TestActQueueFilterNonce(t *testing.T) { require.NoError(q.Put(tsf1)) require.NoError(q.Put(tsf2)) require.NoError(q.Put(tsf3)) - q.SetPendingNonce(3) + q.SetAccountState(3, big.NewInt(maxBalance)) q.CleanConfirmedAct() require.Equal(1, len(q.items)) require.Equal(uint64(3), q.index[0].nonce) @@ -108,8 +105,7 @@ func TestActQueueFilterNonce(t *testing.T) { func TestActQueueUpdateNonce(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue) - q.SetAccountBalance(big.NewInt(1010)) + q := NewActQueue(nil, "", 1, big.NewInt(1010)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) @@ -143,8 +139,7 @@ func TestActQueuePendingActs(t *testing.T) { ctx := genesis.WithGenesisContext(context.Background(), genesis.Default) ap, err := NewActPool(genesis.Default, sf, DefaultConfig, EnableExperimentalActions()) require.NoError(err) - q := NewActQueue(ap.(*actPool), identityset.Address(0).String(), 1, big.NewInt(0)).(*actQueue) - q.SetAccountBalance(big.NewInt(maxBalance)) + q := NewActQueue(ap.(*actPool), identityset.Address(0).String(), 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) @@ -167,8 +162,7 @@ func TestActQueuePendingActs(t *testing.T) { func TestActQueueAllActs(t *testing.T) { require := require.New(t) - q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue) - q.SetAccountBalance(big.NewInt(maxBalance)) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) require.NoError(err) tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0)) @@ -181,8 +175,7 @@ func TestActQueueAllActs(t *testing.T) { func TestActQueueTimeOutAction(t *testing.T) { c := clock.NewMock() - q := NewActQueue(nil, "", 1, big.NewInt(0), WithClock(c), WithTimeOut(3*time.Minute)) - q.SetAccountBalance(big.NewInt(maxBalance)) + q := NewActQueue(nil, "", 1, big.NewInt(maxBalance), WithClock(c), WithTimeOut(3*time.Minute)) tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0)) require.NoError(t, err) tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0)) @@ -193,10 +186,10 @@ func TestActQueueTimeOutAction(t *testing.T) { require.NoError(t, q.Put(tsf2)) q.(*actQueue).cleanTimeout() - assert.Equal(t, 2, q.Len()) + require.Equal(t, 2, q.Len()) c.Add(2 * time.Minute) q.(*actQueue).cleanTimeout() - assert.Equal(t, 1, q.Len()) + require.Equal(t, 1, q.Len()) } func TestActQueueCleanTimeout(t *testing.T) { diff --git a/actpool/queueworker.go b/actpool/queueworker.go index 9b872fe37f..32008b0fab 100644 --- a/actpool/queueworker.go +++ b/actpool/queueworker.go @@ -10,7 +10,6 @@ import ( "sync/atomic" "github.com/iotexproject/go-pkgs/cache/ttl" - "github.com/iotexproject/go-pkgs/hash" "github.com/iotexproject/iotex-address/address" "go.uber.org/zap" @@ -72,6 +71,7 @@ func (worker *queueWorker) Stop() error { return nil } +// Hanlde is called sequentially by worker func (worker *queueWorker) Handle(job workerJob) error { ctx := job.ctx // ctx is canceled or timeout @@ -104,11 +104,13 @@ func (worker *queueWorker) Handle(job workerJob) error { worker.ap.allActions.Set(actHash, act) if desAddress, ok := act.Destination(); ok && !strings.EqualFold(sender, desAddress) { - worker.addDestinationMap(act) + worker.ap.accountDesActs.addAction(act) } atomic.AddUint64(&worker.ap.gasInPool, intrinsicGas) + worker.mu.Lock() + defer worker.mu.Unlock() worker.removeEmptyAccounts() return nil @@ -126,7 +128,8 @@ func (worker *queueWorker) getConfirmedState(ctx context.Context, sender address } return confirmedState.PendingNonce(), confirmedState.Balance, nil } - return queue.AccountNonce(), queue.AccountBalance(), nil + nonce, balance := queue.AccountState() + return nonce, balance, nil } func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendingNonce uint64, balance *big.Int) error { @@ -190,25 +193,11 @@ func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, p return nil } -func (worker *queueWorker) addDestinationMap(act action.SealedEnvelope) { - worker.ap.accountDesActs.mu.Lock() - defer worker.ap.accountDesActs.mu.Unlock() - destn, _ := act.Destination() - actHash, _ := act.Hash() - if desQueue := worker.ap.accountDesActs.acts[destn]; desQueue == nil { - worker.ap.accountDesActs.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope) - } - worker.ap.accountDesActs.acts[destn][actHash] = act -} - func (worker *queueWorker) removeEmptyAccounts() { if worker.emptyAccounts.Count() == 0 { return } - worker.mu.Lock() - defer worker.mu.Unlock() - worker.emptyAccounts.Range(func(key, _ interface{}) error { sender := key.(string) if worker.accountActs[sender].Empty() { @@ -233,8 +222,7 @@ func (worker *queueWorker) Reset(ctx context.Context) { worker.emptyAccounts.Set(from, struct{}{}) continue } - queue.SetPendingNonce(confirmedState.PendingNonce()) - queue.SetAccountBalance(confirmedState.Balance) + queue.SetAccountState(confirmedState.PendingNonce(), confirmedState.Balance) // Remove all actions that are committed to new block acts := queue.CleanConfirmedAct() acts2 := queue.UpdateQueue() @@ -275,12 +263,15 @@ func (worker *queueWorker) GetQueue(sender address.Address) ActQueue { } // ResetAccount resets account in the accountActs of worker -func (worker *queueWorker) ResetAccount(sender address.Address) { +func (worker *queueWorker) ResetAccount(sender address.Address) []action.SealedEnvelope { senderStr := sender.String() worker.mu.RLock() defer worker.mu.RUnlock() if queue := worker.accountActs[senderStr]; queue != nil { + pendingActs := queue.AllActs() queue.Reset() worker.emptyAccounts.Set(senderStr, struct{}{}) + return pendingActs } + return nil }