From d53916912168f66b64db8beb28b5344eb77aac4c Mon Sep 17 00:00:00 2001 From: Peter Rabbitson Date: Fri, 4 Jun 2021 11:20:34 +0000 Subject: [PATCH] Expose basic text-based datamodel selector on retrieval Syntax of selection is located at https://pkg.go.dev/github.com/ipld/go-ipld-selector-text-lite#SelectorSpecFromPath Example use, assuming that: - The root of the deal is a plain dag-pb unixfs directory - The directory is not sharded - The user wants to retrieve the first entry in that directory lotus client retrieve --miner f0XXXXX --datamodel-path-selector 'Links/0/Hash' bafyROOTCID ~/output --- api/api_full.go | 8 ++-- cli/client.go | 9 ++++ go.mod | 4 +- go.sum | 5 +- markets/utils/selectors.go | 96 ++++++++++++++++++++++++++++++++++++++ node/impl/client/client.go | 57 ++++++++++++++++++++-- 6 files changed, 170 insertions(+), 9 deletions(-) create mode 100644 markets/utils/selectors.go diff --git a/api/api_full.go b/api/api_full.go index e524906e319..328c2fe01b0 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ipfs/go-cid" + textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/peer" "github.com/filecoin-project/go-address" @@ -896,9 +897,10 @@ type MarketDeal struct { type RetrievalOrder struct { // TODO: make this less unixfs specific - Root cid.Cid - Piece *cid.Cid - Size uint64 + Root cid.Cid + Piece *cid.Cid + DatamodelPathSelector *textselector.Expression + Size uint64 LocalStore *multistore.StoreID // if specified, get data from local store // TODO: support offset diff --git a/cli/client.go b/cli/client.go index 84e0779435e..b270b730e46 100644 --- a/cli/client.go +++ b/cli/client.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil/cidenc" + textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multibase" 
"github.com/urfave/cli/v2" @@ -1044,6 +1045,10 @@ var clientRetrieveCmd = &cli.Command{ Name: "miner", Usage: "miner address for retrieval, if not present it'll use local discovery", }, + &cli.StringFlag{ + Name: "datamodel-path-selector", + Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal", + }, &cli.StringFlag{ Name: "maxPrice", Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice), @@ -1179,6 +1184,10 @@ var clientRetrieveCmd = &cli.Command{ IsCAR: cctx.Bool("car"), } + if sel := textselector.Expression(cctx.String("datamodel-path-selector")); sel != "" { + order.DatamodelPathSelector = &sel + } + updates, err := fapi.ClientRetrieveWithEvents(ctx, *order, ref) if err != nil { return xerrors.Errorf("error setting up retrieval: %w", err) diff --git a/go.mod b/go.mod index b6a03a39d23..02a8b9f896c 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,9 @@ require ( github.com/ipfs/go-unixfs v0.2.4 github.com/ipfs/interface-go-ipfs-core v0.2.3 github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e - github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db + github.com/ipld/go-codec-dagpb v1.2.0 + github.com/ipld/go-ipld-prime v0.10.0 + github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47 github.com/kelseyhightower/envconfig v1.4.0 github.com/lib/pq v1.7.0 github.com/libp2p/go-buffer-pool v0.0.2 diff --git a/go.sum b/go.sum index a4799b9219b..e866b613d56 100644 --- a/go.sum +++ b/go.sum @@ -735,11 +735,14 @@ github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVI github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= 
-github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db h1:kFwGn8rXa/Z31ev1OFNQsYeNKNCdifnTPl/NvPy5L38= github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= +github.com/ipld/go-ipld-prime v0.10.0 h1:ZCd52SDUqvA3YUJEx9v2uIm1qWv6FAxBt2mhiFpoZ6s= +github.com/ipld/go-ipld-prime v0.10.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0= github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= +github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47 h1:Dy6oM0nWaolcCLAxgTGge59SaqDC2b4WgoMUN9fmW8E= +github.com/ipld/go-ipld-selector-text-lite v0.0.0-20210603153726-3feaa7573d47/go.mod h1:LOfTTmw0lAwWTfPMrL8QZv4vgaOAhzo/0GWl0K51oTU= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackpal/gateway v1.0.4/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= diff --git a/markets/utils/selectors.go b/markets/utils/selectors.go new file mode 100644 index 00000000000..b3f88519e40 --- /dev/null +++ b/markets/utils/selectors.go @@ -0,0 +1,96 @@ +package utils + +import ( + "bytes" + "context" + "fmt" + "io" + + // must be imported to init() raw-codec support + _ "github.com/ipld/go-ipld-prime/codec/raw" + + "github.com/ipfs/go-cid" + mdagipld "github.com/ipfs/go-ipld-format" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal" + 
"github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" +) + +func TraverseDag( + ctx context.Context, + ds mdagipld.DAGService, + startFrom cid.Cid, + optionalSelector ipld.Node, + visitCallback traversal.AdvVisitFn, +) error { + + // If no selector is given - use *.* + // See discussion at https://github.com/ipld/go-ipld-prime/issues/171 + if optionalSelector == nil { + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + optionalSelector = ssb.ExploreRecursive( + selector.RecursionLimitNone(), + ssb.ExploreUnion( + ssb.Matcher(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + ), + ).Node() + } + + parsedSelector, err := selector.ParseSelector(optionalSelector) + if err != nil { + return err + } + + // not sure what this is for TBH... + linkContext := ipld.LinkContext{Ctx: ctx} + + // this is what allows us to understand dagpb + nodePrototypeChooser := dagpb.AddSupportToChooser( + func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) { + return basicnode.Prototype.Any, nil + }, + ) + + // this is how we implement GETs + linkSystem := cidlink.DefaultLinkSystem() + linkSystem.StorageReadOpener = func(_ ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + if cl, isCid := lnk.(cidlink.Link); !isCid { + return nil, fmt.Errorf("unexpected link type %#v", lnk) + } else { + node, err := ds.Get(context.TODO(), cl.Cid) + if err != nil { + return nil, err + } + return bytes.NewBuffer(node.RawData()), nil + } + } + + // this is how we pull the start node out of the DS + startLink := cidlink.Link{Cid: startFrom} + startNodePrototype, err := nodePrototypeChooser(startLink, linkContext) + if err != nil { + return err + } + startNode, err := linkSystem.Load( + linkContext, + startLink, + startNodePrototype, + ) + if err != nil { + return err + } + + // this is the actual execution, invoking the supplied callback + return traversal.Progress{ + Cfg: &traversal.Config{ + Ctx: ctx, + LinkSystem: 
linkSystem, + LinkTargetNodePrototypeChooser: nodePrototypeChooser, + }, + }.WalkAdv(startNode, parsedSelector, visitCallback) +} diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 283c3203273..b089a956fe0 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -22,14 +22,19 @@ import ( offline "github.com/ipfs/go-ipfs-exchange-offline" files "github.com/ipfs/go-ipfs-files" mdagipld "github.com/ipfs/go-ipld-format" + logging "github.com/ipfs/go-log/v2" "github.com/ipfs/go-merkledag" unixfile "github.com/ipfs/go-unixfs/file" "github.com/ipfs/go-unixfs/importer/balanced" ihelper "github.com/ipfs/go-unixfs/importer/helpers" "github.com/ipld/go-car" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal" "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" "github.com/multiformats/go-multibase" @@ -43,7 +48,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/discovery" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket/network" "github.com/filecoin-project/go-multistore" @@ -64,6 +68,8 @@ import ( "github.com/filecoin-project/lotus/node/repo/retrievalstoremgr" ) +var log = logging.Logger("client") + var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) const dealStartBufferHours uint64 = 49 @@ -728,9 +734,27 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return err }*/ + // FIXME - this is a direct copy from 
https://github.com/filecoin-project/go-fil-markets/blob/v1.4.0/shared/selectors.go#L11-L16 + // Unable to use it because we need the SelectorSpec, and markets exposes just a reified node + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + selspec := ssb.ExploreRecursive( + selector.RecursionLimitNone(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + ) + + if order.DatamodelPathSelector != nil { + var err error + selspec, err = textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, selspec) + if err != nil { + finish(xerrors.Errorf("failed to parse selector '%s': %w", *order.DatamodelPathSelector, err)) + return + } + log.Infof("partial retrieval of datamodel-path-selector %s/*", *order.DatamodelPathSelector) + } + ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) - params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) + params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, selspec.Node(), order.Piece, order.UnsealPrice) if err != nil { finish(xerrors.Errorf("Error in retrieval params: %s", err)) return @@ -805,13 +829,38 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref rdag := store.DAGService() + root := order.Root + if order.DatamodelPathSelector != nil { + // no err check - we just compiled this before starting, but now we do not wrap a `*` + selspec, _ := textselector.SelectorSpecFromPath(*order.DatamodelPathSelector, nil) //nolint:errcheck + if err := utils.TraverseDag( + ctx, + rdag, + root, + selspec.Node(), + func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { + if r == traversal.VisitReason_SelectionMatch { + cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) + if !castOK { + return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String()) + } + root = cidLnk.Cid + } + return nil + }, + ); err != nil { + 
finish(xerrors.Errorf("Finding partial retrieval sub-root: %w", err)) + return + } + } + if ref.IsCAR { f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { finish(err) return } - err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) + err = car.WriteCar(ctx, rdag, []cid.Cid{root}, f) if err != nil { finish(err) return @@ -820,7 +869,7 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - nd, err := rdag.Get(ctx, order.Root) + nd, err := rdag.Get(ctx, root) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return