Skip to content

Commit

Permalink
Merge pull request #117 from filecoin-project/feat/offline-deal-flow-1
Browse files Browse the repository at this point in the history
update interfaces to work with offline deal flow
  • Loading branch information
whyrusleeping committed Feb 11, 2020
2 parents 973498b + a22dafa commit 4e8b26b
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 71 deletions.
2 changes: 1 addition & 1 deletion shared_testutil/test_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func MakeTestSignedStorageAsk() *types.SignedStorageAsk {
func MakeTestStorageNetworkProposal() smnet.Proposal {
return smnet.Proposal{
DealProposal: MakeTestStorageDealProposal(),
Piece: GenerateCids(1)[0],
Piece: &storagemarket.DataRef{Root: GenerateCids(1)[0]},
}
}

Expand Down
10 changes: 5 additions & 5 deletions storagemarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"

"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-data-transfer"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/ipfs/go-cid"
Expand All @@ -14,6 +13,7 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
Expand Down Expand Up @@ -170,7 +170,7 @@ func (c *Client) onUpdated(ctx context.Context, update clientDealUpdate) {
}

type ClientDealProposal struct {
Data cid.Cid
Data *storagemarket.DataRef

PricePerEpoch tokenamount.TokenAmount
ProposalExpiration uint64
Expand All @@ -188,7 +188,7 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("adding market funds failed: %w", err)
}

commP, pieceSize, err := c.commP(ctx, p.Data)
commP, pieceSize, err := c.commP(ctx, p.Data.Root)
if err != nil {
return cid.Undef, xerrors.Errorf("computing commP failed: %w", err)
}
Expand Down Expand Up @@ -230,15 +230,15 @@ func (c *Client) Start(ctx context.Context, p ClientDealProposal) (cid.Cid, erro
State: storagemarket.StorageDealUnknown,
Miner: p.MinerID,
MinerWorker: p.MinerWorker,
PayloadCid: p.Data,
DataRef: p.Data,
},

s: s,
}

c.incoming <- deal

return deal.ProposalCid, c.discovery.AddPeer(p.Data, retrievalmarket.RetrievalPeer{
return deal.ProposalCid, c.discovery.AddPeer(p.Data.Root, retrievalmarket.RetrievalPeer{
Address: dealProposal.Provider,
ID: deal.Miner,
})
Expand Down
11 changes: 0 additions & 11 deletions storagemarket/impl/client_cbor_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions storagemarket/impl/client_storagemarket.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ func (c *Client) GetAsk(ctx context.Context, info storagemarket.StorageProviderI
return c.QueryAsk(ctx, info.PeerID, info.Address)
}

func (c *Client) ProposeStorageDeal(ctx context.Context, addr address.Address, info *storagemarket.StorageProviderInfo, payloadCid cid.Cid, proposalExpiration storagemarket.Epoch, duration storagemarket.Epoch, price tokenamount.TokenAmount, collateral tokenamount.TokenAmount) (*storagemarket.ProposeStorageDealResult, error) {
func (c *Client) ProposeStorageDeal(ctx context.Context, addr address.Address, info *storagemarket.StorageProviderInfo, data *storagemarket.DataRef, proposalExpiration storagemarket.Epoch, duration storagemarket.Epoch, price tokenamount.TokenAmount, collateral tokenamount.TokenAmount) (*storagemarket.ProposeStorageDealResult, error) {

proposal := ClientDealProposal{
Data: payloadCid,
Data: data,
PricePerEpoch: price,
ProposalExpiration: uint64(proposalExpiration),
Duration: uint64(duration),
Expand Down
5 changes: 3 additions & 2 deletions storagemarket/impl/client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"golang.org/x/xerrors"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

func (c *Client) failDeal(id cid.Cid, cerr error) {
Expand Down Expand Up @@ -134,10 +134,11 @@ func (c *ClientRequestValidator) ValidatePull(
if err != nil {
return xerrors.Errorf("Proposal CID %s: %w", dealVoucher.Proposal.String(), ErrNoDeal)
}

if deal.Miner != receiver {
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Miner.String(), receiver.String(), ErrWrongPeer)
}
if !deal.PayloadCid.Equals(baseCid) {
if !deal.DataRef.Root.Equals(baseCid) {
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
}
for _, state := range DataTransferStates {
Expand Down
51 changes: 50 additions & 1 deletion storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package storageimpl

import (
"bytes"
"context"
"errors"
"io"
"sync"

"github.com/ipfs/go-cid"
Expand All @@ -13,6 +15,7 @@ import (

"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/filestore"
"github.com/filecoin-project/go-fil-markets/pieceio"
"github.com/filecoin-project/go-fil-markets/pieceio/cario"
Expand All @@ -22,7 +25,6 @@ import (
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-data-transfer"
)

var ProviderDsPrefix = "/deals/provider"
Expand Down Expand Up @@ -292,3 +294,50 @@ func (p *Provider) Stop() error {
<-p.stopped
return p.net.StopHandlingRequests()
}

func (p *Provider) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error {
// TODO: be able to check if we have enough disk space
var d MinerDeal
if err := p.deals.Get(propCid).Get(&d); err != nil {
return xerrors.Errorf("failed getting deal %s: %w", propCid, err)
}

tempfi, err := p.fs.CreateTemp()
if err != nil {
return xerrors.Errorf("failed to create temp file for data import: %w", err)
}

n, err := io.Copy(tempfi, data)
if err != nil {
return xerrors.Errorf("importing deal data failed: %w", err)
}
_ = n // TODO: verify n?

_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
return xerrors.Errorf("failed to seek through temp imported file: %w", err)
}

commP, err := p.pio.ReadPiece(tempfi)
if err != nil {
return xerrors.Errorf("failed to generate commP")
}

if !bytes.Equal(commP.Bytes(), d.Proposal.PieceRef) {
return xerrors.Errorf("given data does not match expected commP (got: %x, expected %x)", commP.Bytes(), d.Proposal.PieceRef)
}

select {
case p.updated <- minerDealUpdate{
newState: storagemarket.StorageDealPublishing,
id: propCid,
mut: func(deal *MinerDeal) {
deal.PiecePath = tempfi.Path()
},
}:
case <-ctx.Done():
return ctx.Err()
}

return nil
}
11 changes: 8 additions & 3 deletions storagemarket/impl/provider_states.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ func (p *Provider) validating(ctx context.Context, deal MinerDeal) (func(*MinerD

// State: StorageDealTransferring
func (p *Provider) transferring(ctx context.Context, deal MinerDeal) (func(*MinerDeal), error) {
if deal.Ref.TransferType == storagemarket.TTManual {
log.Info("deal entering manual transfer state")
return nil, nil
}

ssb := builder.NewSelectorSpecBuilder(ipldfree.NodeBuilder())

// this is the selector for "get the whole DAG"
Expand All @@ -93,7 +98,7 @@ func (p *Provider) transferring(ctx context.Context, deal MinerDeal) (func(*Mine
_, err := p.dataTransfer.OpenPullDataChannel(ctx,
deal.Client,
&StorageDataTransferVoucher{Proposal: deal.ProposalCid},
deal.Ref,
deal.Ref.Root,
allSelector,
)
if err != nil {
Expand All @@ -110,7 +115,7 @@ func (p *Provider) verifydata(ctx context.Context, deal MinerDeal) (func(*MinerD
allSelector := ssb.ExploreRecursive(selector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

commp, path, _, err := p.pio.GeneratePieceCommitmentToFile(deal.Ref, allSelector)
commp, path, _, err := p.pio.GeneratePieceCommitmentToFile(deal.Ref.Root, allSelector)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -225,7 +230,7 @@ func (p *Provider) complete(ctx context.Context, deal MinerDeal) (func(*MinerDea
}
// TODO: Record actual block locations for all CIDs in piece by improving car writing
err = p.pieceStore.AddPieceBlockLocations(deal.Proposal.PieceRef, map[cid.Cid]piecestore.BlockLocation{
deal.Ref: {},
deal.Ref.Root: {},
})
if err != nil {
return nil, err
Expand Down
5 changes: 3 additions & 2 deletions storagemarket/impl/provider_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"github.com/ipld/go-ipld-prime"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-data-transfer"

cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-statestore"
Expand Down Expand Up @@ -139,6 +139,7 @@ func NewProviderRequestValidator(deals *statestore.StateStore) *ProviderRequestV
// - referenced deal matches the client
// - referenced deal matches the given base CID
// - referenced deal is in an acceptable state
// TODO: maybe this should accept a dataref?
func (m *ProviderRequestValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
Expand All @@ -158,7 +159,7 @@ func (m *ProviderRequestValidator) ValidatePush(
return xerrors.Errorf("Deal Peer %s, Data Transfer Peer %s: %w", deal.Client.String(), sender.String(), ErrWrongPeer)
}

if !deal.Ref.Equals(baseCid) {
if !deal.Ref.Root.Equals(baseCid) {
return xerrors.Errorf("Deal Payload CID %s, Data Transfer CID %s: %w", string(deal.Proposal.PieceRef), baseCid.String(), ErrWrongPiece)
}
for _, state := range DataTransferStates {
Expand Down
20 changes: 11 additions & 9 deletions storagemarket/impl/request_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
xerrors "golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-cbor-util"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-fil-markets/shared/types"
"github.com/filecoin-project/go-fil-markets/storagemarket"
deals "github.com/filecoin-project/go-fil-markets/storagemarket/impl"
Expand Down Expand Up @@ -76,7 +76,9 @@ func newClientDeal(minerID peer.ID, state storagemarket.StorageDealStatus) (deal
ClientDeal: storagemarket.ClientDeal{
Proposal: newProposal,
ProposalCid: proposalNd.Cid(),
PayloadCid: blockGenerator.Next().Cid(),
DataRef: &storagemarket.DataRef{
Root: blockGenerator.Next().Cid(),
},
Miner: minerID,
MinerWorker: minerAddr,
State: state,
Expand All @@ -101,7 +103,7 @@ func newMinerDeal(clientID peer.ID, state storagemarket.StorageDealStatus) (deal
ProposalCid: proposalNd.Cid(),
Client: clientID,
State: state,
Ref: ref,
Ref: &storagemarket.DataRef{Root: ref},
},
}, nil
}
Expand Down Expand Up @@ -144,7 +146,7 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
payloadCid := clientDeal.PayloadCid
payloadCid := clientDeal.DataRef.Root
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil), deals.ErrWrongPeer) {
t.Fatal("Pull should fail if miner address is incorrect")
}
Expand All @@ -169,7 +171,7 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
payloadCid := clientDeal.PayloadCid
payloadCid := clientDeal.DataRef.Root
if !xerrors.Is(crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil), deals.ErrInacceptableDealState) {
t.Fatal("Pull should fail if deal is in a state that cannot be data transferred")
}
Expand All @@ -182,7 +184,7 @@ func TestClientRequestValidation(t *testing.T) {
if err := state.Begin(clientDeal.ProposalCid, &clientDeal); err != nil {
t.Fatal("deal tracking failed")
}
payloadCid := clientDeal.PayloadCid
payloadCid := clientDeal.DataRef.Root
if crv.ValidatePull(minerID, &deals.StorageDataTransferVoucher{clientDeal.ProposalCid}, payloadCid, nil) != nil {
t.Fatal("Pull should should succeed when all parameters are correct")
}
Expand Down Expand Up @@ -229,7 +231,7 @@ func TestProviderRequestValidation(t *testing.T) {
t.Fatal("deal tracking failed")
}
ref := minerDeal.Ref
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref, nil), deals.ErrWrongPeer) {
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil), deals.ErrWrongPeer) {
t.Fatal("Push should fail if miner address is incorrect")
}
})
Expand All @@ -254,7 +256,7 @@ func TestProviderRequestValidation(t *testing.T) {
t.Fatal("deal tracking failed")
}
ref := minerDeal.Ref
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref, nil), deals.ErrInacceptableDealState) {
if !xerrors.Is(mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil), deals.ErrInacceptableDealState) {
t.Fatal("Push should fail if deal is in a state that cannot be data transferred")
}
})
Expand All @@ -267,7 +269,7 @@ func TestProviderRequestValidation(t *testing.T) {
t.Fatal("deal tracking failed")
}
ref := minerDeal.Ref
if mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref, nil) != nil {
if mrv.ValidatePush(clientID, &deals.StorageDataTransferVoucher{minerDeal.ProposalCid}, ref.Root, nil) != nil {
t.Fatal("Push should should succeed when all parameters are correct")
}
})
Expand Down
2 changes: 1 addition & 1 deletion storagemarket/network/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
type Proposal struct {
DealProposal *storagemarket.StorageDealProposal

Piece cid.Cid // Used for retrieving from the client
Piece *storagemarket.DataRef
}

var ProposalUndefined = Proposal{}
Expand Down
Loading

0 comments on commit 4e8b26b

Please sign in to comment.