perf: mempool: lower priority optimizations (#10693)
* release the read lock earlier, as it is not needed for ChainComputeBaseFee

* chain/messagepool/selection.go: change to read locks in SelectMessages

* tighten up locks in chain/messagepool/repub.go, leaving two questions as comments on whether curTsLk is needed

* include the suggestion from @Jorropo to preallocate the msgs slice so that we only need a single allocation (see the sketch after this commit message)

* mp.pending should not be accessed directly but through the getter

* from @arajasek: just check whether the sender is a robust address (anything except an ID address is robust) here, and return if so. That will:

  - be faster
  - reduce the size of this cache by half, because we can drop mp.keyCache.Add(ka, ka) on line 491.

* do not need curTsLk; clean up code comments
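As a quick illustration of the preallocation point above (a standalone sketch with stand-in types, not the Lotus code itself): sizing the slice with make([]T, 0, n) up front lets append fill it with a single allocation instead of repeatedly growing and copying the backing array.

```go
package main

import "fmt"

// Stand-in types for the illustration only; the real code uses
// types.Message and types.SignedMessage from Lotus.
type Message struct{ Nonce uint64 }
type SignedMessage struct{ Message Message }

func main() {
	pending := []SignedMessage{{Message{0}}, {Message{1}}, {Message{2}}}

	// Capacity is known up front, so the backing array is allocated once
	// and append never has to grow and copy it.
	msgs := make([]*Message, 0, len(pending))
	for i := range pending {
		msgs = append(msgs, &pending[i].Message)
	}

	fmt.Println(len(msgs), cap(msgs)) // 3 3
}
```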
snissn authored May 3, 2023
1 parent 093d350 commit 742062f
Showing 4 changed files with 36 additions and 16 deletions.
19 changes: 16 additions & 3 deletions chain/messagepool/check.go
@@ -33,8 +33,13 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.RLock()
mset, ok := mp.pending[from]
mset, ok, err := mp.getPendingMset(ctx, from)
if err != nil {
log.Warnf("errored while getting pending mset: %w", err)
return nil, err
}
if ok {
msgs = make([]*types.Message, 0, len(mset.msgs))
for _, sm := range mset.msgs {
msgs = append(msgs, &sm.Message)
}
@@ -64,7 +69,11 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
if !ok {
mmap = make(map[uint64]*types.Message)
msgMap[m.From] = mmap
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
log.Warnf("errored while getting pending mset: %w", err)
return nil, err
}
if ok {
count += len(mset.msgs)
for _, sm := range mset.msgs {
@@ -144,7 +153,11 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
st, ok := state[m.From]
if !ok {
mp.lk.RLock()
mset, ok := mp.pending[m.From]
mset, ok, err := mp.getPendingMset(ctx, m.From)
if err != nil {
log.Warnf("errored while getting pending mset: %w", err)
return nil, err
}
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
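The three hunks above all swap a direct read of mp.pending for the getPendingMset getter, which is why the new call sites have to handle an error. The getter's body is not part of this diff; very roughly, it resolves the sender to a key address (which can fail) before indexing the pending map. A simplified sketch with stand-in types, not the actual Lotus implementation:

```go
// Sketch only: stand-in types, not the real Lotus message pool.
package mpoolsketch

type Address string
type SignedMessage struct{ Nonce uint64 }

type msgSet struct {
	msgs      map[uint64]*SignedMessage
	nextNonce uint64
}

type pool struct {
	pending    map[Address]*msgSet
	resolveKey func(Address) (Address, error) // stand-in for resolveToKey
}

// getPendingMset resolves addr to its key address before looking it up in
// pending; the resolution step is what can fail, hence the error return
// that the call sites in check.go now have to check.
func (p *pool) getPendingMset(addr Address) (*msgSet, bool, error) {
	ka, err := p.resolveKey(addr)
	if err != nil {
		return nil, false, err
	}
	ms, found := p.pending[ka]
	return ms, found, nil
}
```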
11 changes: 9 additions & 2 deletions chain/messagepool/messagepool.go
@@ -474,6 +474,15 @@ func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error {
}

func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
//if addr is not an ID addr, then it is already resolved to a key
if addr.Protocol() != address.ID {
return addr, nil
}
return mp.resolveToKeyFromID(ctx, addr)
}

func (mp *MessagePool) resolveToKeyFromID(ctx context.Context, addr address.Address) (address.Address, error) {

// check the cache
a, ok := mp.keyCache.Get(addr)
if ok {
@@ -488,8 +497,6 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (

// place both entries in the cache (may both be key addresses, which is fine)
mp.keyCache.Add(addr, ka)
mp.keyCache.Add(ka, ka)

return ka, nil
}

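The fast path added above keys off the address protocol: only ID addresses need to be resolved (and cached), while secp256k1, BLS, and actor addresses are already key/robust addresses. A small hedged example using the go-address library (assuming github.com/filecoin-project/go-address is available as a dependency):

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/go-address"
)

func main() {
	// An ID address is a compact on-chain alias and must be resolved to
	// the underlying key address.
	idAddr, _ := address.NewIDAddress(1234)

	// A secp256k1 address is derived from a public key hash and is already
	// "robust": nothing to resolve, nothing to cache.
	keyAddr, _ := address.NewSecp256k1Address([]byte("example public key bytes"))

	for _, a := range []address.Address{idAddr, keyAddr} {
		if a.Protocol() != address.ID {
			fmt.Printf("%s: already a key address, return it as-is\n", a)
			continue
		}
		fmt.Printf("%s: ID address, resolve via state and the key cache\n", a)
	}
}
```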
13 changes: 7 additions & 6 deletions chain/messagepool/repub.go
@@ -22,18 +22,21 @@ var RepublishBatchDelay = 100 * time.Millisecond
func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTsLk.RLock()
ts := mp.curTs
mp.curTsLk.RUnlock()

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
mp.curTsLk.RUnlock()
if err != nil {
return xerrors.Errorf("computing basefee: %w", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.curTsLk.Lock()

mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
mp.lk.Unlock()

mp.lk.RLock()
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
mset, ok, err := mp.getPendingMset(ctx, actor)
if err != nil {
@@ -54,9 +57,7 @@ func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
}
pending[actor] = pend
})

mp.lk.Unlock()
mp.curTsLk.Unlock()
mp.lk.RUnlock()

if len(pending) == 0 {
return nil
@@ -177,8 +178,8 @@ loop:
republished[m.Cid()] = struct{}{}
}

mp.lk.Lock()
// update the republished set so that we can trigger early republish from head changes
mp.lk.Lock()
mp.republished = republished
mp.lk.Unlock()

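Both hunks above follow the same pattern: hold the lock only long enough to snapshot or update the shared field, and keep the slow chain call outside the critical section. A generic sketch of that snapshot pattern (hypothetical names, with slowComputeBaseFee standing in for mp.api.ChainComputeBaseFee):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type pool struct {
	curTsLk sync.RWMutex
	curTs   string // stand-in for *types.TipSet
}

// slowComputeBaseFee is a hypothetical stand-in for the chain API call.
func slowComputeBaseFee(ts string) string {
	time.Sleep(100 * time.Millisecond) // pretend this hits chain state
	return "basefee(" + ts + ")"
}

func (p *pool) republishSketch() {
	// Snapshot the shared field under the read lock...
	p.curTsLk.RLock()
	ts := p.curTs
	p.curTsLk.RUnlock()

	// ...then do the expensive work with the lock already released, so head
	// changes and other readers are not blocked for the duration of the call.
	fmt.Println(slowComputeBaseFee(ts))
}

func main() {
	p := &pool{curTs: "tipset@100"}
	p.republishSketch()
}
```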
9 changes: 4 additions & 5 deletions chain/messagepool/selection.go
@@ -40,12 +40,11 @@ type msgChain struct {
}

func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq float64) ([]*types.SignedMessage, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

//TODO confirm if we can switch to RLock here for performance
mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

// See if we need to prune before selection; excessive buildup can lead to slow selection,
// so prune if we have too many messages (ignoring the cooldown).
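Switching SelectMessages from Lock/Unlock to RLock/RUnlock matters because sync.RWMutex admits any number of concurrent readers; only writers are exclusive. A minimal standalone illustration (not Lotus code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.RWMutex
	var wg sync.WaitGroup
	start := time.Now()

	// Two read-only "selections" overlap under RLock; with an exclusive
	// Lock they would have to run back to back.
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.RLock()
			defer mu.RUnlock()
			time.Sleep(100 * time.Millisecond) // stand-in for selection work
			fmt.Printf("reader %d done\n", id)
		}(i)
	}

	wg.Wait()
	// Roughly 100ms rather than 200ms, because the readers ran concurrently.
	fmt.Println("elapsed:", time.Since(start).Round(10*time.Millisecond))
}
```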
