Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly add multiaddrs to avoid dialing issues #356

Merged
merged 2 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions retrievalmarket/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-multistore"
Expand Down Expand Up @@ -35,7 +34,7 @@ type RetrievalClient interface {
payloadCID cid.Cid,
params Params,
totalFunds abi.TokenAmount,
miner peer.ID,
p RetrievalPeer,
clientWallet address.Address,
minerWallet address.Address,
storeID *multistore.StoreID,
Expand Down
31 changes: 27 additions & 4 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ the request are.
The client a new `RetrievalQueryStream` for the chosen peer ID,
and calls WriteQuery on it, which constructs a data-transfer message and writes it to the Query stream.
*/
func (c *Client) Query(_ context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
func (c *Client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, payloadCID cid.Cid, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
err := c.addMultiaddrs(ctx, p)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
s, err := c.network.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
Expand Down Expand Up @@ -181,8 +186,11 @@ From then on, the statemachine controls the deal flow in the client. Other compo

Documentation of the client state machine can be found at https://godoc.org/github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates
*/
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
var err error
func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, p retrievalmarket.RetrievalPeer, clientWallet address.Address, minerWallet address.Address, storeID *multistore.StoreID) (retrievalmarket.DealID, error) {
err := c.addMultiaddrs(ctx, p)
if err != nil {
return 0, err
}
next, err := c.storedCounter.Next()
if err != nil {
return 0, err
Expand Down Expand Up @@ -210,7 +218,7 @@ func (c *Client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
PaymentRequested: abi.NewTokenAmount(0),
FundsSpent: abi.NewTokenAmount(0),
Status: retrievalmarket.DealStatusNew,
Sender: miner,
Sender: p.ID,
UnsealFundsPaid: big.Zero(),
StoreID: storeID,
}
Expand All @@ -235,6 +243,21 @@ func (c *Client) notifySubscribers(eventName fsm.EventName, state fsm.StateType)
_ = c.subscribers.Publish(internalEvent{evt, ds})
}

func (c *Client) addMultiaddrs(ctx context.Context, p retrievalmarket.RetrievalPeer) error {
tok, _, err := c.node.GetChainHead(ctx)
if err != nil {
return err
}
maddrs, err := c.node.GetKnownAddresses(ctx, p, tok)
if err != nil {
return err
}
if len(maddrs) > 0 {
c.network.AddAddrs(p.ID, maddrs)
}
return nil
}

// SubscribeToEvents allows another component to listen for events on the RetrievalClient
// in order to track deals as they progress through the deal flow
func (c *Client) SubscribeToEvents(subscriber retrievalmarket.ClientSubscriber) retrievalmarket.Unsubscribe {
Expand Down
20 changes: 16 additions & 4 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -109,24 +111,28 @@ func TestClient_Query(t *testing.T) {
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expectedQueryResponse, resp)
node.VerifyExpectations(t)
})

t.Run("when the stream returns error, returns error", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

_, err = c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
node.VerifyExpectations(t)
})

t.Run("when WriteDealStatusRequest fails, returns error", func(t *testing.T) {
Expand All @@ -142,11 +148,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -155,6 +163,7 @@ func TestClient_Query(t *testing.T) {
statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
node.VerifyExpectations(t)
})

t.Run("when ReadDealStatusResponse fails, returns error", func(t *testing.T) {
Expand All @@ -168,11 +177,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
node := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{})
node.ExpectKnownAddresses(rpeer, nil)
c, err := retrievalimpl.NewClient(
net,
multiStore,
dt,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
node,
&tut.TestPeerResolver{},
ds,
storedCounter)
Expand All @@ -181,6 +192,7 @@ func TestClient_Query(t *testing.T) {
statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
node.VerifyExpectations(t)
})
}

Expand Down
17 changes: 11 additions & 6 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
Address: paymentAddress,
ID: testData.Host2.ID(),
}
rcNode1.ExpectKnownAddresses(retrievalPeer, nil)
return client, expectedCIDs, missingCID, expectedQR, retrievalPeer, provider
}

Expand Down Expand Up @@ -296,7 +297,7 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {
provider := setupProvider(bgCtx, t, testData, payloadCID, pieceInfo, expectedQR,
providerPaymentAddr, providerNode, decider)

retrievalPeer := &retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()}
retrievalPeer := retrievalmarket.RetrievalPeer{Address: providerPaymentAddr, ID: testData.Host2.ID()}

expectedVoucher := tut.MakeTestSignedVoucher()

Expand All @@ -312,9 +313,11 @@ func TestClientCanMakeDealWithProvider(t *testing.T) {

// ------- SET UP CLIENT
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)
createdChan, newLaneAddr, createdVoucher, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds)
createdChan, newLaneAddr, createdVoucher, clientNode, client, err := setupClient(bgCtx, t, clientPaymentChannel, expectedVoucher, nw1, testData, testCase.addFunds)
require.NoError(t, err)

clientNode.ExpectKnownAddresses(retrievalPeer, nil)

clientDealStateChan := make(chan retrievalmarket.ClientDealState)
client.SubscribeToEvents(func(event retrievalmarket.ClientEvent, state retrievalmarket.ClientDealState) {
switch event {
Expand Down Expand Up @@ -358,7 +361,7 @@ CurrentInterval: %d
})
// **** Send the query for the Piece
// set up retrieval params
resp, err := client.Query(bgCtx, *retrievalPeer, payloadCID, retrievalmarket.QueryParams{})
resp, err := client.Query(bgCtx, retrievalPeer, payloadCID, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)

Expand All @@ -376,7 +379,7 @@ CurrentInterval: %d
clientStoreID = &id
}
// *** Retrieve the piece
did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address, clientStoreID)
did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer, clientPaymentChannel, retrievalPeer.Address, clientStoreID)
assert.Equal(t, did, retrievalmarket.DealID(0))
require.NoError(t, err)

Expand Down Expand Up @@ -416,7 +419,8 @@ CurrentInterval: %d
if testCase.decider != nil {
assert.True(t, customDeciderRan)
}
// verify that the provider saved the same voucher values
// verify that the nodes we interacted with as expected
clientNode.VerifyExpectations(t)
providerNode.VerifyExpectations(t)
if testCase.skipStores {
testData.VerifyFileTransferred(t, pieceLink, false, testCase.filesize)
Expand All @@ -440,6 +444,7 @@ func setupClient(
*pmtChan,
*address.Address,
*paych.SignedVoucher,
*testnodes.TestRetrievalClientNode,
retrievalmarket.RetrievalClient,
error) {
var createdChan pmtChan
Expand Down Expand Up @@ -475,7 +480,7 @@ func setupClient(
require.NoError(t, err)

client, err := retrievalimpl.NewClient(nw1, testData.MultiStore1, dt1, clientNode, &tut.TestPeerResolver{}, testData.Ds1, testData.RetrievalStoredCounter1)
return &createdChan, &newLaneAddr, &createdVoucher, client, err
return &createdChan, &newLaneAddr, &createdVoucher, clientNode, client, err
}

func setupProvider(
Expand Down
39 changes: 35 additions & 4 deletions retrievalmarket/impl/testnodes/test_retrieval_client_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package testnodes

import (
"context"
"errors"
"fmt"
"testing"

"github.com/ipfs/go-cid"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
Expand All @@ -25,10 +29,12 @@ type TestRetrievalClientNode struct {
laneError error
voucher *paych.SignedVoucher
voucherError, waitCreateErr, waitAddErr error

allocateLaneRecorder func(address.Address)
createPaymentVoucherRecorder func(voucher *paych.SignedVoucher)
getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount)
knownAddreses map[retrievalmarket.RetrievalPeer][]ma.Multiaddr
receivedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{}
expectedKnownAddresses map[retrievalmarket.RetrievalPeer]struct{}
allocateLaneRecorder func(address.Address)
createPaymentVoucherRecorder func(voucher *paych.SignedVoucher)
getCreatePaymentChannelRecorder func(address.Address, address.Address, abi.TokenAmount)
}

// TestRetrievalClientNodeParams are parameters for initializing a TestRetrievalClientNode
Expand Down Expand Up @@ -66,6 +72,9 @@ func NewTestRetrievalClientNode(params TestRetrievalClientNodeParams) *TestRetri
getCreatePaymentChannelRecorder: params.PaymentChannelRecorder,
createPaychMsgCID: params.CreatePaychCID,
addFundsMsgCID: params.AddFundsCID,
knownAddreses: map[retrievalmarket.RetrievalPeer][]ma.Multiaddr{},
expectedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{},
receivedKnownAddresses: map[retrievalmarket.RetrievalPeer]struct{}{},
}
}

Expand Down Expand Up @@ -119,3 +128,25 @@ func (trcn *TestRetrievalClientNode) WaitForPaymentChannelCreation(messageCID ci
}
return trcn.payCh, trcn.waitCreateErr
}

// ExpectKnownAddresses stubs a return for a look up of known addresses for the given retrieval peer
// and the fact that it was looked up is verified with VerifyExpectations
func (trcn *TestRetrievalClientNode) ExpectKnownAddresses(p retrievalmarket.RetrievalPeer, maddrs []ma.Multiaddr) {
trcn.expectedKnownAddresses[p] = struct{}{}
trcn.knownAddreses[p] = maddrs
}

// GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store
func (trcn *TestRetrievalClientNode) GetKnownAddresses(ctx context.Context, p retrievalmarket.RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error) {
trcn.receivedKnownAddresses[p] = struct{}{}
addrs, ok := trcn.knownAddreses[p]
if !ok {
return nil, errors.New("Provider not found")
}
return addrs, nil
}

// VerifyExpectations verifies that all expected known addresses were looked up
func (trcn *TestRetrievalClientNode) VerifyExpectations(t *testing.T) {
require.Equal(t, trcn.expectedKnownAddresses, trcn.receivedKnownAddresses)
}
6 changes: 6 additions & 0 deletions retrievalmarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package network
import (
"bufio"
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
Expand Down Expand Up @@ -71,3 +73,7 @@ func (impl *libp2pRetrievalMarketNetwork) handleNewQueryStream(s network.Stream)
func (impl *libp2pRetrievalMarketNetwork) ID() peer.ID {
return impl.host.ID()
}

func (impl *libp2pRetrievalMarketNetwork) AddAddrs(p peer.ID, addrs []ma.Multiaddr) {
impl.host.Peerstore().AddAddrs(p, addrs, 8*time.Hour)
}
4 changes: 4 additions & 0 deletions retrievalmarket/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"github.com/libp2p/go-libp2p-core/peer"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
Expand Down Expand Up @@ -42,4 +43,7 @@ type RetrievalMarketNetwork interface {

// ID returns the peer id of the host for this network
ID() peer.ID

// AddAddrs adds the given multi-addrs to the peerstore for the passed peer ID
AddAddrs(peer.ID, []ma.Multiaddr)
}
4 changes: 4 additions & 0 deletions retrievalmarket/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
ma "github.com/multiformats/go-multiaddr"

"github.com/filecoin-project/go-fil-markets/shared"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ type RetrievalClientNode interface {
// WaitForPaymentChannelCreation waits for a message on chain that a
// payment channel has been created
WaitForPaymentChannelCreation(messageCID cid.Cid) (address.Address, error)

// GetKnownAddresses gets any on known multiaddrs for a given address, so we can add to the peer store
GetKnownAddresses(ctx context.Context, p RetrievalPeer, tok shared.TipSetToken) ([]ma.Multiaddr, error)
}

// RetrievalProviderNode are the node depedencies for a RetrevalProvider
Expand Down
7 changes: 5 additions & 2 deletions retrievalmarket/storage_retrieval_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func TestStorageRetrieval(t *testing.T) {
retrievalPeer := peers[0]
require.NotNil(t, retrievalPeer.PieceCID)

rh.ClientNode.ExpectKnownAddresses(retrievalPeer, nil)

resp, err := rh.Client.Query(bgCtx, retrievalPeer, sh.PayloadCid, retrievalmarket.QueryParams{})
require.NoError(t, err)
require.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status)
Expand All @@ -165,7 +167,7 @@ func TestStorageRetrieval(t *testing.T) {
// *** Retrieve the piece

clientStoreID := sh.TestData.MultiStore1.Next()
did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer.ID, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID)
did, err := rh.Client.Retrieve(bgCtx, sh.PayloadCid, rmParams, expectedTotal, retrievalPeer, *rh.ExpPaych, retrievalPeer.Address, &clientStoreID)
assert.Equal(t, did, retrievalmarket.DealID(0))
require.NoError(t, err)

Expand Down Expand Up @@ -194,6 +196,7 @@ func TestStorageRetrieval(t *testing.T) {
require.Equal(t, retrievalmarket.DealStatusCompleted, providerDealState.Status)
require.Equal(t, retrievalmarket.DealStatusCompleted, clientDealState.Status)

rh.ClientNode.VerifyExpectations(t)
sh.TestData.VerifyFileTransferredIntoStore(t, sh.PieceLink, clientStoreID, false, uint64(fsize))

}
Expand Down Expand Up @@ -319,7 +322,7 @@ func newStorageHarness(ctx context.Context, t *testing.T) *storageHarness {
PeerID: td.Host2.ID(),
}

smState.Providers = []*storagemarket.StorageProviderInfo{&providerInfo}
smState.Providers = map[address.Address]*storagemarket.StorageProviderInfo{providerAddr: &providerInfo}
return &storageHarness{
Ctx: ctx,
Epoch: epoch,
Expand Down
Loading