Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track expected nonce in mpool, ignore messages with large nonce gaps #3450

Merged
merged 8 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 117 additions & 32 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))

var MaxActorPendingMessages = 1000

var MaxNonceGap = uint64(4)

var (
ErrMessageTooBig = errors.New("message too big")

Expand All @@ -68,6 +70,7 @@ var (
ErrSoftValidationFailure = errors.New("validation failure")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTooManyPendingMessages = errors.New("too many pending messages for actor")
ErrNonceGap = errors.New("unfulfilled nonce gap")

ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
)
Expand Down Expand Up @@ -131,19 +134,39 @@ type msgSet struct {
requiredFunds *stdbig.Int
}

func newMsgSet() *msgSet {
func newMsgSet(nonce uint64) *msgSet {
return &msgSet{
msgs: make(map[uint64]*types.SignedMessage),
nextNonce: nonce,
requiredFunds: stdbig.NewInt(0),
}
}

func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool, error) {
if len(ms.msgs) == 0 || m.Message.Nonce >= ms.nextNonce {
ms.nextNonce = m.Message.Nonce + 1
func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, strict bool) (bool, error) {
nextNonce := ms.nextNonce
nonceGap := false
switch {
case m.Message.Nonce == nextNonce:
nextNonce++
// advance if we are filling a gap
for _, fillGap := ms.msgs[nextNonce]; fillGap; _, fillGap = ms.msgs[nextNonce] {
nextNonce++
}

case strict && m.Message.Nonce > nextNonce+MaxNonceGap:
return false, xerrors.Errorf("message nonce has too big a gap from expected nonce (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)

case m.Message.Nonce > nextNonce:
nonceGap = true
}

exms, has := ms.msgs[m.Message.Nonce]
if has {
// refuse RBF if we have a gap
if strict && nonceGap {
return false, xerrors.Errorf("rejecting replace by fee because of nonce gap (Nonce: %d, nextNonce: %d): %w", m.Message.Nonce, nextNonce, ErrNonceGap)
}

if m.Cid() != exms.Cid() {
// check if RBF passes
minPrice := exms.Message.GasPremium
Expand All @@ -159,30 +182,65 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool, limit bool) (bool
m.Message.From, m.Message.Nonce, minPrice, m.Message.GasPremium,
ErrRBFTooLowPremium)
}
} else {
return false, xerrors.Errorf("message from %s with nonce %d already in mpool: %w",
m.Message.From, m.Message.Nonce, ErrSoftValidationFailure)
}

ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
}

if !has && limit && len(ms.msgs) > MaxActorPendingMessages {
if !has && strict && len(ms.msgs) > MaxActorPendingMessages {
log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages
}

if strict && nonceGap {
log.Warnf("adding nonce-gapped message from %s (nonce: %d, nextNonce: %d)",
m.Message.From, m.Message.Nonce, nextNonce)
}

ms.nextNonce = nextNonce
ms.msgs[m.Message.Nonce] = m
ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int)

return !has, nil
}

func (ms *msgSet) rm(nonce uint64) {
func (ms *msgSet) rm(nonce uint64, applied bool) {
m, has := ms.msgs[nonce]
if has {
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
delete(ms.msgs, nonce)
if !has {
if applied && nonce >= ms.nextNonce {
// we removed a message we did not know about because it was applied
// we need to adjust the nonce and check if we filled a gap
ms.nextNonce = nonce + 1
for _, fillGap := ms.msgs[ms.nextNonce]; fillGap; _, fillGap = ms.msgs[ms.nextNonce] {
ms.nextNonce++
}
}
return
}

ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
delete(ms.msgs, nonce)

// adjust next nonce
if applied {
// we removed a (known) message because it was applied in a tipset
// we can't possibly have filled a gap in this case
if nonce >= ms.nextNonce {
ms.nextNonce = nonce + 1
}
return
}

// we removed a message because it was pruned
// we have to adjust the nonce if it creates a gap or rewinds state
if nonce < ms.nextNonce {
ms.nextNonce = nonce
}
}

Expand Down Expand Up @@ -476,14 +534,48 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error
return mp.addLocked(m, true)
}

// addLoaded validates m and adds it to the pool in non-strict mode.
// In this diff it is the entry point used by loadLocal for messages restored
// from the local message store.
//
// It runs checkMessage, compares the message nonce with the sender's state
// nonce at the current tipset, and then runs verifyMsgBeforeAdd and
// checkBalance. It finishes by calling addLocked with strict=false, so the
// strict-only checks in msgSet.add (nonce-gap rejection and the per-actor
// pending limit) are not applied.
//
// Returns an error wrapping ErrSoftValidationFailure when the state nonce
// lookup fails, an error wrapping ErrNonceTooLow when the message nonce is
// below the state nonce, or whatever error the individual checks / addLocked
// return.
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
err := mp.checkMessage(m)
if err != nil {
return err
}

// curTsLk is taken before lk and held until return, so curTs stays fixed
// for every check below.
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

curTs := mp.curTs

snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}

// a nonce below the state nonce is rejected with ErrNonceTooLow
if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}

// lk is required by addLocked; it is acquired after curTsLk.
mp.lk.Lock()
defer mp.lk.Unlock()

if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil {
return err
}

if err := mp.checkBalance(m, curTs); err != nil {
return err
}

// strict=false: do not enforce the nonce-gap / pending-count limits
return mp.addLocked(m, false)
}

// addSkipChecks adds m to the pool without running any of the validation
// steps itself: it only takes mp.lk and delegates to addLocked with
// strict=false, so the strict-only limits in msgSet.add are not enforced.
// Returns the error from addLocked, if any.
func (mp *MessagePool) addSkipChecks(m *types.SignedMessage) error {
mp.lk.Lock()
defer mp.lk.Unlock()

return mp.addLocked(m, false)
}

func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error {
func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
log.Debugf("mpooladd: %s %d", m.Message.From, m.Message.Nonce)
if m.Signature.Type == crypto.SigTypeBLS {
mp.blsSigCache.Add(m.Cid(), m.Signature)
Expand All @@ -501,11 +593,16 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, limit bool) error {

mset, ok := mp.pending[m.Message.From]
if !ok {
mset = newMsgSet()
nonce, err := mp.getStateNonce(m.Message.From, mp.curTs)
if err != nil {
return xerrors.Errorf("failed to get initial actor nonce: %w", err)
}

mset = newMsgSet(nonce)
mp.pending[m.Message.From] = mset
}

incr, err := mset.add(m, mp, limit)
incr, err := mset.add(m, mp, strict)
if err != nil {
log.Info(err)
return err
Expand Down Expand Up @@ -664,14 +761,14 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return msg, mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}

func (mp *MessagePool) Remove(from address.Address, nonce uint64) {
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock()
defer mp.lk.Unlock()

mp.remove(from, nonce)
mp.remove(from, nonce, applied)
}

func (mp *MessagePool) remove(from address.Address, nonce uint64) {
func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool) {
mset, ok := mp.pending[from]
if !ok {
return
Expand All @@ -688,22 +785,10 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) {

// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
mset.rm(nonce)
mset.rm(nonce, applied)

if len(mset.msgs) == 0 {
delete(mp.pending, from)
} else {
var max uint64
for nonce := range mset.msgs {
if max < nonce {
max = nonce
}
}
if max < nonce {
max = nonce // we could have not seen the removed message before
}

mset.nextNonce = max + 1
}
}

Expand Down Expand Up @@ -771,7 +856,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
rm := func(from address.Address, nonce uint64) {
s, ok := rmsgs[from]
if !ok {
mp.Remove(from, nonce)
mp.Remove(from, nonce, true)
return
}

Expand All @@ -780,7 +865,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
return
}

mp.Remove(from, nonce)
mp.Remove(from, nonce, true)
}

maybeRepub := func(cid cid.Cid) {
Expand Down Expand Up @@ -1082,7 +1167,7 @@ func (mp *MessagePool) loadLocal() error {
return xerrors.Errorf("unmarshaling local message: %w", err)
}

if err := mp.Add(&sm); err != nil {
if err := mp.addLoaded(&sm); err != nil {
if xerrors.Is(err, ErrNonceTooLow) {
continue // todo: drop the message from local cache (if above certain confidence threshold)
}
Expand Down
6 changes: 6 additions & 0 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,12 @@ func TestRevertMessages(t *testing.T) {
}

func TestPruningSimple(t *testing.T) {
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
MaxNonceGap = oldMaxNonceGap
}()

tma := newTestMpoolAPI()

w, err := wallet.NewWallet(wallet.NewMemKeyStore())
Expand Down
2 changes: 1 addition & 1 deletion chain/messagepool/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ keepLoop:
// and remove all messages that are still in pruneMsgs after processing the chains
log.Infof("Pruning %d messages", len(pruneMsgs))
for _, m := range pruneMsgs {
mp.remove(m.Message.From, m.Message.Nonce)
mp.remove(m.Message.From, m.Message.Nonce, false)
}

return nil
Expand Down
9 changes: 9 additions & 0 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package messagepool
import (
"context"
"sort"
"time"

"golang.org/x/xerrors"

Expand All @@ -15,6 +16,8 @@ import (

const repubMsgLimit = 30

var RepublishBatchDelay = 200 * time.Millisecond

func (mp *MessagePool) republishPendingMessages() error {
mp.curTsLk.Lock()
ts := mp.curTs
Expand Down Expand Up @@ -131,6 +134,12 @@ func (mp *MessagePool) republishPendingMessages() error {
}

count++

if count < len(msgs) {
// this delay is here to encourage the pubsub subsystem to process the messages serially
// and avoid creating nonce gaps because of concurrent validation.
time.Sleep(RepublishBatchDelay)
}
}

// track most recently republished messages
Expand Down
6 changes: 6 additions & 0 deletions chain/messagepool/repub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
)

func TestRepubMessages(t *testing.T) {
oldRepublishBatchDelay := RepublishBatchDelay
RepublishBatchDelay = time.Microsecond
defer func() {
RepublishBatchDelay = oldRepublishBatchDelay
}()

tma := newTestMpoolAPI()
ds := datastore.NewMapDatastore()

Expand Down
6 changes: 6 additions & 0 deletions chain/messagepool/selection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,12 @@ func TestMessageChainSkipping(t *testing.T) {
}

func TestBasicMessageSelection(t *testing.T) {
oldMaxNonceGap := MaxNonceGap
MaxNonceGap = 1000
defer func() {
MaxNonceGap = oldMaxNonceGap
}()

mp, tma := makeTestMpool()

// the actors
Expand Down
2 changes: 2 additions & 0 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,8 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
fallthrough
case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceGap):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore
default:
Expand Down