Skip to content

Commit

Permalink
Factor message packaging out of plumbing into core.Outbox (#2795)
Browse files Browse the repository at this point in the history
Factor message packaging out of plumbing into core.Outbox
Replace default actor provider with chain state and remove msg.Sender plumbing.
  • Loading branch information
anorth authored May 22, 2019
1 parent 69f5dab commit 92883c2
Show file tree
Hide file tree
Showing 11 changed files with 463 additions and 396 deletions.
4 changes: 2 additions & 2 deletions commands/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/filecoin-project/go-filecoin/address"
"github.com/filecoin-project/go-filecoin/core"
"github.com/filecoin-project/go-filecoin/exec"
"github.com/filecoin-project/go-filecoin/plumbing/bcf"
"github.com/filecoin-project/go-filecoin/plumbing/cst"
"github.com/filecoin-project/go-filecoin/plumbing/msg"
"github.com/filecoin-project/go-filecoin/types"
)
Expand Down Expand Up @@ -169,7 +169,7 @@ var msgWaitCmd = &cmds.Command{
err = GetPorcelainAPI(env).MessageWait(req.Context, msgCid, func(blk *types.Block, msg *types.SignedMessage, receipt *types.MessageReceipt) error {
found = true
sig, err := GetPorcelainAPI(env).ActorGetSignature(req.Context, msg.To, msg.Method)
if err != nil && err != bcf.ErrNoMethod && err != bcf.ErrNoActorImpl {
if err != nil && err != cst.ErrNoMethod && err != cst.ErrNoActorImpl {
return errors.Wrap(err, "Couldn't get signature for message")
}

Expand Down
152 changes: 152 additions & 0 deletions core/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package core

import (
"context"
"sync"

"github.com/filecoin-project/go-filecoin/abi"
"github.com/filecoin-project/go-filecoin/actor"
"github.com/filecoin-project/go-filecoin/address"
"github.com/filecoin-project/go-filecoin/consensus"
"github.com/filecoin-project/go-filecoin/metrics"
"github.com/filecoin-project/go-filecoin/types"
"github.com/ipfs/go-cid"
"github.com/pkg/errors"
)

// Topic is the network pubsub topic identifier on which new messages are announced.
const Topic = "/fil/msgs"

// Outbox validates and marshals messages for sending and maintains the outbound message queue.
type Outbox struct {
// Signs messages
signer types.Signer
// Validates messages before sending them.
validator consensus.SignedMessageValidator
// Holds messages sent from this node but not yet mined.
queue *MessageQueue
// Invoked to publish a message to the network
publisher publisher

chains chainProvider
actors actorProvider

// Protects the "next nonce" calculation to avoid collisions.
nonceLock sync.Mutex
}

type chainProvider interface {
GetHead() types.SortedCidSet
GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error)
}

type actorProvider interface {
// GetActorAt returns the actor state defined by the chain up to some tipset
GetActorAt(ctx context.Context, tipset types.SortedCidSet, addr address.Address) (*actor.Actor, error)
}

type publisher interface {
Publish(ctx context.Context, message *types.SignedMessage, height uint64) error
}

var msgSendErrCt = metrics.NewInt64Counter("message_sender_error", "Number of errors encountered while sending a message")

// NewOutbox creates a new outbox
func NewOutbox(signer types.Signer, validator consensus.SignedMessageValidator, queue *MessageQueue,
publisher publisher, chains chainProvider, actors actorProvider) *Outbox {
return &Outbox{
signer: signer,
validator: validator,
queue: queue,
publisher: publisher,
chains: chains,
actors: actors,
}
}

// Queue returns the outbox's outbound message queue.
func (ob *Outbox) Queue() *MessageQueue {
return ob.queue
}

// Send marshals and sends a message, retaining it in the outbound message queue.
func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value *types.AttoFIL,
gasPrice types.AttoFIL, gasLimit types.GasUnits, method string, params ...interface{}) (out cid.Cid, err error) {
defer func() {
if err != nil {
msgSendErrCt.Inc(ctx, 1)
}
}()

encodedParams, err := abi.ToEncodedValues(params...)
if err != nil {
return cid.Undef, errors.Wrap(err, "invalid params")
}

// Lock to avoid a race inspecting the actor state and message queue to calculate next nonce.
ob.nonceLock.Lock()
defer ob.nonceLock.Unlock()

head := ob.chains.GetHead()

fromActor, err := ob.actors.GetActorAt(ctx, head, from)
if err != nil {
return cid.Undef, errors.Wrapf(err, "no actor at address %s", from)
}

nonce, err := nextNonce(fromActor, ob.queue, from)
if err != nil {
return cid.Undef, errors.Wrapf(err, "failed calculating nonce for actor at %s", from)
}

rawMsg := types.NewMessage(from, to, nonce, value, method, encodedParams)
signed, err := types.NewSignedMessage(*rawMsg, ob.signer, gasPrice, gasLimit)
if err != nil {
return cid.Undef, errors.Wrap(err, "failed to sign message")
}

err = ob.validator.Validate(ctx, signed, fromActor)
if err != nil {
return cid.Undef, errors.Wrap(err, "invalid message")
}

height, err := tipsetHeight(ob.chains, head)
if err != nil {
return cid.Undef, errors.Wrap(err, "failed to get block height")
}

// Add to the local message queue/pool at the last possible moment before broadcasting to network.
if err := ob.queue.Enqueue(signed, height); err != nil {
return cid.Undef, errors.Wrap(err, "failed to add message to outbound queue")
}

err = ob.publisher.Publish(ctx, signed, height)
if err != nil {
return cid.Undef, err
}

return signed.Cid()
}

// nextNonce returns the next expected nonce value for an account actor. This is the larger
// of the actor's nonce value, or one greater than the largest nonce from the actor found in the message queue.
func nextNonce(act *actor.Actor, queue *MessageQueue, address address.Address) (uint64, error) {
actorNonce, err := actor.NextNonce(act)
if err != nil {
return 0, err
}

poolNonce, found := queue.LargestNonce(address)
if found && poolNonce >= actorNonce {
return poolNonce + 1, nil
}
return actorNonce, nil
}

func tipsetHeight(provider chainProvider, key types.SortedCidSet) (uint64, error) {
head, err := provider.GetTipSet(key)
if err != nil {
return 0, err
}
return head.Height()
}
199 changes: 199 additions & 0 deletions core/outbox_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package core_test

import (
"context"
"fmt"
"sync"
"testing"

"github.com/filecoin-project/go-filecoin/actor"
"github.com/filecoin-project/go-filecoin/actor/builtin/account"
"github.com/filecoin-project/go-filecoin/actor/builtin/storagemarket"
"github.com/filecoin-project/go-filecoin/address"
"github.com/filecoin-project/go-filecoin/core"
tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags"
"github.com/filecoin-project/go-filecoin/types"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOutbox(t *testing.T) {
tf.UnitTest(t)

t.Run("invalid message rejected", func(t *testing.T) {
w, _ := types.NewMockSignersAndKeyInfo(1)
sender := w.Addresses[0]
queue := core.NewMessageQueue()
publisher := &mockPublisher{}
provider := &fakeProvider{}

ob := core.NewOutbox(w, nullValidator{rejectMessages: true}, queue, publisher, provider, provider)

cid, err := ob.Send(context.Background(), sender, sender, types.NewAttoFILFromFIL(2), types.NewGasPrice(0), types.NewGasUnits(0), "")
assert.Errorf(t, err, "for testing")
assert.False(t, cid.Defined())
})

t.Run("send message enqueues and calls publish", func(t *testing.T) {
w, _ := types.NewMockSignersAndKeyInfo(1)
sender := w.Addresses[0]
toAddr := address.NewForTestGetter()()
queue := core.NewMessageQueue()
publisher := &mockPublisher{}
provider := &fakeProvider{}

blk := types.NewBlockForTest(nil, 1)
blk.Height = 1000
actr, _ := account.NewActor(types.NewZeroAttoFIL())
actr.Nonce = 42
provider.Set(t, blk, sender, actr)

ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider)
require.Empty(t, queue.List(sender))
require.Nil(t, publisher.message)

_, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "")
require.NoError(t, err)
assert.Equal(t, uint64(1000), queue.List(sender)[0].Stamp)
assert.NotNil(t, publisher.message)
assert.Equal(t, actr.Nonce, publisher.message.Nonce)
assert.Equal(t, uint64(1000), publisher.height)
})

t.Run("send message avoids nonce race", func(t *testing.T) {
ctx := context.Background()
msgCount := 20 // number of messages to send
sendConcurrent := 3 // number of of concurrent message sends

w, _ := types.NewMockSignersAndKeyInfo(1)
sender := w.Addresses[0]
toAddr := address.NewForTestGetter()()
queue := core.NewMessageQueue()
publisher := &mockPublisher{}
provider := &fakeProvider{}

blk := types.NewBlockForTest(nil, 1)
blk.Height = 1000
actr, _ := account.NewActor(types.NewZeroAttoFIL())
actr.Nonce = 42
provider.Set(t, blk, sender, actr)

s := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider)

var wg sync.WaitGroup
addTwentyMessages := func(batch int) {
defer wg.Done()
for i := 0; i < msgCount; i++ {
_, err := s.Send(ctx, sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), fmt.Sprintf("%d-%d", batch, i), []byte{})
require.NoError(t, err)
}
}

// Add messages concurrently.
for i := 0; i < sendConcurrent; i++ {
wg.Add(1)
go addTwentyMessages(i)
}
wg.Wait()

enqueued := queue.List(sender)
assert.Equal(t, 60, len(enqueued))

// Expect the nonces to be distinct and contiguous
nonces := map[uint64]bool{}
for _, message := range enqueued {
assert.Equal(t, uint64(1000), message.Stamp)
_, found := nonces[uint64(message.Msg.Nonce)]
require.False(t, found)
nonces[uint64(message.Msg.Nonce)] = true
}

for i := 0; i < 60; i++ {
assert.True(t, nonces[uint64(actr.Nonce)+uint64(i)])

}
})

t.Run("fails with non-account actor", func(t *testing.T) {
w, _ := types.NewMockSignersAndKeyInfo(1)
sender := w.Addresses[0]
toAddr := address.NewForTestGetter()()
queue := core.NewMessageQueue()
publisher := &mockPublisher{}
provider := &fakeProvider{}

blk := types.NewBlockForTest(nil, 1)
actr, _ := storagemarket.NewActor() // Not an account actor
provider.Set(t, blk, sender, actr)

ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider)

_, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "")
assert.Error(t, err)
assert.Contains(t, err.Error(), "account or empty")
})
}

// A publisher which just stores the last message published.
type mockPublisher struct {
returnError error // Error to be returned by Publish()
message *types.SignedMessage // Message received by Publish()
height uint64 // Height received by Publish()
}

func (p *mockPublisher) Publish(ctx context.Context, message *types.SignedMessage, height uint64) error {
p.message = message
p.height = height
return p.returnError
}

// A chain and actor provider which provides and expects values for a single message.
type fakeProvider struct {
head types.SortedCidSet // Provided by GetHead and expected by others
tipset *types.TipSet // Provided by GetTipset(head)
addr address.Address // Expected by GetActorAt
actor *actor.Actor // Provided by GetActorAt(head, tipKey, addr)
}

func (p *fakeProvider) GetHead() types.SortedCidSet {
return p.head
}

func (p *fakeProvider) GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error) {
if !tsKey.Equals(p.head) {
return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head)
}
return p.tipset, nil
}

func (p *fakeProvider) GetActorAt(ctx context.Context, tsKey types.SortedCidSet, addr address.Address) (*actor.Actor, error) {
if !tsKey.Equals(p.head) {
return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head)
}
if addr != p.addr {
return nil, errors.Errorf("No such address %s, expected %s", addr, p.addr)
}
return p.actor, nil
}

// Set sets the tipset, from address, and actor to be provided by creating a tipset with one block.
func (p *fakeProvider) Set(t *testing.T, block *types.Block, addr address.Address, actor *actor.Actor) {
ts := types.RequireNewTipSet(t, block)
p.tipset = &ts
tsKey := types.NewSortedCidSet(block.Cid())
p.head = tsKey
p.addr = addr
p.actor = actor
}

type nullValidator struct {
rejectMessages bool
}

func (v nullValidator) Validate(ctx context.Context, msg *types.SignedMessage, fromActor *actor.Actor) error {
if v.rejectMessages {
return errors.New("rejected for testing")
}
return nil
}
Loading

0 comments on commit 92883c2

Please sign in to comment.