Skip to content

Commit

Permalink
Feat/find providers (#43)
Browse files Browse the repository at this point in the history
* client accepts a peer resolver
* unit tests
  • Loading branch information
shannonwells authored Jan 15, 2020
1 parent 8cf2e3b commit d03e1c2
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 22 deletions.
6 changes: 4 additions & 2 deletions pieceio/cario/cario.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package cario
import (
"context"
"fmt"
"github.com/filecoin-project/go-fil-markets/pieceio"
"io"

"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/traversal/selector"
"io"

"github.com/filecoin-project/go-fil-markets/pieceio"
)

type carIO struct {
Expand Down
5 changes: 3 additions & 2 deletions retrievalmarket/discovery/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func (l *Local) AddPeer(cid cid.Cid, peer retrievalmarket.RetrievalPeer) error {
return l.ds.Put(dshelp.CidToDsKey(cid), entry)
}

func (l *Local) GetPeers(data cid.Cid) ([]retrievalmarket.RetrievalPeer, error) {
entry, err := l.ds.Get(dshelp.CidToDsKey(data))
func (l *Local) GetPeers(pieceCID []byte) ([]retrievalmarket.RetrievalPeer, error) {
key := string(pieceCID[:])
entry, err := l.ds.Get(datastore.NewKey(key))
if err == datastore.ErrNotFound {
return []retrievalmarket.RetrievalPeer{}, nil
}
Expand Down
36 changes: 23 additions & 13 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ package retrievalimpl

import (
"context"
"errors"
"reflect"
"sync"

"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"

"github.com/filecoin-project/go-address"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
)
Expand All @@ -28,19 +27,25 @@ type client struct {
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID
nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID

subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
subscribers []retrievalmarket.ClientSubscriber
resolver retrievalmarket.PeerResolver
}

// NewClient creates a new retrieval client
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
func NewClient(
network rmnet.RetrievalMarketNetwork,
bs blockstore.Blockstore,
node retrievalmarket.RetrievalClientNode,
resolver retrievalmarket.PeerResolver) retrievalmarket.RetrievalClient {
return &client{
network: network,
bs: bs,
node: node,
network: network,
bs: bs,
node: node,
resolver: resolver,
}
}

Expand All @@ -49,7 +54,12 @@ func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, n
// TODO: Implement for retrieval provider V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/12
func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer {
panic("not implemented")
peers, err := c.resolver.GetPeers(pieceCID)
if err != nil {
log.Error(err)
return []retrievalmarket.RetrievalPeer{}
}
return peers
}

// TODO: Update to match spec for V0 epic
Expand Down Expand Up @@ -138,7 +148,7 @@ func (c *client) handleDeal(ctx context.Context, dealState retrievalmarket.Clien
case retrievalmarket.DealStatusFundsNeeded, retrievalmarket.DealStatusFundsNeededLastPayment:
handler = clientstates.ProcessNextResponse
default:
c.failDeal(&dealState, errors.New("unexpected deal state"))
c.failDeal(&dealState, xerrors.New("unexpected deal state"))
return
}
dealModifier := handler(ctx, environment, dealState)
Expand Down
64 changes: 60 additions & 4 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package retrievalimpl_test

import (
"context"
"errors"
"testing"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
Expand All @@ -68,7 +69,8 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
Expand All @@ -87,7 +89,8 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
Expand All @@ -105,10 +108,63 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}))
c := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

func TestClient_FindProviders(t *testing.T) {
bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))
expectedPeer := peer.ID("somevalue")

var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
return tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
Writer: tut.TrivialQueryWriter,
RespReader: tut.TrivialQueryResponseReader,
}), nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})

t.Run("when providers are found, returns providers", func(t *testing.T) {
peers := tut.RequireGenerateRetrievalPeers(t, 3)
testResolver := testPeerResolver{peers: peers}

c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
testCid := []byte("somePieceCID")
assert.Len(t, c.FindProviders(testCid), 3)
})

t.Run("when there is an error, returns empty provider list", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}, resolverError: errors.New("boom")}
c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
badCid := []byte("doesn't matter")
assert.Len(t, c.FindProviders(badCid), 0)
})

t.Run("when there are no providers", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}}
c := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver)
testCid := []byte("unimportant")
assert.Len(t, c.FindProviders(testCid), 0)
})
}

type testPeerResolver struct {
peers []retrievalmarket.RetrievalPeer
resolverError error
}

var _ retrievalmarket.PeerResolver = &testPeerResolver{}

func (tpr testPeerResolver) GetPeers( []byte) ([]retrievalmarket.RetrievalPeer, error) {
return tpr.peers, tpr.resolverError
}
2 changes: 1 addition & 1 deletion retrievalmarket/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ type RetrievalProviderNode interface {

// PeerResolver is an interface for looking up providers that may have a piece
type PeerResolver interface {
GetPeers(data cid.Cid) ([]RetrievalPeer, error) // TODO: channel
GetPeers(pieceCID []byte) ([]RetrievalPeer, error) // TODO: channel
}

// RetrievalPeer is a provider address/peer.ID pair (everything needed to make
Expand Down
18 changes: 18 additions & 0 deletions shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package shared_testutil
import (
"math/big"
"math/rand"
"testing"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-data-transfer/testutil"
"github.com/libp2p/go-libp2p-core/test"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
Expand Down Expand Up @@ -107,3 +110,18 @@ func MakeTestDealPayment() retrievalmarket.DealPayment {
PaymentVoucher: MakeTestSignedVoucher(),
}
}

func RequireGenerateRetrievalPeers(t *testing.T, numPeers int) []retrievalmarket.RetrievalPeer {
peers := make([]retrievalmarket.RetrievalPeer, numPeers)
for i := range peers {
pid, err := test.RandPeerID()
require.NoError(t, err)
addr, err := address.NewIDAddress(rand.Uint64())
require.NoError(t, err)
peers[i] = retrievalmarket.RetrievalPeer{
Address: addr,
ID: pid,
}
}
return peers
}

0 comments on commit d03e1c2

Please sign in to comment.