Skip to content

Commit

Permalink
updates after rebase, plus pulling scope back
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Jan 2, 2020
1 parent 975c00b commit 4da6c81
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 398 deletions.
13 changes: 0 additions & 13 deletions retrievalmarket/README.md

This file was deleted.

69 changes: 32 additions & 37 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-components/retrievalmarket"
"github.com/filecoin-project/go-fil-components/retrievalmarket/impl/impl_types"
rmnet "github.com/filecoin-project/go-fil-components/retrievalmarket/network"
"github.com/filecoin-project/go-fil-components/shared/params"
"github.com/filecoin-project/go-fil-components/shared/tokenamount"
"github.com/filecoin-project/go-fil-components/shared/types"
Expand All @@ -36,29 +34,12 @@ type client struct {

nextDealLk sync.Mutex
nextDealID retrievalmarket.DealID
rmnet rmnet.RetrievalMarketNetwork
subscribers []retrievalmarket.ClientSubscriber
}

type NewClientParams struct {
Host host.Host
Blockstore blockstore.Blockstore
RCNode retrievalmarket.RetrievalClientNode
RMNet rmnet.RetrievalMarketNetwork
}

// NewClient creates a new retrieval client
func NewClient(clientParams NewClientParams) retrievalmarket.RetrievalClient {
client := &client{
h: clientParams.Host,
bs: clientParams.Blockstore,
node: clientParams.RCNode,
rmnet: rmnet.NewFromLibp2pHost(clientParams.Host),
}
if clientParams.RMNet != nil {
client.rmnet = clientParams.RMNet
}
return client
func NewClient(h host.Host, bs blockstore.Blockstore, node retrievalmarket.RetrievalClientNode) retrievalmarket.RetrievalClient {
return &client{h: h, bs: bs, node: node}
}

// V0
Expand All @@ -72,24 +53,38 @@ func (c *client) FindProviders(pieceCID []byte) []retrievalmarket.RetrievalPeer
// TODO: Update to match spec for V0 epic
// https://github.com/filecoin-project/go-retrieval-market-project/issues/8
func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pieceCID []byte, params retrievalmarket.QueryParams) (retrievalmarket.QueryResponse, error) {
s, err := c.rmnet.NewQueryStream(p.ID)
cid, err := cid.Cast(pieceCID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

s, err := c.h.NewStream(ctx, p.ID, retrievalmarket.QueryProtocolID)
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}
defer s.Close()

err = s.WriteQuery(retrievalmarket.Query{PieceCID: pieceCID})
err = cborutil.WriteCborRPC(s, &OldQuery{
Piece: cid,
})
if err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

var resp retrievalmarket.QueryResponse
if resp, err = s.ReadQueryResponse(); err != nil {
var oldResp OldQueryResponse
if err := oldResp.UnmarshalCBOR(s); err != nil {
log.Warn(err)
return retrievalmarket.QueryResponseUndefined, err
}

resp := retrievalmarket.QueryResponse{
Status: retrievalmarket.QueryResponseStatus(oldResp.Status),
Size: oldResp.Size,
MinPricePerByte: tokenamount.Div(oldResp.MinPrice, tokenamount.FromInt(oldResp.Size)),
}
return resp, nil
}

Expand Down Expand Up @@ -272,11 +267,11 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) erro
return xerrors.Errorf("setting up retrieval payment: %w", err)
}

deal := &impl_types.OldDealProposal{
deal := &OldDealProposal{
Payment: payment,
Ref: cst.root,
Params: impl_types.RetParams{
Unixfs0: &impl_types.Unixfs0Offer{
Params: RetParams{
Unixfs0: &Unixfs0Offer{
Offset: cst.offset,
Size: toFetch,
},
Expand All @@ -287,19 +282,19 @@ func (cst *clientStream) doOneExchange(ctx context.Context, toFetch uint64) erro
return xerrors.Errorf("sending incremental retrieval request: %w", err)
}

var resp impl_types.OldDealResponse
var resp OldDealResponse
if err := cborutil.ReadCborRPC(cst.peeker, &resp); err != nil {
return xerrors.Errorf("reading retrieval response: %w", err)
}

if resp.Status != impl_types.Accepted {
if resp.Status != Accepted {
cst.windowSize = params.UnixfsChunkSize
// TODO: apply some 'penalty' to miner 'reputation' (needs to be the same in both cases)

if resp.Status == impl_types.Error {
if resp.Status == Error {
return xerrors.Errorf("storage deal error: %s", resp.Message)
}
if resp.Status == impl_types.Rejected {
if resp.Status == Rejected {
return xerrors.Errorf("storage deal rejected: %s", resp.Message)
}
return xerrors.New("storage deal response had no Accepted section")
Expand All @@ -318,7 +313,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error {
for i := uint64(0); i < blocksToFetch; {
log.Infof("block %d of %d", i+1, blocksToFetch)

var block impl_types.Block
var block Block
if err := cborutil.ReadCborRPC(cst.peeker, &block); err != nil {
return xerrors.Errorf("reading fetchBlock response: %w", err)
}
Expand All @@ -334,7 +329,7 @@ func (cst *clientStream) fetchBlocks(toFetch uint64) error {
return nil
}

func (cst *clientStream) consumeBlockMessage(block impl_types.Block) (uint64, error) {
func (cst *clientStream) consumeBlockMessage(block Block) (uint64, error) {
prefix, err := cid.PrefixFromBytes(block.Prefix)
if err != nil {
return 0, err
Expand Down Expand Up @@ -371,17 +366,17 @@ func (cst *clientStream) consumeBlockMessage(block impl_types.Block) (uint64, er
return 1, nil
}

func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.TokenAmount) (impl_types.OldPaymentInfo, error) {
func (cst *clientStream) setupPayment(ctx context.Context, toSend tokenamount.TokenAmount) (OldPaymentInfo, error) {
amount := tokenamount.Add(cst.transferred, toSend)

sv, err := cst.node.CreatePaymentVoucher(ctx, cst.paych, amount, cst.lane)
if err != nil {
return impl_types.OldPaymentInfo{}, err
return OldPaymentInfo{}, err
}

cst.transferred = amount

return impl_types.OldPaymentInfo{
return OldPaymentInfo{
Channel: cst.paych,
ChannelMessage: nil,
Vouchers: []*types.SignedVoucher{sv},
Expand Down
127 changes: 0 additions & 127 deletions retrievalmarket/impl/client_test.go

This file was deleted.

26 changes: 12 additions & 14 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package retrievalimpl

import (
"context"
"github.com/filecoin-project/go-fil-components/retrievalmarket/impl/impl_types"
"io"
"reflect"

Expand Down Expand Up @@ -32,7 +31,6 @@ type UnixfsReader interface {
}

type provider struct {

// TODO: Replace with RetrievalProviderNode for
// https://github.com/filecoin-project/go-retrieval-market-project/issues/4
node retrievalmarket.RetrievalProviderNode
Expand Down Expand Up @@ -110,8 +108,8 @@ func (p *provider) ListDeals() map[retrievalmarket.ProviderDealID]retrievalmarke

func writeErr(stream network.Stream, err error) {
log.Errorf("Retrieval deal error: %+v", err)
_ = cborutil.WriteCborRPC(stream, &impl_types.OldDealResponse{
Status: impl_types.Error,
_ = cborutil.WriteCborRPC(stream, &OldDealResponse{
Status: Error,
Message: err.Error(),
})
}
Expand All @@ -120,7 +118,7 @@ func writeErr(stream network.Stream, err error) {
func (p *provider) handleQueryStream(stream network.Stream) {
defer stream.Close()

var query impl_types.OldQuery
var query OldQuery
if err := cborutil.ReadCborRPC(stream, &query); err != nil {
writeErr(stream, err)
return
Expand All @@ -132,11 +130,11 @@ func (p *provider) handleQueryStream(stream network.Stream) {
return
}

answer := &impl_types.OldQueryResponse{
Status: impl_types.Unavailable,
answer := &OldQueryResponse{
Status: Unavailable,
}
if err == nil {
answer.Status = impl_types.Available
answer.Status = Available

// TODO: get price, look for already unsealed ref to reduce work
answer.MinPrice = tokenamount.Mul(tokenamount.FromInt(uint64(size)), p.pricePerByte)
Expand Down Expand Up @@ -183,7 +181,7 @@ func (p *provider) handleDealStream(stream network.Stream) {
}

func (hnd *handlerDeal) handleNext() (bool, error) {
var deal impl_types.OldDealProposal
var deal OldDealProposal
if err := cborutil.ReadCborRPC(hnd.stream, &deal); err != nil {
if err == io.EOF { // client sent all deals
err = nil
Expand Down Expand Up @@ -226,7 +224,7 @@ func (hnd *handlerDeal) handleNext() (bool, error) {
return true, nil
}

func (hnd *handlerDeal) openFile(deal impl_types.OldDealProposal) error {
func (hnd *handlerDeal) openFile(deal OldDealProposal) error {
unixfs0 := deal.Params.Unixfs0

if unixfs0.Offset != 0 {
Expand Down Expand Up @@ -267,11 +265,11 @@ func (hnd *handlerDeal) openFile(deal impl_types.OldDealProposal) error {
return nil
}

func (hnd *handlerDeal) accept(deal impl_types.OldDealProposal) error {
func (hnd *handlerDeal) accept(deal OldDealProposal) error {
unixfs0 := deal.Params.Unixfs0

resp := &impl_types.OldDealResponse{
Status: impl_types.Accepted,
resp := &OldDealResponse{
Status: Accepted,
}
if err := cborutil.WriteCborRPC(hnd.stream, resp); err != nil {
log.Errorf("Retrieval query: Write Accepted resp: %s", err)
Expand All @@ -296,7 +294,7 @@ func (hnd *handlerDeal) accept(deal impl_types.OldDealProposal) error {
return
}*/

block := &impl_types.Block{
block := &Block{
Prefix: nd.Cid().Prefix().Bytes(),
Data: nd.RawData(),
}
Expand Down
Loading

0 comments on commit 4da6c81

Please sign in to comment.