From 4da6c81c3e0371a6d4cb771278a8c9179cd0082f Mon Sep 17 00:00:00 2001 From: shannonwells Date: Thu, 2 Jan 2020 12:42:01 -0800 Subject: [PATCH] updates after rebase, plus pulling scope back --- retrievalmarket/README.md | 13 -- retrievalmarket/impl/client.go | 69 ++++---- retrievalmarket/impl/client_test.go | 127 -------------- retrievalmarket/impl/provider.go | 26 ++- .../impl/{impl_types => }/types.go | 2 +- .../impl/{impl_types => }/types_cbor_gen.go | 2 +- retrievalmarket/network/query_stream.go | 6 +- shared/statestore/store.go | 164 ------------------ shared/statestore/store_test.go | 38 ---- 9 files changed, 49 insertions(+), 398 deletions(-) delete mode 100644 retrievalmarket/README.md delete mode 100644 retrievalmarket/impl/client_test.go rename retrievalmarket/impl/{impl_types => }/types.go (98%) rename retrievalmarket/impl/{impl_types => }/types_cbor_gen.go (99%) delete mode 100644 shared/statestore/store.go delete mode 100644 shared/statestore/store_test.go diff --git a/retrievalmarket/README.md b/retrievalmarket/README.md deleted file mode 100644 index 954da03ce..000000000 --- a/retrievalmarket/README.md +++ /dev/null @@ -1,13 +0,0 @@ -# go-fil-components/retrievalmarket - -[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) - -## Description -This module encapsulates retrieval market functionality - -## Contributing -PRs are welcome! Please first read the design docs and look over the current code. PRs against -master require approval of at least two maintainers. For the rest, please see our -[CONTRIBUTING](https://github.com/filecoin-project/go-fil-components/CONTRIBUTING.md) guide. - -Copyright 2019. Protocol Labs, Inc. \ No newline at end of file diff --git a/retrievalmarket/impl/client.go b/retrievalmarket/impl/client.go index 3cc907472..b675b3ae1 100644 --- a/retrievalmarket/impl/client.go +++ b/retrievalmarket/impl/client.go @@ -19,8 +19,6 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-components/retrievalmarket" - "github.com/filecoin-project/go-fil-components/retrievalmarket/impl/impl_types" - rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network" "github.com/filecoin-project/go-fil-components/shared/params" "github.com/filecoin-project/go-fil-components/shared/tokenamount" "github.com/filecoin-project/go-fil-components/shared/types" @@ -36,29 +34,12 @@ type client struct { nextDealLk sync.Mutex nextDealID retrievalmarket.DealID - rmnet rmnet.RetrievalMarketNetwork subscribers []retrievalmarket.ClientSubscriber } -type NewClientParams struct { - Host host.Host - Blockstore blockstore.Blockstore - RCNode retrievalmarket.RetrievalClientNode - RMNet rmnet.RetrievalMarketNetwork -} - // NewClient creates a new retrieval client -func NewClient(clientParams NewClientParams) retrievalmarket.RetrievalClient { - client := &client{ - h: clientParams.Host, - bs: clientParams.Blockstore, - node: clientParams.RCNode, - rmnet: rmnet.NewFromLibp2pHost(clientParams.Host), - } - if clientParams.RMNet != nil { - client.rmnet = clientParams.RMNet - } - return client +func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient { + return &client{h: h, bs: bs, node: node} } // V0 @@ -72,24 +53,38 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer // TODO: Update to match spec for V0 epic // https://github.com/filecoin-project/go-retrieval-market-project/issues/8 func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) { - s, err := c.rmnet.NewQueryStream(p.ID) + cid, err := cid.Cast(pieceCID) + if err != nil { + log.Warn(err) + return retrievalmarket.QueryResponseUndefined, err + } + + s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID) if err != nil { log.Warn(err) return retrievalmarket.QueryResponseUndefined, err } defer s.Close() - err = s.WriteQuery(retrievalmarket.Query{PieceCID: pieceCID}) + err = cborutil.WriteCborRPC(s, &OldQuery{ + Piece: cid, + }) if err != nil { log.Warn(err) return retrievalmarket.QueryResponseUndefined, err } - var resp retrievalmarket.QueryResponse - if resp, err = s.ReadQueryResponse(); err != nil { + var oldResp OldQueryResponse + if err := oldResp.UnmarshalCBOR(s); err != nil { log.Warn(err) return retrievalmarket.QueryResponseUndefined, err } + + resp := retrievalmarket.QueryResponse{ + Status: retrievalmarket.QueryResponseStatus(oldResp.Status), + Size: oldResp.Size, + MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)), + } return resp, nil } @@ -272,11 +267,11 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) erro return xerrors.Errorf("setting up retrieval payment: %w", err) } - deal := &impl_types.OldDealProposal{ + deal := &OldDealProposal{ Payment: payment, Ref: cst.root, - Params: impl_types.RetParams{ - Unixfs0: &impl_types.Unixfs0Offer{ + Params: RetParams{ + Unixfs0: &Unixfs0Offer{ Offset: cst.offset, Size: toFetch, }, @@ -287,19 +282,19 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) erro return xerrors.Errorf("sending incremental retrieval request: %w", err) } - var resp impl_types.OldDealResponse + var resp OldDealResponse if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil { return xerrors.Errorf("reading retrieval response: %w", err) } - if resp.Status != impl_types.Accepted { + if resp.Status != Accepted { cst.windowSize = params.UnixfsChunkSize // TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases) - if resp.Status == impl_types.Error { + if resp.Status == Error { return xerrors.Errorf("storage deal error: %s", resp.Message) } - if resp.Status == impl_types.Rejected { + if resp.Status == Rejected { return xerrors.Errorf("storage deal rejected: %s", resp.Message) } return xerrors.New("storage deal response had no Accepted section") @@ -318,7 +313,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error { for i := uint64(0); i < blocksToFetch; { log.Infof("block %d of %d", i+1, blocksToFetch) - var block impl_types.Block + var block Block if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil { return xerrors.Errorf("reading fetchBlock response: %w", err) } @@ -334,7 +329,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error { return nil } -func (cst *clientStream) consumeBlockMessage(block impl_types.Block) (uint64, error) { +func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) { prefix, err := cid.PrefixFromBytes(block.Prefix) if err != nil { return 0, err @@ -371,17 +366,17 @@ func (cst *clientStream) consumeBlockMessage(block impl_types.Block) (uint64, er return 1, nil } -func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.TokenAmount) (impl_types.OldPaymentInfo, error) { +func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.TokenAmount) (OldPaymentInfo, error) { amount := tokenamount.Add(cst.transferred, toSend) sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane) if err != nil { - return impl_types.OldPaymentInfo{}, err + return OldPaymentInfo{}, err } cst.transferred = amount - return impl_types.OldPaymentInfo{ + return OldPaymentInfo{ Channel: cst.paych, ChannelMessage: nil, Vouchers: []*types.SignedVoucher{sv}, diff --git a/retrievalmarket/impl/client_test.go b/retrievalmarket/impl/client_test.go deleted file mode 100644 index e086cc437..000000000 --- a/retrievalmarket/impl/client_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package retrievalimpl_test - -import ( - "context" - "github.com/filecoin-project/go-fil-components/retrievalmarket/network" - "testing" - - "github.com/filecoin-project/go-address" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/go-fil-components/retrievalmarket" - retrievalimpl "github.com/filecoin-project/go-fil-components/retrievalmarket/impl" - tut "github.com/filecoin-project/go-fil-components/retrievalmarket/network/testutil" - "github.com/filecoin-project/go-fil-components/shared_testutil" -) - -func TestClient_Query(t *testing.T) { - ctx := context.Background() - - td := shared_testutil.NewLibp2pTestData(ctx, t) - - pcid := []byte(string("applesauce")) - rpeer := retrievalmarket.RetrievalPeer{ - Address: address.TestAddress2, - ID: td.Host2.ID(), - } - - t.Run("it works", func(t *testing.T) { - net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ - Host: td.Host1, - Peers: []peer.ID{td.Host2.ID()}, - }) - c := retrievalimpl.NewClient( - retrievalimpl.NewClientParams{ - Host: td.Host1, - Blockstore: td.Bs1, - RCNode: &tut.TestRetrievalNode{}, - RMNet: net, - }) - - resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) - require.NoError(t, err) - assert.NotNil(t, resp) - assert.Equal(t, retrievalmarket.QueryResponseAvailable, resp.Status) - }) - - t.Run("when the stream returns error, returns error", func(t *testing.T) { - net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ - Host: td.Host1, - Peers: []peer.ID{td.Host2.ID()}, - QueryStreamBuilder: tut.FailNewQueryStream, - }) - c := retrievalimpl.NewClient( - retrievalimpl.NewClientParams{ - Host: td.Host1, - Blockstore: td.Bs1, - RCNode: &tut.TestRetrievalNode{}, - RMNet: net, - }) - - _, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) - assert.EqualError(t, err, "new query stream failed") - }) - - t.Run("when WriteQuery fails, returns error", func(t *testing.T) { - - qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) { - newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{ - PeerID: p, - Writer: tut.FailQueryWriter, - }) - return newStream, nil - } - - net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ - Host: td.Host2, - Peers: []peer.ID{td.Host1.ID()}, - QueryStreamBuilder: qsbuilder, - }) - c := retrievalimpl.NewClient( - retrievalimpl.NewClientParams{ - Host: td.Host1, - Blockstore: td.Bs1, - RCNode: &tut.TestRetrievalNode{}, - RMNet: net, - }) - - statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) - assert.EqualError(t, err, "write query failed") - assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode) - }) - - t.Run("when ReadQueryResponse fails, returns error", func(t *testing.T) { - qsbuilder := func(p peer.ID) (network.RetrievalQueryStream, error) { - newStream := tut.NewTestRetrievalQueryStream(tut.TestQueryStreamParams{ - PeerID: p, - RespReader: tut.FailResponseReader, - }) - return newStream, nil - } - net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{ - Host: td.Host2, - Peers: []peer.ID{td.Host1.ID()}, - QueryStreamBuilder: qsbuilder, - }) - c := retrievalimpl.NewClient( - retrievalimpl.NewClientParams{ - Host: td.Host1, - Blockstore: td.Bs1, - RCNode: &tut.TestRetrievalNode{}, - RMNet: net, - }) - - statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{}) - assert.EqualError(t, err, "query response failed") - assert.Equal(t, retrievalmarket.QueryResponseUndefined, statusCode) - }) - - t.Run("use the mocknet", func(t *testing.T) { - - }) -} - -func TestClient_Retrieve(t *testing.T) { -} diff --git a/retrievalmarket/impl/provider.go b/retrievalmarket/impl/provider.go index fe003f515..006b735df 100644 --- a/retrievalmarket/impl/provider.go +++ b/retrievalmarket/impl/provider.go @@ -2,7 +2,6 @@ package retrievalimpl import ( "context" - "github.com/filecoin-project/go-fil-components/retrievalmarket/impl/impl_types" "io" "reflect" @@ -32,7 +31,6 @@ type UnixfsReader interface { } type provider struct { - // TODO: Replace with RetrievalProviderNode for // https://github.com/filecoin-project/go-retrieval-market-project/issues/4 node retrievalmarket.RetrievalProviderNode @@ -110,8 +108,8 @@ func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarke func writeErr(stream network.Stream, err error) { log.Errorf("Retrieval deal error: %+v", err) - _ = cborutil.WriteCborRPC(stream, &impl_types.OldDealResponse{ - Status: impl_types.Error, + _ = cborutil.WriteCborRPC(stream, &OldDealResponse{ + Status: Error, Message: err.Error(), }) } @@ -120,7 +118,7 @@ func writeErr(stream network.Stream, err error) { func (p *provider) handleQueryStream(stream network.Stream) { defer stream.Close() - var query impl_types.OldQuery + var query OldQuery if err := cborutil.ReadCborRPC(stream, &query); err != nil { writeErr(stream, err) return @@ -132,11 +130,11 @@ func (p *provider) handleQueryStream(stream network.Stream) { return } - answer := &impl_types.OldQueryResponse{ - Status: impl_types.Unavailable, + answer := &OldQueryResponse{ + Status: Unavailable, } if err == nil { - answer.Status = impl_types.Available + answer.Status = Available // TODO: get price, look for already unsealed ref to reduce work answer.MinPrice = tokenamount.Mul(tokenamount.FromInt(uint64(size)), p.pricePerByte) @@ -183,7 +181,7 @@ func (p *provider) handleDealStream(stream network.Stream) { } func (hnd *handlerDeal) handleNext() (bool, error) { - var deal impl_types.OldDealProposal + var deal OldDealProposal if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil { if err == io.EOF { // client sent all deals err = nil @@ -226,7 +224,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) { return true, nil } -func (hnd *handlerDeal) openFile(deal impl_types.OldDealProposal) error { +func (hnd *handlerDeal) openFile(deal OldDealProposal) error { unixfs0 := deal.Params.Unixfs0 if unixfs0.Offset != 0 { @@ -267,11 +265,11 @@ func (hnd *handlerDeal) openFile(deal impl_types.OldDealProposal) error { return nil } -func (hnd *handlerDeal) accept(deal impl_types.OldDealProposal) error { +func (hnd *handlerDeal) accept(deal OldDealProposal) error { unixfs0 := deal.Params.Unixfs0 - resp := &impl_types.OldDealResponse{ - Status: impl_types.Accepted, + resp := &OldDealResponse{ + Status: Accepted, } if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil { log.Errorf("Retrieval query: Write Accepted resp: %s", err) @@ -296,7 +294,7 @@ func (hnd *handlerDeal) accept(deal impl_types.OldDealProposal) error { return }*/ - block := &impl_types.Block{ + block := &Block{ Prefix: nd.Cid().Prefix().Bytes(), Data: nd.RawData(), } diff --git a/retrievalmarket/impl/impl_types/types.go b/retrievalmarket/impl/types.go similarity index 98% rename from retrievalmarket/impl/impl_types/types.go rename to retrievalmarket/impl/types.go index 486277520..051d4025e 100644 --- a/retrievalmarket/impl/impl_types/types.go +++ b/retrievalmarket/impl/types.go @@ -1,4 +1,4 @@ -package impl_types +package retrievalimpl import ( "github.com/ipfs/go-cid" diff --git a/retrievalmarket/impl/impl_types/types_cbor_gen.go b/retrievalmarket/impl/types_cbor_gen.go similarity index 99% rename from retrievalmarket/impl/impl_types/types_cbor_gen.go rename to retrievalmarket/impl/types_cbor_gen.go index 1bb6bf0a4..2d88f91f4 100644 --- a/retrievalmarket/impl/impl_types/types_cbor_gen.go +++ b/retrievalmarket/impl/types_cbor_gen.go @@ -1,4 +1,4 @@ -package impl_types +package retrievalimpl import ( "fmt" diff --git a/retrievalmarket/network/query_stream.go b/retrievalmarket/network/query_stream.go index e2c81c40e..4bd2a4970 100644 --- a/retrievalmarket/network/query_stream.go +++ b/retrievalmarket/network/query_stream.go @@ -3,7 +3,7 @@ package network import ( cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-fil-components/retrievalmarket" - "github.com/filecoin-project/go-fil-components/retrievalmarket/impl/impl_types" + "github.com/filecoin-project/go-fil-components/retrievalmarket/impl" "github.com/filecoin-project/go-fil-components/shared/tokenamount" "github.com/ipfs/go-cid" p2pnet "github.com/libp2p/go-libp2p-core/network" @@ -27,11 +27,11 @@ func (qs queryStream) WriteQuery(q retrievalmarket.Query) error { return err } - return cborutil.WriteCborRPC(qs.s, &impl_types.OldQuery{Piece: cid}) + return cborutil.WriteCborRPC(qs.s, &retrievalimpl.OldQuery{Piece: cid}) } func (qs queryStream) ReadQueryResponse() (retrievalmarket.QueryResponse, error) { - var oldResp impl_types.OldQueryResponse + var oldResp retrievalimpl.OldQueryResponse if err := oldResp.UnmarshalCBOR(qs.s); err != nil { log.Warn(err) return retrievalmarket.QueryResponseUndefined, err diff --git a/shared/statestore/store.go b/shared/statestore/store.go deleted file mode 100644 index b326f1713..000000000 --- a/shared/statestore/store.go +++ /dev/null @@ -1,164 +0,0 @@ -package statestore - -import ( - "bytes" - "fmt" - "reflect" - - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/query" - cbg "github.com/whyrusleeping/cbor-gen" - "go.uber.org/multierr" - "golang.org/x/xerrors" -) - -type StateStore struct { - ds datastore.Datastore -} - -func New(ds datastore.Datastore) *StateStore { - return &StateStore{ds: ds} -} - -func toKey(k interface{}) datastore.Key { - switch t := k.(type) { - case uint64: - return datastore.NewKey(fmt.Sprint(t)) - case fmt.Stringer: - return datastore.NewKey(t.String()) - default: - panic("unexpected key type") - } -} - -func (st *StateStore) Begin(i interface{}, state interface{}) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if has { - return xerrors.Errorf("already tracking state for %v", i) - } - - b, err := cborutil.Dump(state) - if err != nil { - return err - } - - return st.ds.Put(k, b) -} - -func (st *StateStore) End(i interface{}) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if !has { - return xerrors.Errorf("No state for %s", i) - } - return st.ds.Delete(k) -} - -func cborMutator(mutator interface{}) func([]byte) ([]byte, error) { - rmut := reflect.ValueOf(mutator) - - return func(in []byte) ([]byte, error) { - state := reflect.New(rmut.Type().In(0).Elem()) - - err := cborutil.ReadCborRPC(bytes.NewReader(in), state.Interface()) - if err != nil { - return nil, err - } - - out := rmut.Call([]reflect.Value{state}) - - if err := out[0].Interface(); err != nil { - return nil, err.(error) - } - - return cborutil.Dump(state.Interface()) - } -} - -// mutator func(*T) error -func (st *StateStore) Mutate(i interface{}, mutator interface{}) error { - return st.mutate(i, cborMutator(mutator)) -} - -func (st *StateStore) mutate(i interface{}, mutator func([]byte) ([]byte, error)) error { - k := toKey(i) - has, err := st.ds.Has(k) - if err != nil { - return err - } - if !has { - return xerrors.Errorf("No state for %s", i) - } - - cur, err := st.ds.Get(k) - if err != nil { - return err - } - - mutated, err := mutator(cur) - if err != nil { - return err - } - - return st.ds.Put(k, mutated) -} - -func (st *StateStore) Has(i interface{}) (bool, error) { - return st.ds.Has(toKey(i)) -} - -func (st *StateStore) Get(i interface{}, out cbg.CBORUnmarshaler) error { - k := toKey(i) - val, err := st.ds.Get(k) - if err != nil { - if xerrors.Is(err, datastore.ErrNotFound) { - return xerrors.Errorf("No state for %s: %w", i, err) - } - return err - } - - return out.UnmarshalCBOR(bytes.NewReader(val)) -} - -// out: *[]T -func (st *StateStore) List(out interface{}) error { - res, err := st.ds.Query(query.Query{}) - if err != nil { - return err - } - defer res.Close() - - outT := reflect.TypeOf(out).Elem().Elem() - rout := reflect.ValueOf(out) - - var errs error - - for { - res, ok := res.NextSync() - if !ok { - break - } - if res.Error != nil { - return res.Error - } - - elem := reflect.New(outT) - err := cborutil.ReadCborRPC(bytes.NewReader(res.Value), elem.Interface()) - if err != nil { - errs = multierr.Append(errs, xerrors.Errorf("decoding state for key '%s': %w", res.Key, err)) - continue - } - - rout.Elem().Set(reflect.Append(rout.Elem(), elem.Elem())) - } - - return nil -} diff --git a/shared/statestore/store_test.go b/shared/statestore/store_test.go deleted file mode 100644 index 6e57d4662..000000000 --- a/shared/statestore/store_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package statestore - -import ( - "testing" - - cborutil "github.com/filecoin-project/go-cbor-util" - "github.com/ipfs/go-datastore" - - "github.com/filecoin-project/go-fil-components/shared/tokenamount" -) - -func TestList(t *testing.T) { - ds := datastore.NewMapDatastore() - - e, err := cborutil.Dump(tokenamount.FromInt(7)) - if err != nil { - t.Fatal(err) - } - - if err := ds.Put(datastore.NewKey("/2"), e); err != nil { - t.Fatal(err) - } - - st := &StateStore{ds: ds} - - var out []tokenamount.TokenAmount - if err := st.List(&out); err != nil { - t.Fatal(err) - } - - if len(out) != 1 { - t.Fatal("wrong len") - } - - if out[0].Int64() != 7 { - t.Fatal("wrong data") - } -}