Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Liuhaai committed Dec 31, 2022
1 parent 7cbaa90 commit 840ebe8
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 73 deletions.
29 changes: 19 additions & 10 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type actPool struct {
cfg Config
g genesis.Genesis
sf protocol.StateReader
accountDesActs *desActs
accountDesActs *destinationMap
allActions *ttl.Cache
gasInPool uint64
actionEnvelopeValidators []action.SealedEnvelopeValidator
Expand All @@ -111,11 +111,6 @@ type actPool struct {
worker []*queueWorker
}

type desActs struct {
mu sync.Mutex
acts map[string]map[hash.Hash256]action.SealedEnvelope
}

// NewActPool constructs a new actpool
func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ...Option) (ActPool, error) {
if sf == nil {
Expand All @@ -133,7 +128,7 @@ func NewActPool(g genesis.Genesis, sf protocol.StateReader, cfg Config, opts ...
g: g,
sf: sf,
senderBlackList: senderBlackList,
accountDesActs: &desActs{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)},
accountDesActs: &destinationMap{acts: make(map[string]map[hash.Hash256]action.SealedEnvelope)},
allActions: actsMap,
jobQueue: make([]chan workerJob, _numWorker),
worker: make([]*queueWorker, _numWorker),
Expand Down Expand Up @@ -395,10 +390,8 @@ func (ap *actPool) Validate(ctx context.Context, selp action.SealedEnvelope) err

func (ap *actPool) DeleteAction(caller address.Address) {
worker := ap.worker[ap.allocatedWorker(caller)]
if queue := worker.GetQueue(caller); queue != nil {
pendingActs := queue.AllActs()
if pendingActs := worker.ResetAccount(caller); len(pendingActs) != 0 {
ap.removeInvalidActs(pendingActs)
worker.ResetAccount(caller)
}
}

Expand Down Expand Up @@ -495,3 +488,19 @@ func (ap *actPool) allocatedWorker(senderAddr address.Address) int {
var lastByte uint8 = senderBytes[len(senderBytes)-1]
return int(lastByte) % _numWorker
}

type destinationMap struct {
mu sync.Mutex
acts map[string]map[hash.Hash256]action.SealedEnvelope
}

func (des *destinationMap) addAction(act action.SealedEnvelope) {
des.mu.Lock()
defer des.mu.Unlock()
destn, _ := act.Destination()
actHash, _ := act.Hash()
if desMap := des.acts[destn]; desMap == nil {
des.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope)
}
des.acts[destn][actHash] = act
}
39 changes: 12 additions & 27 deletions actpool/actqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ type ActQueue interface {
Put(action.SealedEnvelope) error
CleanConfirmedAct() []action.SealedEnvelope
UpdateQueue() []action.SealedEnvelope
SetPendingNonce(uint64)
SetAccountState(uint64, *big.Int)
AccountState() (uint64, *big.Int)
PendingNonce() uint64
AccountNonce() uint64
SetAccountBalance(*big.Int)
PendingBalance() *big.Int
AccountBalance() *big.Int
Len() int
Empty() bool
PendingActs(context.Context) []action.SealedEnvelope
Expand Down Expand Up @@ -240,34 +238,28 @@ func (q *actQueue) UpdateQueue() []action.SealedEnvelope {
return removedFromQueue
}

// SetPendingNonce sets pending nonce for the queue
func (q *actQueue) SetPendingNonce(nonce uint64) {
// SetAccountState sets the account's nonce and balance for the queue
func (q *actQueue) SetAccountState(nonce uint64, balance *big.Int) {
q.mu.Lock()
defer q.mu.Unlock()
q.pendingNonce = nonce
q.accountNonce = nonce
q.accountBalance.Set(balance)
q.pendingBalance = make(map[uint64]*big.Int)
}

// PendingNonce returns the current pending nonce of the queue
func (q *actQueue) PendingNonce() uint64 {
// AccountState returns the current account's nonce and balance
func (q *actQueue) AccountState() (uint64, *big.Int) {
q.mu.RLock()
defer q.mu.RUnlock()
return q.pendingNonce
return q.accountNonce, new(big.Int).Set(q.accountBalance)
}

// AccountNonce returns the current account nonce
func (q *actQueue) AccountNonce() uint64 {
// PendingNonce returns the current pending nonce of the queue
func (q *actQueue) PendingNonce() uint64 {
q.mu.RLock()
defer q.mu.RUnlock()
return q.accountNonce
}

// SetAccountBalance sets account balance for the queue
func (q *actQueue) SetAccountBalance(balance *big.Int) {
q.mu.Lock()
defer q.mu.Unlock()
q.accountBalance.Set(balance)
q.pendingBalance = make(map[uint64]*big.Int)
return q.pendingNonce
}

// PendingBalance returns the current pending balance of the queue
Expand All @@ -277,13 +269,6 @@ func (q *actQueue) PendingBalance() *big.Int {
return q.getPendingBalanceAtNonce(q.pendingNonce)
}

// AccountBalance returns the current account balance
func (q *actQueue) AccountBalance() *big.Int {
q.mu.RLock()
defer q.mu.RUnlock()
return new(big.Int).Set(q.accountBalance)
}

// Len returns the length of the action map
func (q *actQueue) Len() int {
q.mu.RLock()
Expand Down
25 changes: 9 additions & 16 deletions actpool/actqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/facebookgo/clock"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/iotexproject/iotex-core/action"
Expand Down Expand Up @@ -62,8 +61,7 @@ func TestNoncePriorityQueue(t *testing.T) {

func TestActQueuePut(t *testing.T) {
require := require.New(t)
q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue)
q.SetAccountBalance(big.NewInt(maxBalance))
q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue)
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(1))
require.NoError(err)
require.NoError(q.Put(tsf1))
Expand All @@ -88,8 +86,7 @@ func TestActQueuePut(t *testing.T) {

func TestActQueueFilterNonce(t *testing.T) {
require := require.New(t)
q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue)
q.SetAccountBalance(big.NewInt(maxBalance))
q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue)
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0))
require.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(1), nil, uint64(0), big.NewInt(0))
Expand All @@ -99,7 +96,7 @@ func TestActQueueFilterNonce(t *testing.T) {
require.NoError(q.Put(tsf1))
require.NoError(q.Put(tsf2))
require.NoError(q.Put(tsf3))
q.SetPendingNonce(3)
q.SetAccountState(3, big.NewInt(maxBalance))
q.CleanConfirmedAct()
require.Equal(1, len(q.items))
require.Equal(uint64(3), q.index[0].nonce)
Expand All @@ -108,8 +105,7 @@ func TestActQueueFilterNonce(t *testing.T) {

func TestActQueueUpdateNonce(t *testing.T) {
require := require.New(t)
q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue)
q.SetAccountBalance(big.NewInt(1010))
q := NewActQueue(nil, "", 1, big.NewInt(1010)).(*actQueue)
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1), nil, uint64(0), big.NewInt(0))
require.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0))
Expand Down Expand Up @@ -143,8 +139,7 @@ func TestActQueuePendingActs(t *testing.T) {
ctx := genesis.WithGenesisContext(context.Background(), genesis.Default)
ap, err := NewActPool(genesis.Default, sf, DefaultConfig, EnableExperimentalActions())
require.NoError(err)
q := NewActQueue(ap.(*actPool), identityset.Address(0).String(), 1, big.NewInt(0)).(*actQueue)
q.SetAccountBalance(big.NewInt(maxBalance))
q := NewActQueue(ap.(*actPool), identityset.Address(0).String(), 1, big.NewInt(maxBalance)).(*actQueue)
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 2, big.NewInt(100), nil, uint64(0), big.NewInt(0))
require.NoError(err)
tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0))
Expand All @@ -167,8 +162,7 @@ func TestActQueuePendingActs(t *testing.T) {

func TestActQueueAllActs(t *testing.T) {
require := require.New(t)
q := NewActQueue(nil, "", 1, big.NewInt(0)).(*actQueue)
q.SetAccountBalance(big.NewInt(maxBalance))
q := NewActQueue(nil, "", 1, big.NewInt(maxBalance)).(*actQueue)
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(1000), nil, uint64(0), big.NewInt(0))
require.NoError(err)
tsf3, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(1000), nil, uint64(0), big.NewInt(0))
Expand All @@ -181,8 +175,7 @@ func TestActQueueAllActs(t *testing.T) {

func TestActQueueTimeOutAction(t *testing.T) {
c := clock.NewMock()
q := NewActQueue(nil, "", 1, big.NewInt(0), WithClock(c), WithTimeOut(3*time.Minute))
q.SetAccountBalance(big.NewInt(maxBalance))
q := NewActQueue(nil, "", 1, big.NewInt(maxBalance), WithClock(c), WithTimeOut(3*time.Minute))
tsf1, err := action.SignedTransfer(_addr2, _priKey1, 1, big.NewInt(100), nil, uint64(0), big.NewInt(0))
require.NoError(t, err)
tsf2, err := action.SignedTransfer(_addr2, _priKey1, 3, big.NewInt(100), nil, uint64(0), big.NewInt(0))
Expand All @@ -193,10 +186,10 @@ func TestActQueueTimeOutAction(t *testing.T) {

require.NoError(t, q.Put(tsf2))
q.(*actQueue).cleanTimeout()
assert.Equal(t, 2, q.Len())
require.Equal(t, 2, q.Len())
c.Add(2 * time.Minute)
q.(*actQueue).cleanTimeout()
assert.Equal(t, 1, q.Len())
require.Equal(t, 1, q.Len())
}

func TestActQueueCleanTimeout(t *testing.T) {
Expand Down
31 changes: 11 additions & 20 deletions actpool/queueworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync/atomic"

"github.com/iotexproject/go-pkgs/cache/ttl"
"github.com/iotexproject/go-pkgs/hash"
"github.com/iotexproject/iotex-address/address"
"go.uber.org/zap"

Expand Down Expand Up @@ -72,6 +71,7 @@ func (worker *queueWorker) Stop() error {
return nil
}

// Hanlde is called sequentially by worker
func (worker *queueWorker) Handle(job workerJob) error {
ctx := job.ctx
// ctx is canceled or timeout
Expand Down Expand Up @@ -104,11 +104,13 @@ func (worker *queueWorker) Handle(job workerJob) error {
worker.ap.allActions.Set(actHash, act)

if desAddress, ok := act.Destination(); ok && !strings.EqualFold(sender, desAddress) {
worker.addDestinationMap(act)
worker.ap.accountDesActs.addAction(act)
}

atomic.AddUint64(&worker.ap.gasInPool, intrinsicGas)

worker.mu.Lock()
defer worker.mu.Unlock()
worker.removeEmptyAccounts()

return nil
Expand All @@ -126,7 +128,8 @@ func (worker *queueWorker) getConfirmedState(ctx context.Context, sender address
}
return confirmedState.PendingNonce(), confirmedState.Balance, nil
}
return queue.AccountNonce(), queue.AccountBalance(), nil
nonce, balance := queue.AccountState()
return nonce, balance, nil
}

func (worker *queueWorker) checkSelpWithState(act *action.SealedEnvelope, pendingNonce uint64, balance *big.Int) error {
Expand Down Expand Up @@ -190,25 +193,11 @@ func (worker *queueWorker) putAction(sender string, act action.SealedEnvelope, p
return nil
}

func (worker *queueWorker) addDestinationMap(act action.SealedEnvelope) {
worker.ap.accountDesActs.mu.Lock()
defer worker.ap.accountDesActs.mu.Unlock()
destn, _ := act.Destination()
actHash, _ := act.Hash()
if desQueue := worker.ap.accountDesActs.acts[destn]; desQueue == nil {
worker.ap.accountDesActs.acts[destn] = make(map[hash.Hash256]action.SealedEnvelope)
}
worker.ap.accountDesActs.acts[destn][actHash] = act
}

func (worker *queueWorker) removeEmptyAccounts() {
if worker.emptyAccounts.Count() == 0 {
return
}

worker.mu.Lock()
defer worker.mu.Unlock()

worker.emptyAccounts.Range(func(key, _ interface{}) error {
sender := key.(string)
if worker.accountActs[sender].Empty() {
Expand All @@ -233,8 +222,7 @@ func (worker *queueWorker) Reset(ctx context.Context) {
worker.emptyAccounts.Set(from, struct{}{})
continue
}
queue.SetPendingNonce(confirmedState.PendingNonce())
queue.SetAccountBalance(confirmedState.Balance)
queue.SetAccountState(confirmedState.PendingNonce(), confirmedState.Balance)
// Remove all actions that are committed to new block
acts := queue.CleanConfirmedAct()
acts2 := queue.UpdateQueue()
Expand Down Expand Up @@ -275,12 +263,15 @@ func (worker *queueWorker) GetQueue(sender address.Address) ActQueue {
}

// ResetAccount resets account in the accountActs of worker
func (worker *queueWorker) ResetAccount(sender address.Address) {
func (worker *queueWorker) ResetAccount(sender address.Address) []action.SealedEnvelope {
senderStr := sender.String()
worker.mu.RLock()
defer worker.mu.RUnlock()
if queue := worker.accountActs[senderStr]; queue != nil {
pendingActs := queue.AllActs()
queue.Reset()
worker.emptyAccounts.Set(senderStr, struct{}{})
return pendingActs
}
return nil
}

0 comments on commit 840ebe8

Please sign in to comment.