-
Notifications
You must be signed in to change notification settings - Fork 467
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Factor message packaging out of plumbing into core.Outbox
- Loading branch information
Showing
9 changed files
with
459 additions
and
358 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package core | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
|
||
"github.com/filecoin-project/go-filecoin/abi" | ||
"github.com/filecoin-project/go-filecoin/actor" | ||
"github.com/filecoin-project/go-filecoin/address" | ||
"github.com/filecoin-project/go-filecoin/consensus" | ||
"github.com/filecoin-project/go-filecoin/metrics" | ||
"github.com/filecoin-project/go-filecoin/types" | ||
"github.com/ipfs/go-cid" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
// Topic is the network pubsub topic identifier on which new messages are announced. | ||
const Topic = "/fil/msgs" | ||
|
||
// Outbox validates and marshals messages for sending and maintains the outbound message queue. | ||
type Outbox struct { | ||
// Signs messages | ||
signer types.Signer | ||
// Validates messages before sending them. | ||
validator consensus.SignedMessageValidator | ||
// Holds messages sent from this node but not yet mined. | ||
queue *MessageQueue | ||
// Invoked to publish a message to the network | ||
publisher publisher | ||
|
||
chains chainProvider | ||
actors actorProvider | ||
|
||
// Protects the "next nonce" calculation to avoid collisions. | ||
nonceLock sync.Mutex | ||
} | ||
|
||
type chainProvider interface { | ||
GetHead() types.SortedCidSet | ||
GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error) | ||
} | ||
|
||
type actorProvider interface { | ||
// GetActor returns the actor state defined by the chain up to some tipset | ||
GetActor(ctx context.Context, tipset types.SortedCidSet, addr address.Address) (*actor.Actor, error) | ||
} | ||
|
||
type publisher interface { | ||
Publish(ctx context.Context, message *types.SignedMessage, height uint64) error | ||
} | ||
|
||
var msgSendErrCt = metrics.NewInt64Counter("message_sender_error", "Number of errors encountered while sending a message") | ||
|
||
// NewOutbox creates a new outbox | ||
func NewOutbox(signer types.Signer, validator consensus.SignedMessageValidator, queue *MessageQueue, | ||
publisher publisher, chains chainProvider, actors actorProvider) *Outbox { | ||
return &Outbox{ | ||
signer: signer, | ||
validator: validator, | ||
queue: queue, | ||
publisher: publisher, | ||
chains: chains, | ||
actors: actors, | ||
} | ||
} | ||
|
||
// Send marshals and sends a message, retaining it in the outbound message queue. | ||
func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value *types.AttoFIL, | ||
gasPrice types.AttoFIL, gasLimit types.GasUnits, method string, params ...interface{}) (out cid.Cid, err error) { | ||
defer func() { | ||
if err != nil { | ||
msgSendErrCt.Inc(ctx, 1) | ||
} | ||
}() | ||
|
||
encodedParams, err := abi.ToEncodedValues(params...) | ||
if err != nil { | ||
return cid.Undef, errors.Wrap(err, "invalid params") | ||
} | ||
|
||
// Lock to avoid race for message nonce. | ||
ob.nonceLock.Lock() | ||
defer ob.nonceLock.Unlock() | ||
|
||
head := ob.chains.GetHead() | ||
|
||
fromActor, err := ob.actors.GetActor(ctx, head, from) | ||
if err != nil { | ||
return cid.Undef, errors.Wrapf(err, "no GetActor at address %s", from) | ||
} | ||
|
||
nonce, err := nextNonce(fromActor, ob.queue, from) | ||
if err != nil { | ||
return cid.Undef, errors.Wrapf(err, "failed calculating nonce for GetActor %s", from) | ||
} | ||
|
||
rawMsg := types.NewMessage(from, to, nonce, value, method, encodedParams) | ||
signed, err := types.NewSignedMessage(*rawMsg, ob.signer, gasPrice, gasLimit) | ||
if err != nil { | ||
return cid.Undef, errors.Wrap(err, "failed to sign message") | ||
} | ||
|
||
err = ob.validator.Validate(ctx, signed, fromActor) | ||
if err != nil { | ||
return cid.Undef, errors.Wrap(err, "invalid message") | ||
} | ||
|
||
height, err := tipsetHeight(ob.chains, head) | ||
if err != nil { | ||
return cid.Undef, errors.Wrap(err, "failed to get block height") | ||
} | ||
|
||
// Add to the local message queue/pool at the last possible moment before broadcasting to network. | ||
if err := ob.queue.Enqueue(signed, height); err != nil { | ||
return cid.Undef, errors.Wrap(err, "failed to add message to outbound queue") | ||
} | ||
|
||
err = ob.publisher.Publish(ctx, signed, height) | ||
if err != nil { | ||
return cid.Undef, err | ||
} | ||
|
||
return signed.Cid() | ||
} | ||
|
||
// nextNonce returns the next expected nonce value for an account GetActor. This is the larger | ||
// of the GetActor's nonce value, or one greater than the largest nonce from the GetActor found in the message queue. | ||
func nextNonce(act *actor.Actor, queue *MessageQueue, address address.Address) (uint64, error) { | ||
actorNonce, err := actor.NextNonce(act) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
poolNonce, found := queue.LargestNonce(address) | ||
if found && poolNonce >= actorNonce { | ||
return poolNonce + 1, nil | ||
} | ||
return actorNonce, nil | ||
} | ||
|
||
func tipsetHeight(provider chainProvider, key types.SortedCidSet) (uint64, error) { | ||
head, err := provider.GetTipSet(key) | ||
if err != nil { | ||
return 0, err | ||
} | ||
return head.Height() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,199 @@ | ||
package core_test | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/filecoin-project/go-filecoin/actor" | ||
"github.com/filecoin-project/go-filecoin/actor/builtin/account" | ||
"github.com/filecoin-project/go-filecoin/actor/builtin/storagemarket" | ||
"github.com/filecoin-project/go-filecoin/address" | ||
"github.com/filecoin-project/go-filecoin/core" | ||
tf "github.com/filecoin-project/go-filecoin/testhelpers/testflags" | ||
"github.com/filecoin-project/go-filecoin/types" | ||
"github.com/pkg/errors" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestOutbox(t *testing.T) { | ||
tf.UnitTest(t) | ||
|
||
t.Run("invalid message rejected", func(t *testing.T) { | ||
w, _ := types.NewMockSignersAndKeyInfo(1) | ||
sender := w.Addresses[0] | ||
queue := core.NewMessageQueue() | ||
publisher := &mockPublisher{} | ||
provider := &fakeProvider{} | ||
|
||
ob := core.NewOutbox(w, nullValidator{rejectMessages: true}, queue, publisher, provider, provider) | ||
|
||
cid, err := ob.Send(context.Background(), sender, sender, types.NewAttoFILFromFIL(2), types.NewGasPrice(0), types.NewGasUnits(0), "") | ||
assert.Errorf(t, err, "for testing") | ||
assert.False(t, cid.Defined()) | ||
}) | ||
|
||
t.Run("send message enqueues and calls publish", func(t *testing.T) { | ||
w, _ := types.NewMockSignersAndKeyInfo(1) | ||
sender := w.Addresses[0] | ||
toAddr := address.NewForTestGetter()() | ||
queue := core.NewMessageQueue() | ||
publisher := &mockPublisher{} | ||
provider := &fakeProvider{} | ||
|
||
blk := types.NewBlockForTest(nil, 1) | ||
blk.Height = 1000 | ||
actr, _ := account.NewActor(types.NewZeroAttoFIL()) | ||
actr.Nonce = 42 | ||
provider.Set(t, blk, sender, actr) | ||
|
||
ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) | ||
require.Empty(t, queue.List(sender)) | ||
require.Nil(t, publisher.message) | ||
|
||
_, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "") | ||
require.NoError(t, err) | ||
assert.Equal(t, uint64(1000), queue.List(sender)[0].Stamp) | ||
assert.NotNil(t, publisher.message) | ||
assert.Equal(t, actr.Nonce, publisher.message.Nonce) | ||
assert.Equal(t, uint64(1000), publisher.height) | ||
}) | ||
|
||
t.Run("send message avoids nonce race", func(t *testing.T) { | ||
ctx := context.Background() | ||
msgCount := 20 // number of messages to send | ||
sendConcurrent := 3 // number of of concurrent message sends | ||
|
||
w, _ := types.NewMockSignersAndKeyInfo(1) | ||
sender := w.Addresses[0] | ||
toAddr := address.NewForTestGetter()() | ||
queue := core.NewMessageQueue() | ||
publisher := &mockPublisher{} | ||
provider := &fakeProvider{} | ||
|
||
blk := types.NewBlockForTest(nil, 1) | ||
blk.Height = 1000 | ||
actr, _ := account.NewActor(types.NewZeroAttoFIL()) | ||
actr.Nonce = 42 | ||
provider.Set(t, blk, sender, actr) | ||
|
||
s := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) | ||
|
||
var wg sync.WaitGroup | ||
addTwentyMessages := func(batch int) { | ||
defer wg.Done() | ||
for i := 0; i < msgCount; i++ { | ||
_, err := s.Send(ctx, sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), fmt.Sprintf("%d-%d", batch, i), []byte{}) | ||
require.NoError(t, err) | ||
} | ||
} | ||
|
||
// Add messages concurrently. | ||
for i := 0; i < sendConcurrent; i++ { | ||
wg.Add(1) | ||
go addTwentyMessages(i) | ||
} | ||
wg.Wait() | ||
|
||
enqueued := queue.List(sender) | ||
assert.Equal(t, 60, len(enqueued)) | ||
|
||
// Expect the nonces to be distinct and contiguous | ||
nonces := map[uint64]bool{} | ||
for _, message := range enqueued { | ||
assert.Equal(t, uint64(1000), message.Stamp) | ||
_, found := nonces[uint64(message.Msg.Nonce)] | ||
require.False(t, found) | ||
nonces[uint64(message.Msg.Nonce)] = true | ||
} | ||
|
||
for i := 0; i < 60; i++ { | ||
assert.True(t, nonces[uint64(actr.Nonce)+uint64(i)]) | ||
|
||
} | ||
}) | ||
|
||
t.Run("nonce fails with non-account actor", func(t *testing.T) { | ||
w, _ := types.NewMockSignersAndKeyInfo(1) | ||
sender := w.Addresses[0] | ||
toAddr := address.NewForTestGetter()() | ||
queue := core.NewMessageQueue() | ||
publisher := &mockPublisher{} | ||
provider := &fakeProvider{} | ||
|
||
blk := types.NewBlockForTest(nil, 1) | ||
actr, _ := storagemarket.NewActor() // Not an account actor | ||
provider.Set(t, blk, sender, actr) | ||
|
||
ob := core.NewOutbox(w, nullValidator{}, queue, publisher, provider, provider) | ||
|
||
_, err := ob.Send(context.Background(), sender, toAddr, types.NewZeroAttoFIL(), types.NewGasPrice(0), types.NewGasUnits(0), "") | ||
assert.Error(t, err) | ||
assert.Contains(t, err.Error(), "account or empty") | ||
}) | ||
} | ||
|
||
// A publisher which just stores the last message published. | ||
type mockPublisher struct { | ||
returnError error // Error to be returned by Publish() | ||
message *types.SignedMessage // Message received by Publish() | ||
height uint64 // Height received by Publish() | ||
} | ||
|
||
func (p *mockPublisher) Publish(ctx context.Context, message *types.SignedMessage, height uint64) error { | ||
p.message = message | ||
p.height = height | ||
return p.returnError | ||
} | ||
|
||
// A chain and actor provider which provides and expects values for a single message. | ||
type fakeProvider struct { | ||
head types.SortedCidSet // Provided by GetHead and expected by others | ||
tipset *types.TipSet // Provided by GetTipset(head) | ||
addr address.Address // Expected by GetActor | ||
actor *actor.Actor // Provided by GetActor(head, addr) | ||
} | ||
|
||
func (p *fakeProvider) GetHead() types.SortedCidSet { | ||
return p.head | ||
} | ||
|
||
func (p *fakeProvider) GetTipSet(tsKey types.SortedCidSet) (*types.TipSet, error) { | ||
if !tsKey.Equals(p.head) { | ||
return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head) | ||
} | ||
return p.tipset, nil | ||
} | ||
|
||
func (p *fakeProvider) GetActor(ctx context.Context, tsKey types.SortedCidSet, addr address.Address) (*actor.Actor, error) { | ||
if !tsKey.Equals(p.head) { | ||
return nil, errors.Errorf("No such tipset %s, expected %s", tsKey, p.head) | ||
} | ||
if addr != p.addr { | ||
return nil, errors.Errorf("No such address %s, expected %s", addr, p.addr) | ||
} | ||
return p.actor, nil | ||
} | ||
|
||
// Set sets the tipset, from address, and actor to be provided by creating a tipset with one block. | ||
func (p *fakeProvider) Set(t *testing.T, block *types.Block, addr address.Address, actor *actor.Actor) { | ||
ts := types.RequireNewTipSet(t, block) | ||
p.tipset = &ts | ||
tsKey := types.NewSortedCidSet(block.Cid()) | ||
p.head = tsKey | ||
p.addr = addr | ||
p.actor = actor | ||
} | ||
|
||
type nullValidator struct { | ||
rejectMessages bool | ||
} | ||
|
||
func (v nullValidator) Validate(ctx context.Context, msg *types.SignedMessage, fromActor *actor.Actor) error { | ||
if v.rejectMessages { | ||
return errors.New("rejected for testing") | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package node | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/filecoin-project/go-filecoin/actor" | ||
"github.com/filecoin-project/go-filecoin/actor/builtin" | ||
"github.com/filecoin-project/go-filecoin/address" | ||
"github.com/filecoin-project/go-filecoin/state" | ||
"github.com/filecoin-project/go-filecoin/types" | ||
"github.com/ipfs/go-cid" | ||
"github.com/ipfs/go-hamt-ipld" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
// defaultActorProvider reads actor state out of the IPLD HAMT store. | ||
// This is wiring to provide actor state to the outbox. | ||
type defaultActorProvider struct { | ||
chain chainStore | ||
state *hamt.CborIpldStore | ||
} | ||
|
||
type chainStore interface { | ||
GetTipSetStateRoot(tsKey types.SortedCidSet) (cid.Cid, error) | ||
} | ||
|
||
func newDefaultActorProvider(chain chainStore, state *hamt.CborIpldStore) *defaultActorProvider { | ||
return &defaultActorProvider{chain, state} | ||
} | ||
|
||
func (ap defaultActorProvider) GetActor(ctx context.Context, tipset types.SortedCidSet, addr address.Address) (*actor.Actor, error) { | ||
stateCid, err := ap.chain.GetTipSetStateRoot(tipset) | ||
if err != nil { | ||
return nil, err | ||
} | ||
tree, err := state.LoadStateTree(ctx, ap.state, stateCid, builtin.Actors) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "failed to load latest state") | ||
} | ||
|
||
actr, err := tree.GetActor(ctx, addr) | ||
if err != nil { | ||
return nil, errors.Wrapf(err, "no actor at address %s", addr) | ||
} | ||
return actr, nil | ||
} |
Oops, something went wrong.