Skip to content

Commit

Permalink
refactor: move nonce generation from mpool to wallet
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Sep 24, 2020
1 parent efd2dff commit 954edaa
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 125 deletions.
92 changes: 0 additions & 92 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,98 +795,6 @@ func (mp *MessagePool) getStateBalance(addr address.Address, ts *types.TipSet) (
return act.Balance, nil
}

func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address, cb func(address.Address, uint64) (*types.SignedMessage, error)) (*types.SignedMessage, error) {
// serialize push access to reduce lock contention
mp.addSema <- struct{}{}
defer func() {
<-mp.addSema
}()

mp.curTsLk.Lock()
mp.lk.Lock()

curTs := mp.curTs

fromKey := addr
if fromKey.Protocol() == address.ID {
var err error
fromKey, err = mp.api.StateAccountKey(ctx, fromKey, mp.curTs)
if err != nil {
mp.lk.Unlock()
mp.curTsLk.Unlock()
return nil, xerrors.Errorf("resolving sender key: %w", err)
}
}

nonce, err := mp.getNonceLocked(fromKey, mp.curTs)
if err != nil {
mp.lk.Unlock()
mp.curTsLk.Unlock()
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
}

// release the locks for signing
mp.lk.Unlock()
mp.curTsLk.Unlock()

msg, err := cb(fromKey, nonce)
if err != nil {
return nil, err
}

err = mp.checkMessage(msg)
if err != nil {
return nil, err
}

msgb, err := msg.Serialize()
if err != nil {
return nil, err
}

// reacquire the locks and check state for consistency
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

if mp.curTs != curTs {
return nil, ErrTryAgain
}

mp.lk.Lock()
defer mp.lk.Unlock()

nonce2, err := mp.getNonceLocked(fromKey, mp.curTs)
if err != nil {
return nil, xerrors.Errorf("get nonce locked failed: %w", err)
}

if nonce2 != nonce {
return nil, ErrTryAgain
}

publish, err := mp.verifyMsgBeforeAdd(msg, curTs, true)
if err != nil {
return nil, err
}

if err := mp.checkBalance(msg, curTs); err != nil {
return nil, err
}

if err := mp.addLocked(msg, false); err != nil {
return nil, xerrors.Errorf("add locked failed: %w", err)
}
if err := mp.addLocal(msg, msgb); err != nil {
log.Errorf("addLocal failed: %+v", err)
}

if publish {
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
}

return msg, err
}

func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
mp.lk.Lock()
defer mp.lk.Unlock()
Expand Down
108 changes: 108 additions & 0 deletions chain/messagesigner/messagesigner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package wallet

import (
"bytes"
"context"

"github.com/filecoin-project/lotus/chain/messagepool"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
)

const dsKeyActorNonce = "ActorNonce"

type mpoolAPI interface {
GetNonce(address.Address) (uint64, error)
}

type MessageSigner struct {
*Wallet
mpool mpoolAPI
ds datastore.Batching
}

func NewMessageSigner(wallet *Wallet, mpool *messagepool.MessagePool, ds dtypes.MetadataDS) *MessageSigner {
return newMessageSigner(wallet, mpool, ds)
}

func newMessageSigner(wallet *Wallet, mpool mpoolAPI, ds dtypes.MetadataDS) *MessageSigner {
ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/"))
return &MessageSigner{
Wallet: wallet,
mpool: mpool,
ds: ds,
}
}

func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message) (*types.SignedMessage, error) {
nonce, err := ms.nextNonce(msg.From)
if err != nil {
return nil, xerrors.Errorf("failed to create nonce: %w", err)
}

msg.Nonce = nonce
sig, err := ms.Sign(ctx, msg.From, msg.Cid().Bytes())
if err != nil {
return nil, xerrors.Errorf("failed to sign message: %w", err)
}

return &types.SignedMessage{
Message: *msg,
Signature: *sig,
}, nil
}

func (ms *MessageSigner) nextNonce(addr address.Address) (uint64, error) {
addrNonceKey := datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})

// Get the nonce for this address from the datastore
nonceBytes, err := ms.ds.Get(addrNonceKey)

var nonce uint64
switch {
case xerrors.Is(err, datastore.ErrNotFound):
// If a nonce for this address hasn't yet been created in the
// datastore, check the mempool - nonces used to be created by
// the mempool so we need to support nodes that still have mempool
// nonces. Note that the mempool returns the actor state's nonce by
// default.
nonce, err = ms.mpool.GetNonce(addr)
if err != nil {
return 0, xerrors.Errorf("failed to get nonce from mempool: %w", err)
}

case err != nil:
return 0, xerrors.Errorf("failed to get nonce from datastore: %w", err)

default:
// There is a nonce in the mempool, so unmarshall and increment it
maj, val, err := cbg.CborReadHeader(bytes.NewReader(nonceBytes))
if err != nil {
return 0, xerrors.Errorf("failed to parse nonce from datastore: %w", err)
}
if maj != cbg.MajUnsignedInt {
return 0, xerrors.Errorf("bad cbor type parsing nonce from datastore")
}

nonce = val + 1
}

// Write the nonce for this address to the datastore
buf := bytes.Buffer{}
_, err = buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
if err != nil {
return 0, xerrors.Errorf("failed to marshall nonce: %w", err)
}
err = ms.ds.Put(addrNonceKey, buf.Bytes())
if err != nil {
return 0, xerrors.Errorf("failed to write nonce to datastore: %w", err)
}

return nonce, nil
}
180 changes: 180 additions & 0 deletions chain/messagesigner/messagesigner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package wallet

import (
"context"
"sync"
"testing"

"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/lotus/lib/sigs"

"github.com/stretchr/testify/require"

tutils "github.com/filecoin-project/specs-actors/support/testing"

ds_sync "github.com/ipfs/go-datastore/sync"

"github.com/filecoin-project/go-address"

"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-datastore"
)

type mockMpool struct {
lk sync.RWMutex
nonces map[address.Address]uint64
}

func newMockMpool() *mockMpool {
return &mockMpool{nonces: make(map[address.Address]uint64)}
}

func (mp *mockMpool) setNonce(addr address.Address, nonce uint64) {
mp.lk.Lock()
defer mp.lk.Unlock()

mp.nonces[addr] = nonce
}

func (mp *mockMpool) GetNonce(addr address.Address) (uint64, error) {
mp.lk.RLock()
defer mp.lk.RUnlock()

return mp.nonces[addr], nil
}

func TestMessageSignerSignMessage(t *testing.T) {
ctx := context.Background()

fromKeyPrivate1, fromKeyPublic1 := testGenerateKeyPair(t)
_, toKeyPublic1 := testGenerateKeyPair(t)
from1 := tutils.NewSECP256K1Addr(t, string(fromKeyPublic1))
to1 := tutils.NewSECP256K1Addr(t, string(toKeyPublic1))
fromKeyPrivate2, fromKeyPublic2 := testGenerateKeyPair(t)
_, toKeyPublic2 := testGenerateKeyPair(t)
from2 := tutils.NewSECP256K1Addr(t, string(fromKeyPublic2))
to2 := tutils.NewSECP256K1Addr(t, string(toKeyPublic2))

wallet, _ := NewWallet(NewMemKeyStore())
_, err := wallet.Import(&types.KeyInfo{
Type: KTSecp256k1,
PrivateKey: fromKeyPrivate1,
})
require.NoError(t, err)
_, err = wallet.Import(&types.KeyInfo{
Type: KTSecp256k1,
PrivateKey: fromKeyPrivate2,
})
require.NoError(t, err)

type msgSpec struct {
msg *types.Message
mpoolNonce [1]uint64
expNonce uint64
}
tests := []struct {
name string
msgs []msgSpec
}{{
// No nonce yet in datastore
name: "no nonce yet",
msgs: []msgSpec{{
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 0,
}},
}, {
// Get nonce value of zero from mpool
name: "mpool nonce zero",
msgs: []msgSpec{{
msg: &types.Message{
To: to1,
From: from1,
},
mpoolNonce: [1]uint64{0},
expNonce: 0,
}},
}, {
// Get non-zero nonce value from mpool
name: "mpool nonce set",
msgs: []msgSpec{{
msg: &types.Message{
To: to1,
From: from1,
},
mpoolNonce: [1]uint64{5},
expNonce: 5,
}, {
msg: &types.Message{
To: to1,
From: from1,
},
// Should ignore mpool nonce because after the first message nonce
// will come from the datastore
mpoolNonce: [1]uint64{10},
expNonce: 6,
}},
}, {
// Nonce should increment independently for each address
name: "nonce increments per address",
msgs: []msgSpec{{
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 0,
}, {
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 1,
}, {
msg: &types.Message{
To: to2,
From: from2,
},
mpoolNonce: [1]uint64{5},
expNonce: 5,
}, {
msg: &types.Message{
To: to2,
From: from2,
},
expNonce: 6,
}, {
msg: &types.Message{
To: to1,
From: from1,
},
expNonce: 2,
}},
}}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mpool := newMockMpool()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
ms := newMessageSigner(wallet, mpool, ds)

for _, m := range tt.msgs {
if len(m.mpoolNonce) == 1 {
mpool.setNonce(m.msg.From, m.mpoolNonce[0])
}
smsg, err := ms.SignMessage(ctx, m.msg)
require.NoError(t, err)
require.Equal(t, m.expNonce, smsg.Message.Nonce)
}
})
}
}

func testGenerateKeyPair(t *testing.T) ([]byte, []byte) {
priv, err := sigs.Generate(crypto.SigTypeSecp256k1)
require.NoError(t, err)
pub, err := sigs.ToPublic(crypto.SigTypeSecp256k1, priv)
require.NoError(t, err)
return priv, pub
}
1 change: 1 addition & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func Online() Option {
Override(new(*store.ChainStore), modules.ChainStore),
Override(new(*stmgr.StateManager), stmgr.NewStateManager),
Override(new(*wallet.Wallet), wallet.NewWallet),
Override(new(*wallet.MessageSigner), wallet.NewMessageSigner),

Override(new(dtypes.ChainGCLocker), blockstore.NewGCLocker),
Override(new(dtypes.ChainGCBlockstore), modules.ChainGCBlockstore),
Expand Down
Loading

0 comments on commit 954edaa

Please sign in to comment.