Skip to content

Commit

Permalink
Merge pull request #7306 from filecoin-project/chore/prep-retrieval-f…
Browse files Browse the repository at this point in the history
…or-selectors_no-func-changes

Prep retrieval for selectors: no functional changes
  • Loading branch information
jennijuju committed Sep 9, 2021
2 parents 92c12f9 + 2c5f438 commit 1ce7c25
Showing 1 changed file with 59 additions and 48 deletions.
107 changes: 59 additions & 48 deletions node/impl/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
carv2bs "github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-padreader"
Expand All @@ -41,7 +41,6 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer"

"github.com/filecoin-project/go-fil-markets/discovery"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
Expand Down Expand Up @@ -91,7 +90,7 @@ type API struct {
// accessors for imports and retrievals.
Imports dtypes.ClientImportMgr
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
RtvlBlockstoreAccessor retrievalmarket.BlockstoreAccessor
RtvlBlockstoreAccessor rm.BlockstoreAccessor

DataTransfer dtypes.ClientDataTransfer
Host host.Host
Expand Down Expand Up @@ -619,7 +618,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro
return cid.Undef, xerrors.Errorf("failed to calculate placeholder root: %w", err)
}

bs, err := blockstore.OpenReadWrite(path, []cid.Cid{placeholderRoot}, blockstore.UseWholeCIDs(true))
bs, err := carv2bs.OpenReadWrite(path, []cid.Cid{placeholderRoot}, carv2bs.UseWholeCIDs(true))
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
}
Expand Down Expand Up @@ -730,7 +729,7 @@ func (a *API) ClientListImports(_ context.Context) ([]api.Import, error) {
return out, nil
}

func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmarket.DealID) error {
func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID rm.DealID) error {
cerr := make(chan error)
go func() {
err := a.Retrieval.CancelDeal(dealID)
Expand Down Expand Up @@ -784,7 +783,7 @@ type retrievalSubscribeEvent struct {
state rm.ClientDealState
}

func consumeAllEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
func consumeAllEvents(ctx context.Context, dealID rm.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan marketevents.RetrievalEvent) error {
for {
var subscribeEvent retrievalSubscribeEvent
select {
Expand Down Expand Up @@ -835,24 +834,29 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
}

sel := shared.AllSelector()

// summary:
// 1. if we're retrieving from an import, FromLocalCAR will be informed.
// Open as a Filestore and populate the target CAR or UnixFS export from it.
// (cannot use ExtractV1File because user wants a dense CAR, not a ref CAR/filestore)
// 1. if we're retrieving from an import, FromLocalCAR will be set.
// Skip the retrieval itself, and use the provided car as a blockstore further down
// to extract a CAR or UnixFS export from.
// 2. if we're using an IPFS blockstore for retrieval, retrieve into it,
// then extract the CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then extract
// the CARv1 (with ExtractV1File) or UnixFS export from it.
// then use the virtual blockstore to extract a CAR or UnixFS export from it.
// 3. if we have to retrieve, perform a CARv2 retrieval, then either
// extract the CARv1 (with ExtractV1File) or use it as a blockstore further down.

// this indicates we're proxying to IPFS.
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)

carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)

carPath := order.FromLocalCAR

// we actually need to retrieve from the network
if carPath == "" {

if !retrieveIntoIPFS && !retrieveIntoCAR {
// we actually need to retrieve from the network, but we don't
// recognize the blockstore accessor.
// we don't recognize the blockstore accessor.
finish(xerrors.Errorf("unsupported retrieval blockstore accessor"))
return
}
Expand All @@ -864,7 +868,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}

order.MinerPeer = &retrievalmarket.RetrievalPeer{
order.MinerPeer = &rm.RetrievalPeer{
ID: *mi.PeerId,
Address: order.Miner,
}
Expand All @@ -882,7 +886,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref

ppb := types.BigDiv(order.Total, types.NewInt(order.Size))

params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice)
if err != nil {
finish(xerrors.Errorf("Error in retrieval params: %s", err))
return
Expand Down Expand Up @@ -940,56 +944,63 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}

// determine where did the retrieval go
var retrievalBs bstore.Blockstore
if retrieveIntoIPFS {
retrievalBs = proxyBss.Blockstore
} else {
cbs, err := stores.ReadOnlyFilestore(carPath)
if err != nil {
finish(err)
return
}
defer cbs.Close() //nolint:errcheck
retrievalBs = cbs
}

// Are we outputting a CAR?
if ref.IsCAR {
if retrieveIntoIPFS {
// generating a CARv1 from IPFS.
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}

bs := proxyBss.Blockstore
dags := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
err = car.WriteCar(ctx, dags, []cid.Cid{order.Root}, f)
if err != nil {
finish(err)
return
}
finish(f.Close())
// not IPFS - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS {
finish(carv2.ExtractV1File(carPath, ref.Path))
return
}

// generating a CARv1 from the CARv2 where we stored the retrieval.
err := carv2.ExtractV1File(carPath, ref.Path)
finish(err)
return
}
// generating a CARv1 from the configured blockstore
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}

// we are extracting a UnixFS file.
var bs bstore.Blockstore
if retrieveIntoIPFS {
bs = proxyBss.Blockstore
} else {
cbs, err := stores.ReadOnlyFilestore(carPath)
err = car.NewSelectiveCar(
ctx,
retrievalBs,
[]car.Dag{{
Root: order.Root,
Selector: sel,
}},
).Write(f)
if err != nil {
finish(err)
return
}
defer cbs.Close() //nolint:errcheck
bs = cbs

finish(f.Close())
return
}

bsvc := blockservice.New(bs, offline.Exchange(bs))
dag := merkledag.NewDAGService(bsvc)
// we are extracting a UnixFS file.
ds := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs)))
root := order.Root

nd, err := dag.Get(ctx, order.Root)
nd, err := ds.Get(ctx, root)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
file, err := unixfile.NewUnixfsFile(ctx, ds, nd)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
Expand Down

0 comments on commit 1ce7c25

Please sign in to comment.