Skip to content

Commit

Permalink
WIPWIPWIP saving current work
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Dec 19, 2019
1 parent 825168f commit 939ce0a
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 173 deletions.
10 changes: 5 additions & 5 deletions datatransfer/impl/graphsync/graphsync_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1122,12 +1122,12 @@ type graphsyncTestingData struct {
storer2 ipld.Storer
host1 host.Host
host2 host.Host
gsNet1 gsnet.GraphSyncNetwork
gsNet2 gsnet.GraphSyncNetwork
bridge1 ipldbridge.IPLDBridge
bridge2 ipldbridge.IPLDBridge
allSelector ipld.Node
origBytes []byte
gsNet1 gsnet.GraphSyncNetwork
gsNet2 gsnet.GraphSyncNetwork
}

func newGraphsyncTestingData(ctx context.Context, t *testing.T) *graphsyncTestingData {
Expand Down Expand Up @@ -1194,9 +1194,6 @@ func newGraphsyncTestingData(ctx context.Context, t *testing.T) *graphsyncTestin
err = mn.LinkAll()
require.NoError(t, err)

gsData.gsNet1 = gsnet.NewFromLibp2pHost(gsData.host1)
gsData.gsNet2 = gsnet.NewFromLibp2pHost(gsData.host2)

gsData.bridge1 = ipldbridge.NewIPLDBridge()
gsData.bridge2 = ipldbridge.NewIPLDBridge()

Expand All @@ -1206,6 +1203,9 @@ func newGraphsyncTestingData(ctx context.Context, t *testing.T) *graphsyncTestin
gsData.allSelector = ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

gsData.gsNet1 = gsnet.NewFromLibp2pHost(gsData.host1)
gsData.gsNet2 = gsnet.NewFromLibp2pHost(gsData.host2)

return gsData
}

Expand Down
26 changes: 7 additions & 19 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-fil-components/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network"
"github.com/filecoin-project/go-fil-components/shared/address"
"github.com/filecoin-project/go-fil-components/shared/cborutil"
"github.com/filecoin-project/go-fil-components/shared/params"
Expand All @@ -33,12 +34,13 @@ type client struct {

nextDealLk sync.Mutex
nextDealID retrievalmarket.DealID
rmnet rmnet.RetrievalMarketNetwork
subscribers []retrievalmarket.ClientSubscriber
}

// NewClient creates a new retrieval client
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
return &client{h: h, bs: bs, node: node, rmnet: rmnet.NewFromLibp2pHost(h)}
}

// V0
Expand All @@ -52,38 +54,24 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer
// TODO: Update to match spec for V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
cid, err := cid.Cast(pieceCID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
s, err := c.rmnet.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
defer s.Close()

err = cborutil.WriteCborRPC(s, &OldQuery{
Piece: cid,
})
err = s.WriteQuery(retrievalmarket.Query{PieceCID:pieceCID})
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

var oldResp OldQueryResponse
if err := oldResp.UnmarshalCBOR(s); err != nil {
var resp retrievalmarket.QueryResponse
if resp, err = s.ReadQueryResponse(); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
}

Expand Down
31 changes: 31 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package retrievalimpl_test

import (
"context"
retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl"
tut "github.com/filecoin-project/go-fil-components/retrievalmarket/network/testutil"
"github.com/libp2p/go-libp2p-core/peer"
"testing"

mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)

func TestClient_Query(t *testing.T) {
ctx := context.Background()


mnet := mocknet.New(ctx)
h1, err := mnet.GenPeer()
require.NoError(t, err)
h2, err := mnet.GenPeer()
require.NoError(t, err)

bs :=

t.Run("it works", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(h1, []peer.ID{h2.ID()})

c := retrievalimpl.NewClient(h1, bs, n)
})
}
38 changes: 32 additions & 6 deletions retrievalmarket/network/libp2p_impl.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,45 @@
package network

import (
"context"
"github.com/filecoin-project/go-fil-components/retrievalmarket"
logging "github.com/ipfs/go-log"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
)

var log = logging.Logger("retrieval_network")

func NewFromLibp2pHost(host.Host) RetrievalMarketNetwork {
return nil
func NewFromLibp2pHost(h host.Host) RetrievalMarketNetwork {
return libp2pRetrievalMarketNetwork{host: h}
}
// libp2pDataTransferNetwork transforms the libp2p host interface, which sends and receives
// libp2pRetrievalMarketNetwork transforms the libp2p host interface, which sends and receives
// NetMessage objects, into the graphsync network interface.
type libp2pDataTransferNetwork struct {
type libp2pRetrievalMarketNetwork struct {
host host.Host
// inbound messages from the network are forwarded to the receiver
receiver
receiver RetrievalReceiver
}

func (impl libp2pRetrievalMarketNetwork) NewQueryStream(id peer.ID) (RetrievalQueryStream, error) {
s, err := impl.host.NewStream(context.Background(), id, retrievalmarket.QueryProtocolID)
if err != nil {
return nil, err
}
return
}

func (impl libp2pRetrievalMarketNetwork) NewDealStream(id peer.ID) (RetrievalDealStream, error) {
panic("implement me")
}

func (impl libp2pRetrievalMarketNetwork) SetDelegate(r RetrievalReceiver) error {
impl.receiver = r
impl.host.SetStreamHandler(retrievalmarket.ProtocolID, impl.handleNewStream)
}


func debugLog(msg string) {
log.Debugf("retrievalmarket net handleNewStream -- %s", ms)

}
4 changes: 3 additions & 1 deletion retrievalmarket/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package network

import (
"github.com/filecoin-project/go-fil-components/retrievalmarket"
peer "github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-core/peer"
)

type RetrievalQueryStream interface {
ReadQuery() (retrievalmarket.Query, error)
WriteQuery(retrievalmarket.Query) error
ReadQueryResponse() (retrievalmarket.QueryResponse, error)
WriteQueryResponse(retrievalmarket.QueryResponse) error
Close() error
}

type RetrievalDealStream interface {
Expand All @@ -19,6 +20,7 @@ type RetrievalDealStream interface {
WriteDealResponse(retrievalmarket.DealResponse) error
ReadDealPayment() (retrievalmarket.DealPayment, error)
WriteDealPayment(retrievalmarket.DealPayment) error
Close() error
}

type RetrievalReceiver interface {
Expand Down
54 changes: 54 additions & 0 deletions retrievalmarket/network/query_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package network

import (
"github.com/filecoin-project/go-fil-components/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl"
"github.com/filecoin-project/go-fil-components/shared/cborutil"
"github.com/filecoin-project/go-fil-components/shared/tokenamount"
"github.com/ipfs/go-cid"
p2pnet "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
)

type queryStream struct {
p peer.ID
s p2pnet.Stream
}

func (qs queryStream) ReadQuery() (retrievalmarket.Query, error) {
panic("implement me")
}

func (qs queryStream) WriteQuery(q retrievalmarket.Query) error {
cid, err := cid.Cast(q.PieceCID)
if err != nil {
return err
}

return cborutil.WriteCborRPC(qs.s, &retrievalimpl.OldQuery{Piece: cid})
}

func (qs queryStream) ReadQueryResponse() (retrievalmarket.QueryResponse, error) {
var oldResp retrievalimpl.OldQueryResponse
if err := oldResp.UnmarshalCBOR(qs.s); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
}

func (qs queryStream) WriteQueryResponse(retrievalmarket.QueryResponse) error {
panic("implement me")
}

var _ RetrievalQueryStream = (*queryStream)(nil)

func (qs queryStream) Close() error {
return qs.s.Close()
}
Loading

0 comments on commit 939ce0a

Please sign in to comment.