diff --git a/core/outbox.go b/core/outbox.go new file mode 100644 index 0000000000..2135757af9 --- /dev/null +++ b/core/outbox.go @@ -0,0 +1,147 @@ +package core + +import ( + "context" + "sync" + + "github.com/filecoin-project/go-filecoin/abi" + "github.com/filecoin-project/go-filecoin/actor" + "github.com/filecoin-project/go-filecoin/address" + "github.com/filecoin-project/go-filecoin/consensus" + "github.com/filecoin-project/go-filecoin/metrics" + "github.com/filecoin-project/go-filecoin/types" + "github.com/ipfs/go-cid" + "github.com/pkg/errors" +) + +// Topic is the network pubsub topic identifier on which new messages are announced. +const Topic = "/fil/msgs" + +// Outbox validates and marshals messages for sending and maintains the outbound message queue. +type Outbox struct { + // Signs messages + signer types.Signer + // Validates messages before sending them. + validator consensus.SignedMessageValidator + // Holds messages sent from this node but not yet mined. + queue *MessageQueue + // Invoked to publish a message to the network + publisher publisher + + chains chainProvider + actors actorProvider + + // Protects the "next nonce" calculation to avoid collisions. + nonceLock sync.Mutex +} + +type chainProvider interface { + GetHead() types.SortedCidSet + GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error) +} + +type actorProvider interface { + // GetActor returns the actor state defined by the chain up to some tipset + GetActor(ctx context.Context, tipset types.SortedCidSet, addr address.Address) (*actor.Actor, error) +} + +type publisher interface { + Publish(ctx context.Context, message *types.SignedMessage, height uint64) error +} + +var msgSendErrCt = metrics.NewInt64Counter("message_sender_error", "Number of errors encountered while sending a message") + +// NewOutbox creates a new outbox +func NewOutbox(signer types.Signer, validator consensus.SignedMessageValidator, queue *MessageQueue, + publisher publisher, chains chainProvider, actors actorProvider) *Outbox { + return &Outbox{ + signer: signer, + validator: validator, + queue: queue, + publisher: publisher, + chains: chains, + actors: actors, + } +} + +// Send marshals and sends a message, retaining it in the outbound message queue. +func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value *types.AttoFIL, + gasPrice types.AttoFIL, gasLimit types.GasUnits, method string, params ...interface{}) (out cid.Cid, err error) { + defer func() { + if err != nil { + msgSendErrCt.Inc(ctx, 1) + } + }() + + encodedParams, err := abi.ToEncodedValues(params...) + if err != nil { + return cid.Undef, errors.Wrap(err, "invalid params") + } + + // Lock to avoid race for message nonce. + ob.nonceLock.Lock() + defer ob.nonceLock.Unlock() + + head := ob.chains.GetHead() + + fromActor, err := ob.actors.GetActor(ctx, head, from) + if err != nil { + return cid.Undef, errors.Wrapf(err, "no GetActor at address %s", from) + } + + nonce, err := nextNonce(fromActor, ob.queue, from) + if err != nil { + return cid.Undef, errors.Wrapf(err, "failed calculating nonce for GetActor %s", from) + } + + rawMsg := types.NewMessage(from, to, nonce, value, method, encodedParams) + signed, err := types.NewSignedMessage(*rawMsg, ob.signer, gasPrice, gasLimit) + if err != nil { + return cid.Undef, errors.Wrap(err, "failed to sign message") + } + + err = ob.validator.Validate(ctx, signed, fromActor) + if err != nil { + return cid.Undef, errors.Wrap(err, "invalid message") + } + + height, err := tipsetHeight(ob.chains, head) + if err != nil { + return cid.Undef, errors.Wrap(err, "failed to get block height") + } + + // Add to the local message queue/pool at the last possible moment before broadcasting to network. + if err := ob.queue.Enqueue(signed, height); err != nil { + return cid.Undef, errors.Wrap(err, "failed to add message to outbound queue") + } + + err = ob.publisher.Publish(ctx, signed, height) + if err != nil { + return cid.Undef, err + } + + return signed.Cid() +} + +// nextNonce returns the next expected nonce value for an account GetActor. This is the larger +// of the GetActor's nonce value, or one greater than the largest nonce from the GetActor found in the message queue. +func nextNonce(act *actor.Actor, queue *MessageQueue, address address.Address) (uint64, error) { + actorNonce, err := actor.NextNonce(act) + if err != nil { + return 0, err + } + + poolNonce, found := queue.LargestNonce(address) + if found && poolNonce >= actorNonce { + return poolNonce + 1, nil + } + return actorNonce, nil +} + +func tipsetHeight(provider chainProvider, key types.SortedCidSet) (uint64, error) { + head, err := provider.GetTipSet(key) + if err != nil { + return 0, err + } + return head.Height() +} diff --git a/core/outbox_test.go b/core/outbox_test.go new file mode 100644 index 0000000000..3f7004ad64 --- /dev/null +++ b/core/outbox_test.go @@ -0,0 +1,199 @@ +package core_test + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/filecoin-project/go-filecoin/actor" + "github.com/filecoin-project/go-filecoin/actor/builtin/account" + "github.com/filecoin-project/go-filecoin/actor/builtin/storagemarket" + "github.com/filecoin-project/go-filecoin/address" + "github.com/filecoin-project/go-filecoin/core" + tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags" + "github.com/filecoin-project/go-filecoin/types" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOutbox(t *testing.T) { + tf.UnitTest(t) + + t.Run("invalid message rejected", func(t *testing.T) { + w, _ := types.NewMockSignersAndKeyInfo(1) + sender := w.Addresses[0] + queue := core.NewMessageQueue() + publisher := &mockPublisher{} + provider := &fakeProvider{} + + ob := core.NewOutbox(w, nullValidator{rejectMessages: true}, queue, publisher, provider, provider) + + cid, err := ob.Send(context.Background(), sender, sender, types.NewAttoFILFromFIL(2), types.NewGasPrice(0), types.NewGasUnits(0), "") + assert.Errorf(t, err, "for testing") + assert.False(t, cid.Defined()) + }) + + t.Run("send message enqueues and calls publish", func(t *testing.T) { + w, _ := types.NewMockSignersAndKeyInfo(1) + sender := w.Addresses[0] + toAddr := address.NewForTestGetter()() + queue := core.NewMessageQueue() + publisher := &mockPublisher{} + provider := &fakeProvider{} + + blk := types.NewBlockForTest(nil, 1) + blk.Height = 1000 + actr, _ := account.NewActor(types.NewZeroAttoFIL()) + actr.Nonce = 42 + provider.Set(t, blk, sender, actr) + + ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) + require.Empty(t, queue.List(sender)) + require.Nil(t, publisher.message) + + _, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "") + require.NoError(t, err) + assert.Equal(t, uint64(1000), queue.List(sender)[0].Stamp) + assert.NotNil(t, publisher.message) + assert.Equal(t, actr.Nonce, publisher.message.Nonce) + assert.Equal(t, uint64(1000), publisher.height) + }) + + t.Run("send message avoids nonce race", func(t *testing.T) { + ctx := context.Background() + msgCount := 20 // number of messages to send + sendConcurrent := 3 // number of of concurrent message sends + + w, _ := types.NewMockSignersAndKeyInfo(1) + sender := w.Addresses[0] + toAddr := address.NewForTestGetter()() + queue := core.NewMessageQueue() + publisher := &mockPublisher{} + provider := &fakeProvider{} + + blk := types.NewBlockForTest(nil, 1) + blk.Height = 1000 + actr, _ := account.NewActor(types.NewZeroAttoFIL()) + actr.Nonce = 42 + provider.Set(t, blk, sender, actr) + + s := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) + + var wg sync.WaitGroup + addTwentyMessages := func(batch int) { + defer wg.Done() + for i := 0; i < msgCount; i++ { + _, err := s.Send(ctx, sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), fmt.Sprintf("%d-%d", batch, i), []byte{}) + require.NoError(t, err) + } + } + + // Add messages concurrently. + for i := 0; i < sendConcurrent; i++ { + wg.Add(1) + go addTwentyMessages(i) + } + wg.Wait() + + enqueued := queue.List(sender) + assert.Equal(t, 60, len(enqueued)) + + // Expect the nonces to be distinct and contiguous + nonces := map[uint64]bool{} + for _, message := range enqueued { + assert.Equal(t, uint64(1000), message.Stamp) + _, found := nonces[uint64(message.Msg.Nonce)] + require.False(t, found) + nonces[uint64(message.Msg.Nonce)] = true + } + + for i := 0; i < 60; i++ { + assert.True(t, nonces[uint64(actr.Nonce)+uint64(i)]) + + } + }) + + t.Run("nonce fails with non-account actor", func(t *testing.T) { + w, _ := types.NewMockSignersAndKeyInfo(1) + sender := w.Addresses[0] + toAddr := address.NewForTestGetter()() + queue := core.NewMessageQueue() + publisher := &mockPublisher{} + provider := &fakeProvider{} + + blk := types.NewBlockForTest(nil, 1) + actr, _ := storagemarket.NewActor() // Not an account actor + provider.Set(t, blk, sender, actr) + + ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) + + _, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "account or empty") + }) +} + +// A publisher which just stores the last message published. +type mockPublisher struct { + returnError error // Error to be returned by Publish() + message *types.SignedMessage // Message received by Publish() + height uint64 // Height received by Publish() +} + +func (p *mockPublisher) Publish(ctx context.Context, message *types.SignedMessage, height uint64) error { + p.message = message + p.height = height + return p.returnError +} + +// A chain and actor provider which provides and expects values for a single message. +type fakeProvider struct { + head types.SortedCidSet // Provided by GetHead and expected by others + tipset *types.TipSet // Provided by GetTipset(head) + addr address.Address // Expected by GetActor + actor *actor.Actor // Provided by GetActor(head, addr) +} + +func (p *fakeProvider) GetHead() types.SortedCidSet { + return p.head +} + +func (p *fakeProvider) GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error) { + if !tsKey.Equals(p.head) { + return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head) + } + return p.tipset, nil +} + +func (p *fakeProvider) GetActor(ctx context.Context, tsKey types.SortedCidSet, addr address.Address) (*actor.Actor, error) { + if !tsKey.Equals(p.head) { + return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head) + } + if addr != p.addr { + return nil, errors.Errorf("No such address %s, expected %s", addr, p.addr) + } + return p.actor, nil +} + +// Set sets the tipset, from address, and actor to be provided by creating a tipset with one block. +func (p *fakeProvider) Set(t *testing.T, block *types.Block, addr address.Address, actor *actor.Actor) { + ts := types.RequireNewTipSet(t, block) + p.tipset = &ts + tsKey := types.NewSortedCidSet(block.Cid()) + p.head = tsKey + p.addr = addr + p.actor = actor +} + +type nullValidator struct { + rejectMessages bool +} + +func (v nullValidator) Validate(ctx context.Context, msg *types.SignedMessage, fromActor *actor.Actor) error { + if v.rejectMessages { + return errors.New("rejected for testing") + } + return nil +} diff --git a/node/actor_provider.go b/node/actor_provider.go new file mode 100644 index 0000000000..82cd2d3986 --- /dev/null +++ b/node/actor_provider.go @@ -0,0 +1,46 @@ +package node + +import ( + "context" + + "github.com/filecoin-project/go-filecoin/actor" + "github.com/filecoin-project/go-filecoin/actor/builtin" + "github.com/filecoin-project/go-filecoin/address" + "github.com/filecoin-project/go-filecoin/state" + "github.com/filecoin-project/go-filecoin/types" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-hamt-ipld" + "github.com/pkg/errors" +) + +// defaultActorProvider reads actor state out of the IPLD HAMT store. +// This is wiring to provide actor state to the outbox. +type defaultActorProvider struct { + chain chainStore + state *hamt.CborIpldStore +} + +type chainStore interface { + GetTipSetStateRoot(tsKey types.SortedCidSet) (cid.Cid, error) +} + +func newDefaultActorProvider(chain chainStore, state *hamt.CborIpldStore) *defaultActorProvider { + return &defaultActorProvider{chain, state} +} + +func (ap defaultActorProvider) GetActor(ctx context.Context, tipset types.SortedCidSet, addr address.Address) (*actor.Actor, error) { + stateCid, err := ap.chain.GetTipSetStateRoot(tipset) + if err != nil { + return nil, err + } + tree, err := state.LoadStateTree(ctx, ap.state, stateCid, builtin.Actors) + if err != nil { + return nil, errors.Wrap(err, "failed to load latest state") + } + + actr, err := tree.GetActor(ctx, addr) + if err != nil { + return nil, errors.Wrapf(err, "no actor at address %s", addr) + } + return actr, nil +} diff --git a/node/msg_publisher.go b/node/msg_publisher.go new file mode 100644 index 0000000000..4a3fd48b97 --- /dev/null +++ b/node/msg_publisher.go @@ -0,0 +1,38 @@ +package node + +import ( + "context" + + "github.com/filecoin-project/go-filecoin/core" + "github.com/filecoin-project/go-filecoin/net/pubsub" + "github.com/filecoin-project/go-filecoin/types" + "github.com/pkg/errors" +) + +// defaultMessagePublisher publishes messages to a pubsub topic and adds them to a message pool. +// This is wiring for message publication from the outbox. +type defaultMessagePublisher struct { + network *pubsub.Publisher + topic string + pool *core.MessagePool +} + +func newDefaultMessagePublisher(pubsub *pubsub.Publisher, topic string, pool *core.MessagePool) *defaultMessagePublisher { + return &defaultMessagePublisher{pubsub, topic, pool} +} + +func (p *defaultMessagePublisher) Publish(ctx context.Context, message *types.SignedMessage, height uint64) error { + encoded, err := message.Marshal() + if err != nil { + return errors.Wrap(err, "failed to marshal message") + } + + if _, err := p.pool.Add(ctx, message); err != nil { + return errors.Wrap(err, "failed to add message to message pool") + } + + if err = p.network.Publish(p.topic, encoded); err != nil { + return errors.Wrap(err, "failed to publish message to network") + } + return nil +} diff --git a/node/node.go b/node/node.go index 74aed30a34..183d4db8aa 100644 --- a/node/node.go +++ b/node/node.go @@ -119,7 +119,7 @@ type Node struct { // Incoming messages for block mining. MsgPool *core.MessagePool // Messages sent and not yet mined. - Outbox *core.MessageQueue + MsgQueue *core.MessageQueue Wallet *wallet.Wallet @@ -392,6 +392,7 @@ func (nc *Config) Build(ctx context.Context) (*Node, error) { // set up chainstore chainStore := chain.NewDefaultStore(nc.Repo.ChainDatastore(), genCid) + chainFacade := bcf.NewBlockChainFacade(chainStore, &cstOffline) powerTable := &consensus.MarketView{} // set up processor @@ -410,24 +411,27 @@ func (nc *Config) Build(ctx context.Context) (*Node, error) { nodeConsensus = consensus.NewExpected(&cstOffline, bs, processor, powerTable, genCid, nc.Verifier) } - chainFacade := bcf.NewBlockChainFacade(chainStore, &cstOffline) - - // only the syncer gets the storage which is online connected - chainSyncer := chain.NewDefaultSyncer(&cstOffline, nodeConsensus, chainStore, fetcher) - msgPool := core.NewMessagePool(chainStore, nc.Repo.Config().Mpool, consensus.NewIngestionValidator(chainFacade, nc.Repo.Config().Mpool)) - outbox := core.NewMessageQueue() - - // Set up libp2p pubsub + // Set up libp2p network fsub, err := libp2pps.NewFloodSub(ctx, peerHost) if err != nil { - return nil, errors.Wrap(err, "failed to set up pubsub") + return nil, errors.Wrap(err, "failed to set up network") } + backend, err := wallet.NewDSBackend(nc.Repo.WalletDatastore()) if err != nil { return nil, errors.Wrap(err, "failed to set up wallet backend") } fcWallet := wallet.New(backend) + // only the syncer gets the storage which is online connected + chainSyncer := chain.NewDefaultSyncer(&cstOffline, nodeConsensus, chainStore, fetcher) + msgPool := core.NewMessagePool(chainStore, nc.Repo.Config().Mpool, consensus.NewIngestionValidator(chainFacade, nc.Repo.Config().Mpool)) + msgQueue := core.NewMessageQueue() + + msgPublisher := newDefaultMessagePublisher(pubsub.NewPublisher(fsub), core.Topic, msgPool) + actorProvider := newDefaultActorProvider(chainStore, &cstOffline) + outbox := core.NewOutbox(fcWallet, consensus.NewOutboundMessageValidator(), msgQueue, msgPublisher, chainStore, actorProvider) + PorcelainAPI := porcelain.New(plumbing.New(&plumbing.APIDeps{ Bitswap: bswap, Chain: chainFacade, @@ -437,10 +441,10 @@ func (nc *Config) Build(ctx context.Context) (*Node, error) { MsgPool: msgPool, MsgPreviewer: msg.NewPreviewer(fcWallet, chainStore, &cstOffline, bs), MsgQueryer: msg.NewQueryer(nc.Repo, fcWallet, chainStore, &cstOffline, bs), - MsgSender: msg.NewSender(fcWallet, chainStore, &cstOffline, chainStore, outbox, msgPool, consensus.NewOutboundMessageValidator(), fsub.Publish), + MsgSender: msg.NewSender(outbox), MsgWaiter: msg.NewWaiter(chainStore, bs, &cstOffline), Network: net.New(peerHost, pubsub.NewPublisher(fsub), pubsub.NewSubscriber(fsub), net.NewRouter(router), bandwidthTracker, net.NewPinger(peerHost, pingService)), - Outbox: outbox, + Outbox: msgQueue, Wallet: fcWallet, })) @@ -457,7 +461,7 @@ func (nc *Config) Build(ctx context.Context) (*Node, error) { Exchange: bswap, host: peerHost, MsgPool: msgPool, - Outbox: outbox, + MsgQueue: msgQueue, OfflineMode: nc.OfflineMode, PeerHost: peerHost, Repo: nc.Repo, @@ -532,7 +536,7 @@ func (node *Node) Start(ctx context.Context) error { node.BlockSub = blkSub // subscribe to message notifications - msgSub, err := node.PorcelainAPI.PubSubSubscribe(msg.Topic) + msgSub, err := node.PorcelainAPI.PubSubSubscribe(core.Topic) if err != nil { return errors.Wrap(err, "failed to subscribe to message topic") } @@ -544,7 +548,7 @@ func (node *Node) Start(ctx context.Context) error { go node.handleSubscription(cctx, node.processBlock, "processBlock", node.BlockSub, "BlockSub") go node.handleSubscription(cctx, node.processMessage, "processMessage", node.MessageSub, "MessageSub") - outboxPolicy := core.NewMessageQueuePolicy(node.Outbox, node.ChainReader, core.OutboxMaxAgeRounds) + outboxPolicy := core.NewMessageQueuePolicy(node.MsgQueue, node.ChainReader, core.OutboxMaxAgeRounds) node.HeaviestTipSetHandled = func() {} node.HeaviestTipSetCh = node.ChainReader.HeadEvents().Sub(chain.NewHeadTopic) diff --git a/plumbing/msg/sender.go b/plumbing/msg/sender.go index 7845ff0df3..a9d9e880c4 100644 --- a/plumbing/msg/sender.go +++ b/plumbing/msg/sender.go @@ -2,160 +2,25 @@ package msg import ( "context" - "sync" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-hamt-ipld" - "github.com/pkg/errors" - - "github.com/filecoin-project/go-filecoin/abi" - "github.com/filecoin-project/go-filecoin/actor" "github.com/filecoin-project/go-filecoin/address" - "github.com/filecoin-project/go-filecoin/chain" - "github.com/filecoin-project/go-filecoin/consensus" "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/metrics" "github.com/filecoin-project/go-filecoin/types" + "github.com/ipfs/go-cid" ) -var msgSendErrCt = metrics.NewInt64Counter("message_sender_error", "Number of errors encountered while sending a message") - -// Topic is the network pubsub topic identifier on which new messages are announced. -const Topic = "/fil/msgs" - -// Abstracts over a store of blockchain state. -type senderChainReader interface { - GetHead() types.SortedCidSet - GetTipSetStateRoot(tsKey types.SortedCidSet) (cid.Cid, error) -} - -// BlockClock defines a interface to a struct that can give the current block height. -type BlockClock interface { - BlockHeight() (uint64, error) -} - -// PublishFunc is a function the Sender calls to publish a message to the network. -type PublishFunc func(topic string, data []byte) error - // Sender is plumbing implementation that knows how to send a message. type Sender struct { - // Signs messages. - signer types.Signer - // Provides actor state - chainState senderChainReader - // To load the tree for the head tipset state root. - cst *hamt.CborIpldStore - // Provides the current block height - blockTimer BlockClock - // Tracks inbound messages for mining - inbox *core.MessagePool - // Tracks outbound messages - outbox *core.MessageQueue - // Validates messages before sending them. - validator consensus.SignedMessageValidator - // Invoked to publish the new message to the network. - publish PublishFunc - // Protects the "next nonce" calculation to avoid collisions. - l sync.Mutex + outbox *core.Outbox } -// NewSender returns a new Sender. There should be exactly one of these per node because -// sending locks to reduce nonce collisions. -func NewSender(signer types.Signer, chainReader senderChainReader, cst *hamt.CborIpldStore, blockTimer BlockClock, - msgQueue *core.MessageQueue, msgPool *core.MessagePool, - validator consensus.SignedMessageValidator, publish PublishFunc) *Sender { - return &Sender{ - signer: signer, - chainState: chainReader, - cst: cst, - blockTimer: blockTimer, - inbox: msgPool, - outbox: msgQueue, - validator: validator, - publish: publish, - } +// NewSender creates a new sender plumbing +func NewSender(outbox *core.Outbox) *Sender { + return &Sender{outbox} } // Send sends a message. See api description. func (s *Sender) Send(ctx context.Context, from, to address.Address, value *types.AttoFIL, gasPrice types.AttoFIL, gasLimit types.GasUnits, method string, params ...interface{}) (out cid.Cid, err error) { - defer func() { - if err != nil { - msgSendErrCt.Inc(ctx, 1) - } - }() - - encodedParams, err := abi.ToEncodedValues(params...) - if err != nil { - return cid.Undef, errors.Wrap(err, "invalid params") - } - - // Lock to avoid race for message nonce. - s.l.Lock() - defer s.l.Unlock() - - st, err := chain.LatestState(ctx, s.chainState, s.cst) - if err != nil { - return cid.Undef, errors.Wrap(err, "failed to load state from chain") - } - - fromActor, err := st.GetActor(ctx, from) - if err != nil { - return cid.Undef, errors.Wrapf(err, "no actor at address %s", from) - } - - nonce, err := nextNonce(fromActor, s.outbox, from) - if err != nil { - return cid.Undef, errors.Wrapf(err, "failed calculating nonce for actor %s", from) - } - - msg := types.NewMessage(from, to, nonce, value, method, encodedParams) - smsg, err := types.NewSignedMessage(*msg, s.signer, gasPrice, gasLimit) - if err != nil { - return cid.Undef, errors.Wrap(err, "failed to sign message") - } - - err = s.validator.Validate(ctx, smsg, fromActor) - if err != nil { - return cid.Undef, errors.Wrap(err, "invalid message") - } - - smsgdata, err := smsg.Marshal() - if err != nil { - return cid.Undef, errors.Wrap(err, "failed to marshal message") - } - - height, err := s.blockTimer.BlockHeight() - if err != nil { - return cid.Undef, errors.Wrap(err, "failed to get block height") - } - - // Add to the local message queue/pool at the last possible moment before broadcasting to network. - if err := s.outbox.Enqueue(smsg, height); err != nil { - return cid.Undef, errors.Wrap(err, "failed to add message to outbound queue") - } - if _, err := s.inbox.Add(ctx, smsg); err != nil { - return cid.Undef, errors.Wrap(err, "failed to add message to message pool") - } - - if err = s.publish(Topic, smsgdata); err != nil { - return cid.Undef, errors.Wrap(err, "failed to publish message to network") - } - - log.Debugf("MessageSend with message: %s", smsg) - return smsg.Cid() -} - -// nextNonce returns the next expected nonce value for an account actor. This is the larger -// of the actor's nonce value, or one greater than the largest nonce from the actor found in the message pool. -func nextNonce(act *actor.Actor, outbox *core.MessageQueue, address address.Address) (uint64, error) { - actorNonce, err := actor.NextNonce(act) - if err != nil { - return 0, err - } + return s.outbox.Send(ctx, from, to, value, gasPrice, gasLimit, method, params...) - poolNonce, found := outbox.LargestNonce(address) - if found && poolNonce >= actorNonce { - return poolNonce + 1, nil - } - return actorNonce, nil } diff --git a/plumbing/msg/sender_test.go b/plumbing/msg/sender_test.go deleted file mode 100644 index af5c5fa6ea..0000000000 --- a/plumbing/msg/sender_test.go +++ /dev/null @@ -1,198 +0,0 @@ -package msg - -import ( - "context" - "fmt" - "sync" - "testing" - - "github.com/filecoin-project/go-filecoin/actor" - "github.com/filecoin-project/go-filecoin/actor/builtin/account" - "github.com/filecoin-project/go-filecoin/actor/builtin/storagemarket" - "github.com/filecoin-project/go-filecoin/address" - "github.com/filecoin-project/go-filecoin/chain" - "github.com/filecoin-project/go-filecoin/config" - "github.com/filecoin-project/go-filecoin/consensus" - "github.com/filecoin-project/go-filecoin/core" - "github.com/filecoin-project/go-filecoin/testhelpers" - tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags" - "github.com/filecoin-project/go-filecoin/types" - "github.com/filecoin-project/go-filecoin/wallet" - hamt "github.com/ipfs/go-hamt-ipld" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestSend(t *testing.T) { - tf.UnitTest(t) - - t.Run("invalid message rejected", func(t *testing.T) { - w, chainStore, cst := setupSendTest(t) - addr := w.Addresses()[0] - timer := testhelpers.NewTestMessagePoolAPI(1000) - queue := core.NewMessageQueue() - pool := core.NewMessagePool(timer, config.NewDefaultConfig().Mpool, testhelpers.NewMockMessagePoolValidator()) - nopPublish := func(string, []byte) error { return nil } - - s := NewSender(w, chainStore, cst, timer, queue, pool, nullValidator{rejectMessages: true}, nopPublish) - _, err := s.Send(context.Background(), addr, addr, types.NewAttoFILFromFIL(2), types.NewGasPrice(0), types.NewGasUnits(0), "") - assert.Errorf(t, err, "for testing") - }) - - t.Run("send message enqueues and calls publish", func(t *testing.T) { - w, chainStore, cst := setupSendTest(t) - addr := w.Addresses()[0] - toAddr := address.NewForTestGetter()() - timer := testhelpers.NewTestMessagePoolAPI(1000) - queue := core.NewMessageQueue() - pool := core.NewMessagePool(timer, config.NewDefaultConfig().Mpool, testhelpers.NewMockMessagePoolValidator()) - - publishCalled := false - publish := func(topic string, data []byte) error { - assert.Equal(t, Topic, topic) - publishCalled = true - return nil - } - - s := NewSender(w, chainStore, cst, timer, queue, pool, nullValidator{}, publish) - require.Empty(t, queue.List(addr)) - require.Empty(t, pool.Pending()) - - _, err := s.Send(context.Background(), addr, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "") - require.NoError(t, err) - assert.Equal(t, uint64(1000), queue.List(addr)[0].Stamp) - assert.Equal(t, 1, len(pool.Pending())) - assert.True(t, publishCalled) - }) - - t.Run("send message avoids nonce race", func(t *testing.T) { - ctx := context.Background() - - // number of messages to send - msgCount := 20 - // number of of concurrent message sends - sendConcurrent := 3 - // total messages sent == msgCount * sendConcurrent - mpoolCfg := config.NewDefaultConfig().Mpool - mpoolCfg.MaxPoolSize = msgCount * sendConcurrent - - w, chainStore, cst := setupSendTest(t) - addr := w.Addresses()[0] - toAddr := address.NewForTestGetter()() - timer := testhelpers.NewTestMessagePoolAPI(1000) - queue := core.NewMessageQueue() - pool := core.NewMessagePool(timer, mpoolCfg, testhelpers.NewMockMessagePoolValidator()) - nopPublish := func(string, []byte) error { return nil } - - s := NewSender(w, chainStore, cst, timer, queue, pool, nullValidator{}, nopPublish) - - var wg sync.WaitGroup - addTwentyMessages := func(batch int) { - defer wg.Done() - for i := 0; i < msgCount; i++ { - _, err := s.Send(ctx, addr, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), fmt.Sprintf("%d-%d", batch, i), []byte{}) - require.NoError(t, err) - } - } - - // Add messages concurrently. - for i := 0; i < sendConcurrent; i++ { - wg.Add(1) - go addTwentyMessages(i) - } - - wg.Wait() - - assert.Equal(t, 60, len(pool.Pending())) - - // Expect none of the messages to have the same nonce. - nonces := map[uint64]bool{} - for _, message := range pool.Pending() { - _, found := nonces[uint64(message.Nonce)] - require.False(t, found) - nonces[uint64(message.Nonce)] = true - } - }) -} - -func TestNextNonce(t *testing.T) { - tf.UnitTest(t) - - t.Run("account exists but wrong type", func(t *testing.T) { - address := address.NewForTestGetter()() - actor, err := storagemarket.NewActor() - assert.NoError(t, err) - - _, err = nextNonce(actor, core.NewMessageQueue(), address) - assert.Error(t, err) - assert.Contains(t, err.Error(), "account or empty") - }) - - t.Run("account exists, gets correct value", func(t *testing.T) { - address := address.NewForTestGetter()() - actor, err := account.NewActor(types.NewAttoFILFromFIL(0)) - assert.NoError(t, err) - actor.Nonce = 42 - - nonce, err := nextNonce(actor, core.NewMessageQueue(), address) - assert.NoError(t, err) - assert.Equal(t, uint64(42), nonce) - }) - - t.Run("gets nonce from highest message queue value", func(t *testing.T) { - outbox := core.NewMessageQueue() - addr := mockSigner.Addresses[0] - actor, err := account.NewActor(types.NewAttoFILFromFIL(0)) - assert.NoError(t, err) - actor.Nonce = 2 - - nonce, err := nextNonce(actor, outbox, addr) - assert.NoError(t, err) - assert.Equal(t, uint64(2), nonce) - - msg := types.NewMessage(addr, address.TestAddress, nonce, nil, "", []byte{}) - smsg := testhelpers.MustSign(mockSigner, msg) - core.MustEnqueue(outbox, 100, smsg...) - - nonce, err = nextNonce(actor, outbox, addr) - assert.NoError(t, err) - assert.Equal(t, uint64(3), nonce) - - msg = types.NewMessage(addr, address.TestAddress, nonce, nil, "", []byte{}) - smsg = testhelpers.MustSign(mockSigner, msg) - core.MustEnqueue(outbox, 100, smsg...) - - nonce, err = nextNonce(actor, outbox, addr) - assert.NoError(t, err) - assert.Equal(t, uint64(4), nonce) - }) -} - -type nullValidator struct { - rejectMessages bool -} - -func (v nullValidator) Validate(ctx context.Context, msg *types.SignedMessage, fromActor *actor.Actor) error { - if v.rejectMessages { - return errors.New("rejected for testing") - } - return nil -} - -func setupSendTest(t *testing.T) (*wallet.Wallet, *chain.DefaultStore, *hamt.CborIpldStore) { - // Install an account actor in the genesis block. - ki := types.MustGenerateKeyInfo(1, types.GenerateKeyInfoSeed())[0] - addr, err := ki.Address() - require.NoError(t, err) - genesis := consensus.MakeGenesisFunc( - consensus.ActorAccount(addr, types.NewAttoFILFromFIL(100)), - ) - - d := requiredCommonDeps(t, genesis) - - // Install the key in the wallet for use in signing. - err = d.wallet.Backends(wallet.DSBackendType)[0].(*wallet.DSBackend).ImportKey(&ki) - require.NoError(t, err) - return d.wallet, d.chainStore, d.cst -} diff --git a/porcelain/mpool.go b/porcelain/mpool.go index 6bc480d10f..9373e0cad1 100644 --- a/porcelain/mpool.go +++ b/porcelain/mpool.go @@ -3,8 +3,8 @@ package porcelain import ( "context" + "github.com/filecoin-project/go-filecoin/core" "github.com/filecoin-project/go-filecoin/net/pubsub" - "github.com/filecoin-project/go-filecoin/plumbing/msg" "github.com/filecoin-project/go-filecoin/types" ) @@ -18,7 +18,7 @@ type mpwPlumbing interface { func MessagePoolWait(ctx context.Context, plumbing mpwPlumbing, messageCount uint) ([]*types.SignedMessage, error) { pending := plumbing.MessagePoolPending() if len(pending) < int(messageCount) { - subscription, err := plumbing.PubSubSubscribe(msg.Topic) + subscription, err := plumbing.PubSubSubscribe(core.Topic) defer subscription.Cancel() if err != nil { return nil, err diff --git a/porcelain/mpool_test.go b/porcelain/mpool_test.go index a7f0788048..22b5e25fac 100644 --- a/porcelain/mpool_test.go +++ b/porcelain/mpool_test.go @@ -5,11 +5,11 @@ import ( "sync" "testing" + "github.com/filecoin-project/go-filecoin/core" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/filecoin-project/go-filecoin/net/pubsub" - "github.com/filecoin-project/go-filecoin/plumbing/msg" "github.com/filecoin-project/go-filecoin/porcelain" tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags" "github.com/filecoin-project/go-filecoin/types" @@ -35,7 +35,7 @@ func (plumbing *fakeMpoolWaitPlumbing) MessagePoolPending() []*types.SignedMessa } func (plumbing *fakeMpoolWaitPlumbing) PubSubSubscribe(topic string) (pubsub.Subscription, error) { - subscription := pubsub.NewFakeSubscription(msg.Topic, 1) + subscription := pubsub.NewFakeSubscription(core.Topic, 1) plumbing.subscription = subscription return subscription, nil }