diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go index b0e7b7e2b73..6c3e776c09f 100644 --- a/chain/messagepool/messagepool.go +++ b/chain/messagepool/messagepool.go @@ -169,13 +169,13 @@ type MessagePool struct { sigValCache *lru.TwoQueueCache[string, struct{}] - nonceCache *lru.Cache[nonceCacheKey, uint64] + stateNonceCache *lru.Cache[stateNonceCacheKey, uint64] evtTypes [3]journal.EventType journal journal.Journal } -type nonceCacheKey struct { +type stateNonceCacheKey struct { tsk types.TipSetKey addr address.Address } @@ -371,7 +371,7 @@ func (ms *msgSet) toSlice() []*types.SignedMessage { func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.UpgradeSchedule, netName dtypes.NetworkName, j journal.Journal) (*MessagePool, error) { cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize) verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize) - noncecache, _ := lru.New[nonceCacheKey, uint64](256) + stateNonceCache, _ := lru.New[stateNonceCacheKey, uint64](32768) // 32k * ~200 bytes = 6MB keycache, _ := lru.New[address.Address, address.Address](1_000_000) cfg, err := loadConfig(ctx, ds) @@ -384,26 +384,26 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra } mp := &MessagePool{ - ds: ds, - addSema: make(chan struct{}, 1), - closer: make(chan struct{}), - repubTk: build.Clock.Ticker(RepublishInterval), - repubTrigger: make(chan struct{}, 1), - localAddrs: make(map[address.Address]struct{}), - pending: make(map[address.Address]*msgSet), - keyCache: keycache, - minGasPrice: types.NewInt(0), - getNtwkVersion: us.GetNtwkVersion, - pruneTrigger: make(chan struct{}, 1), - pruneCooldown: make(chan struct{}, 1), - blsSigCache: cache, - sigValCache: verifcache, - nonceCache: noncecache, - changes: lps.New(50), - localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), - api: api, - netName: netName, - cfg: cfg, + ds: ds, + addSema: make(chan struct{}, 1), + closer: make(chan struct{}), + repubTk: build.Clock.Ticker(RepublishInterval), + repubTrigger: make(chan struct{}, 1), + localAddrs: make(map[address.Address]struct{}), + pending: make(map[address.Address]*msgSet), + keyCache: keycache, + minGasPrice: types.NewInt(0), + getNtwkVersion: us.GetNtwkVersion, + pruneTrigger: make(chan struct{}, 1), + pruneCooldown: make(chan struct{}, 1), + blsSigCache: cache, + sigValCache: verifcache, + stateNonceCache: stateNonceCache, + changes: lps.New(50), + localMsgs: namespace.Wrap(ds, datastore.NewKey(localMsgsDs)), + api: api, + netName: netName, + cfg: cfg, evtTypes: [...]journal.EventType{ evtTypeMpoolAdd: j.RegisterEventType("mpool", "add"), evtTypeMpoolRemove: j.RegisterEventType("mpool", "remove"), @@ -1068,24 +1068,52 @@ func (mp *MessagePool) getStateNonce(ctx context.Context, addr address.Address, done := metrics.Timer(ctx, metrics.MpoolGetNonceDuration) defer done() - nk := nonceCacheKey{ + nk := stateNonceCacheKey{ tsk: ts.Key(), addr: addr, } - n, ok := mp.nonceCache.Get(nk) + n, ok := mp.stateNonceCache.Get(nk) if ok { return n, nil } - act, err := mp.api.GetActorAfter(addr, ts) + // get the nonce from the actor before ts + actor, err := mp.api.GetActorBefore(addr, ts) + if err != nil { + return 0, err + } + nextNonce := actor.Nonce + + raddr, err := mp.resolveToKey(ctx, addr) + if err != nil { + return 0, err + } + + // loop over all messages sent by 'addr' and find the highest nonce + messages, err := mp.api.MessagesForTipset(ctx, ts) if err != nil { return 0, err } + for _, message := range messages { + msg := message.VMMessage() + + maddr, err := mp.resolveToKey(ctx, msg.From) + if err != nil { + log.Warnf("failed to resolve message from address: %s", err) + continue + } + + if maddr == raddr { + if n := msg.Nonce + 1; n > nextNonce { + nextNonce = n + } + } + } - mp.nonceCache.Add(nk, act.Nonce) + mp.stateNonceCache.Add(nk, nextNonce) - return act.Nonce, nil + return nextNonce, nil } func (mp *MessagePool) getStateBalance(ctx context.Context, addr address.Address, ts *types.TipSet) (types.BigInt, error) { diff --git a/chain/messagepool/messagepool_test.go b/chain/messagepool/messagepool_test.go index 20da2317e9b..a781b50748c 100644 --- a/chain/messagepool/messagepool_test.go +++ b/chain/messagepool/messagepool_test.go @@ -120,6 +120,22 @@ func (tma *testMpoolAPI) PubSubPublish(string, []byte) error { return nil } +func (tma *testMpoolAPI) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + balance, ok := tma.balance[addr] + if !ok { + balance = types.NewInt(1000e6) + tma.balance[addr] = balance + } + + nonce := tma.statenonce[addr] + + return &types.Actor{ + Code: builtin2.AccountActorCodeID, + Nonce: nonce, + Balance: balance, + }, nil +} + func (tma *testMpoolAPI) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { // regression check for load bug if ts == nil { diff --git a/chain/messagepool/provider.go b/chain/messagepool/provider.go index 123a2607ea0..764e6c13a92 100644 --- a/chain/messagepool/provider.go +++ b/chain/messagepool/provider.go @@ -2,6 +2,7 @@ package messagepool import ( "context" + "errors" "time" "github.com/ipfs/go-cid" @@ -27,6 +28,7 @@ type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error + GetActorBefore(address.Address, *types.TipSet) (*types.Actor, error) GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error) StateDeterministicAddressAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error) StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version @@ -58,6 +60,23 @@ func (mpp *mpoolProvider) IsLite() bool { return mpp.lite != nil } +func (mpp *mpoolProvider) getActorLite(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + if !mpp.IsLite() { + return nil, errors.New("should not use getActorLite on non lite Provider") + } + + n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting nonce over lite: %w", err) + } + a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting actor over lite: %w", err) + } + a.Nonce = n + return a, nil +} + func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { mpp.sm.ChainStore().SubscribeHeadChanges( store.WrapHeadChangeCoalescer( @@ -77,18 +96,17 @@ func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { return mpp.ps.Publish(k, v) // nolint } +func (mpp *mpoolProvider) GetActorBefore(addr address.Address, ts *types.TipSet) (*types.Actor, error) { + if mpp.IsLite() { + return mpp.getActorLite(addr, ts) + } + + return mpp.sm.LoadActor(context.TODO(), addr, ts) +} + func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { if mpp.IsLite() { - n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key()) - if err != nil { - return nil, xerrors.Errorf("getting nonce over lite: %w", err) - } - a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key()) - if err != nil { - return nil, xerrors.Errorf("getting actor over lite: %w", err) - } - a.Nonce = n - return a, nil + return mpp.getActorLite(addr, ts) } stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts)