Skip to content

Commit

Permalink
Upgrade Query Protocol to Spec V0 (#25)
Browse files Browse the repository at this point in the history
* feat(retrieval): add network abstraction skeleton

Create a network to wrap cbor read/writes and introduce some typechecking, and provide a mockable
abstraction

* feat(retrievalmarket): upgrade query protocol to spec v0

Implements spec v0 of retrieval query protocol. Also defines mocks for network

* fix(retrievalmarket): fix lint errors

* updates after rebase

* some updates after rebase with master

* fix tests/breakages

* fix test

* updates after repo rename & rebase with master

Co-authored-by: Shannon Wells <shannonwells@users.noreply.github.com>
  • Loading branch information
hannahhoward and shannonwells authored Jan 10, 2020
1 parent 7ac5a8a commit 65eccea
Show file tree
Hide file tree
Showing 13 changed files with 869 additions and 122 deletions.
6 changes: 3 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ jobs:
- run: sudo apt-get update
- restore_cache:
name: restore go mod cache
key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go-fil-components/go.mod" }}
key: v1-go-deps-{{ arch }}-{{ checksum "/home/circleci/project/go-fil-markets/go.mod" }}
- run:
command: make build
- store_artifacts:
path: go-fil-components
path: go-fil-markets
- store_artifacts:
path: go-fil-components
path: go-fil-markets

test: &test
description: |
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# go-fil-markets
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![CircleCI](https://circleci.com/gh/filecoin-project/go-fil-components.svg?style=svg)](https://circleci.com/gh/filecoin-project/go-fil-components)
[![codecov](https://codecov.io/gh/filecoin-project/go-fil-components/branch/master/graph/badge.svg)](https://codecov.io/gh/filecoin-project/go-fil-components)
[![CircleCI](https://circleci.com/gh/filecoin-project/go-fil-markets.svg?style=svg)](https://circleci.com/gh/filecoin-project/go-fil-markets)
[![codecov](https://codecov.io/gh/filecoin-project/go-fil-markets/branch/master/graph/badge.svg)](https://codecov.io/gh/filecoin-project/go-fil-markets)

This repository contains modular implementations of the storage and retrieval market subsystems of Filecoin. These modules are guided by the [v1.0 and 1.1 Filecoin specification updates](https://filecoin-project.github.io/specs/#intro__changelog).

Expand All @@ -14,7 +14,7 @@ Separating an implementation into a blockchain component and one or more mining
## Contributing
PRs are welcome! Please first read the design docs and look over the current code. PRs against
master require approval of at least two maintainers. For the rest, please see our
[CONTRIBUTING](.go-fil-components/CONTRIBUTING.md) guide.
[CONTRIBUTING](.go-fil-markets/CONTRIBUTING.md) guide.

## Project-level documentation
The filecoin-project has a [community repo](https://github.com/filecoin-project/community) that documents in more detail our policies and guidelines, such as discussion forums and chat rooms and [Code of Conduct](https://github.com/filecoin-project/community/blob/master/CODE_OF_CONDUCT.md).
Expand Down
64 changes: 15 additions & 49 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,22 @@ import (
"reflect"
"sync"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-address"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared/params"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
)

var log = logging.Logger("retrieval")

type client struct {
h host.Host
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
network rmnet.RetrievalMarketNetwork
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.Mutex
Expand All @@ -39,8 +30,8 @@ type client struct {
}

// NewClient creates a new retrieval client
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
func NewClient(network rmnet.RetrievalMarketNetwork, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{network: network, bs: bs, node: node}
}

// V0
Expand All @@ -54,39 +45,22 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer
// TODO: Update to match spec for V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
cid, err := cid.Cast(pieceCID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
s, err := c.network.NewQueryStream(p.ID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
defer s.Close()

err = cborutil.WriteCborRPC(s, &OldQuery{
Piece: cid,
err = s.WriteQuery(retrievalmarket.Query{
PieceCID: pieceCID,
})
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

var oldResp OldQueryResponse
if err := oldResp.UnmarshalCBOR(s); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
return s.ReadQueryResponse()
}

// TODO: Update to match spec for V0 Epic:
Expand All @@ -111,16 +85,6 @@ func (c *client) Retrieve(ctx context.Context, pieceCID []byte, params retrieval

go func() {
evt := retrievalmarket.ClientEventError
converted, err := cid.Cast(pieceCID)

if err == nil {
err = c.retrieveUnixfs(ctx, converted, tokenamount.Div(totalFunds, params.PricePerByte).Uint64(), totalFunds, miner, clientWallet, minerWallet)
if err == nil {
evt = retrievalmarket.ClientEventComplete
dealState.Status = retrievalmarket.DealStatusCompleted
}
}

c.notifySubscribers(evt, dealState)
}()

Expand Down Expand Up @@ -178,6 +142,7 @@ func (c *client) ListDeals() map[retrievalmarket.DealID]retrievalmarket.ClientDe
panic("not implemented")
}

/*
type clientStream struct {
node retrievalmarket.RetrievalClientNode
stream network.Stream
Expand All @@ -196,7 +161,7 @@ type clientStream struct {
verifier BlockVerifier
bs blockstore.Blockstore
}

*/
/* This is the old retrieval code that is NOT spec compliant */

// C > S
Expand All @@ -211,7 +176,7 @@ type clientStream struct {
// < ..Blocks
// > DealProposal(...)
// < ...
func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
/*func (c *client) retrieveUnixfs(ctx context.Context, root cid.Cid, size uint64, total tokenamount.TokenAmount, miner peer.ID, client, minerAddr address.Address) error {
s, err := c.h.NewStream(ctx, miner, retrievalmarket.ProtocolID)
if err != nil {
return xerrors.Errorf("failed to open stream to miner for retrieval query: %w", err)
Expand Down Expand Up @@ -390,3 +355,4 @@ func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.To
Vouchers: []*types.SignedVoucher{sv},
}, nil
}
*/
129 changes: 129 additions & 0 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package retrievalimpl_test

import (
"context"
"testing"

"github.com/filecoin-project/go-address"
"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
retrievalimpl "github.com/filecoin-project/go-fil-markets/retrievalmarket/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/shared/tokenamount"
"github.com/filecoin-project/go-fil-markets/shared/types"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
)

func TestClient_Query(t *testing.T) {
ctx := context.Background()

bs := bstore.NewBlockstore(dss.MutexWrap(datastore.NewMapDatastore()))

pcid := []byte(string("applesauce"))
expectedPeer := peer.ID("somevalue")
rpeer := retrievalmarket.RetrievalPeer{
Address: address.TestAddress2,
ID: expectedPeer,
}

expectedQuery := retrievalmarket.Query{
PieceCID: pcid,
}

expectedQueryResponse := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseAvailable,
Size: 1234,
PaymentAddress: address.TestAddress,
MinPricePerByte: tokenamount.FromInt(5678),
MaxPaymentInterval: 4321,
MaxPaymentIntervalIncrease: 0,
}

t.Run("it works", func(t *testing.T) {
var qsb tut.QueryStreamBuilder = func(p peer.ID) (rmnet.RetrievalQueryStream, error) {
return tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
Writer: tut.ExpectQueryWriter(t, expectedQuery, "queries should match"),
RespReader: tut.StubbedQueryResponseReader(expectedQueryResponse),
}), nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
require.NoError(t, err)
assert.NotNil(t, resp)
assert.Equal(t, expectedQueryResponse, resp)
})

t.Run("when the stream returns error, returns error", func(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

_, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "new query stream failed")
})

t.Run("when WriteQuery fails, returns error", func(t *testing.T) {

qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
Writer: tut.FailQueryWriter,
})
return newStream, nil
}

net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "write query failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})

t.Run("when ReadQueryResponse fails, returns error", func(t *testing.T) {
qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) {
newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{
PeerID: p,
RespReader: tut.FailResponseReader,
})
return newStream, nil
}
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c := retrievalimpl.NewClient(net, bs, &testRetrievalNode{})

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
assert.EqualError(t, err, "query response failed")
assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode)
})
}

type testRetrievalNode struct {
}

func (t *testRetrievalNode) GetOrCreatePaymentChannel(ctx context.Context, clientAddress address.Address, minerAddress address.Address, clientFundsAvailable tokenamount.TokenAmount) (address.Address, error) {
return address.Address{}, nil
}

func (t *testRetrievalNode) AllocateLane(paymentChannel address.Address) (uint64, error) {
return 0, nil
}

func (t *testRetrievalNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount tokenamount.TokenAmount, lane uint64) (*types.SignedVoucher, error) {
return nil, nil
}
Loading

0 comments on commit 65eccea

Please sign in to comment.