Skip to content

Commit

Permalink
Extract common parts, add selector support to filc retrieve too
Browse files Browse the repository at this point in the history
  • Loading branch information
ribasushi committed Jul 11, 2021
1 parent 9c074df commit 699ea2f
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 63 deletions.
38 changes: 35 additions & 3 deletions filclient/filc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ import (
lcli "github.com/filecoin-project/lotus/cli"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer"
"github.com/ipld/go-ipld-prime"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/mitchellh/go-homedir"
cli "github.com/urfave/cli/v2"
"github.com/whyrusleeping/estuary/lib/retrievehelper"
Expand Down Expand Up @@ -336,7 +339,15 @@ var listDealsCmd = &cli.Command{
var retrieveFileCmd = &cli.Command{
Name: "retrieve",
Flags: []cli.Flag{
&cli.StringFlag{Name: "miner", Aliases: []string{"m"}, Required: true},
&cli.StringFlag{
Name: "miner",
Aliases: []string{"m"},
Required: true,
},
&cli.StringFlag{
Name: "datamodel-path-selector",
Usage: "a rudimentary (DM-level-only) text-path selector, allowing for sub-selection within a deal",
},
},
Action: func(cctx *cli.Context) error {
ctx := context.Background()
Expand All @@ -361,6 +372,16 @@ var retrieveFileCmd = &cli.Command{
return err
}

var dagSubselect ipld.Node
selText := textselector.Expression(cctx.String("datamodel-path-selector"))
if selText != "" {
selSpec, err := textselector.SelectorSpecFromPath(selText, retrievehelper.RecurseAllSelectorBuilder)
if err != nil {
return xerrors.Errorf("unable to parse selector '%s': %w", selText, err)
}
dagSubselect = selSpec.Node()
}

ddir := ddir(cctx)

fc, closer, err := getClient(cctx, ddir)
Expand All @@ -369,12 +390,12 @@ var retrieveFileCmd = &cli.Command{
}
defer closer()

ask, err := fc.RetrievalQuery(ctx, miner, c, nil)
ask, err := fc.RetrievalQuery(ctx, miner, c, dagSubselect)
if err != nil {
return err
}

proposal, err := retrievehelper.RetrievalProposalForAsk(ask, c, nil)
proposal, err := retrievehelper.RetrievalProposalForAsk(ask, c, dagSubselect)
if err != nil {
return err
}
Expand All @@ -384,7 +405,17 @@ var retrieveFileCmd = &cli.Command{
return err
}

if selText != "" {
// FIXME - do we get some access to things here? or just delete the entire block...
var blockstoreTakenFromSomewhere blockstore.Blockstore
c, err = retrievehelper.ResolvePath(ctx, blockstoreTakenFromSomewhere, c, selText)
if err != nil {
return err
}
}

fmt.Println("retrieved content")
fmt.Println("Root CID: ", c.String())
fmt.Println("Total Payment: ", stats.TotalPayment)
fmt.Println("Num Payments: ", stats.NumPayments)
fmt.Println("Size: ", stats.Size)
Expand Down Expand Up @@ -432,6 +463,7 @@ var queryRetrievalCmd = &cli.Command{
}
defer closer()

// TODO - pass in a selector once miners know how to respond to one
query, err := fc.RetrievalQuery(context.TODO(), miner, cid, nil)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion filclient/filclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ func (fc *FilClient) CheckOngoingTransfer(ctx context.Context, miner address.Add

}

func (fc *FilClient) RetrievalQuery(ctx context.Context, maddr address.Address, pcid cid.Cid, optionalSelector ipld.Node) (*retrievalmarket.QueryResponse, error) {
func (fc *FilClient) RetrievalQuery(ctx context.Context, maddr address.Address, pcid cid.Cid, optionalSelectorCurrentlyIgnored ipld.Node) (*retrievalmarket.QueryResponse, error) {
ctx, span := Tracer.Start(ctx, "retrievalQuery", trace.WithAttributes(
attribute.Stringer("miner", maddr),
))
Expand Down
72 changes: 58 additions & 14 deletions selector.go → lib/retrievehelper/selector.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,76 @@
package main
package retrievehelper

import (
"bytes"
"context"
"fmt"
"io"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchangeoffline "github.com/ipfs/go-ipfs-exchange-offline"
mdagipld "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"golang.org/x/xerrors"

// must be imported to init() raw-codec support
_ "github.com/ipld/go-ipld-prime/codec/raw"

"github.com/ipfs/go-cid"
mdagipld "github.com/ipfs/go-ipld-format"
dagpb "github.com/ipld/go-codec-dagpb"
"github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldbasicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder"
textselector "github.com/ipld/go-ipld-selector-text-lite"
)

var ssb = selectorbuilder.NewSelectorSpecBuilder(ipldbasicnode.Prototype.Any)
var RecurseAllSelectorBuilder = ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
)

func ResolvePath(ctx context.Context, bs blockstore.Blockstore, startCid cid.Cid, path textselector.Expression) (cid.Cid, error) {

selSpec, err := textselector.SelectorSpecFromPath(path, nil)
if err != nil {
return cid.Undef, err
}

var subDagFound bool

if err := TraverseDag(
ctx,
merkledag.NewDAGService(blockservice.New(bs, exchangeoffline.Exchange(bs))),
startCid,
selSpec.Node(),
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
}
startCid = cidLnk.Cid
subDagFound = true
return traversal.SkipMe{}
}
return nil
},
); err != nil {
return cid.Undef, err
}
if !subDagFound {
return cid.Undef, xerrors.Errorf("path selection '%s' does not match a node within %s", path, startCid)
}
return startCid, nil
}

func TraverseDag(
ctx context.Context,
ds mdagipld.DAGService,
Expand All @@ -31,14 +82,7 @@ func TraverseDag(
// If no selector is given - use *.*
// See discusion at https://github.com/ipld/go-ipld-prime/issues/171
if optionalSelector == nil {
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
optionalSelector = ssb.ExploreRecursive(
selector.RecursionLimitNone(),
ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
),
).Node()
optionalSelector = RecurseAllSelectorBuilder.Node()
}

parsedSelector, err := selector.ParseSelector(optionalSelector)
Expand All @@ -52,7 +96,7 @@ func TraverseDag(
// this is what allows us to understand dagpb
nodePrototypeChooser := dagpb.AddSupportToChooser(
func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) {
return basicnode.Prototype.Any, nil
return ipldbasicnode.Prototype.Any, nil
},
)

Expand Down
50 changes: 5 additions & 45 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,17 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/specs-actors/actors/builtin/market"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchangeoffline "github.com/ipfs/go-ipfs-exchange-offline"
batched "github.com/ipfs/go-ipfs-provider/batched"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs"
ipld "github.com/ipld/go-ipld-prime"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
ipldbasicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal"
"github.com/ipld/go-ipld-prime/traversal/selector"
selectorbuilder "github.com/ipld/go-ipld-prime/traversal/selector/builder"
textselector "github.com/ipld/go-ipld-selector-text-lite"
"github.com/labstack/echo/v4"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/whyrusleeping/estuary/filclient"
"github.com/whyrusleeping/estuary/lib/retrievehelper"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"
Expand Down Expand Up @@ -1916,14 +1909,7 @@ func (cm *ContentManager) runRetrieval(ctx context.Context, contentToFetch uint)
return cid.Undef, err
}

ssb := selectorbuilder.NewSelectorSpecBuilder(ipldbasicnode.Prototype.Any)
selSpecDag, err := textselector.SelectorSpecFromPath(
pathSelection,
ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion(
ssb.Matcher(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge()),
)),
)
selSpecDag, err := textselector.SelectorSpecFromPath(pathSelection, retrievehelper.RecurseAllSelectorBuilder)
if err != nil {
return cid.Undef, err
}
Expand Down Expand Up @@ -1979,35 +1965,9 @@ func (cm *ContentManager) runRetrieval(ctx context.Context, contentToFetch uint)

// we sub-selected: need to find the new root
if pathSelection != "" {

// no error checks - we just compiled this earlier
selSpecRoot, _ := textselector.SelectorSpecFromPath(pathSelection, nil)

var newRootFound bool

if err := TraverseDag(
ctx,
merkledag.NewDAGService(blockservice.New(cm.Blockstore, exchangeoffline.Exchange(cm.Blockstore))),
rootCid,
selSpecRoot.Node(),
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch {
cidLnk, castOK := p.LastBlock.Link.(cidlink.Link)
if !castOK {
return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link.String())
}
rootCid = cidLnk.Cid
newRootFound = true
return traversal.SkipMe{}
}
return nil
},
); err != nil {
return cid.Undef, xerrors.Errorf("Finding partial retrieval sub-root: %w", err)
}

if !newRootFound {
return cid.Undef, xerrors.Errorf("Path selection '%s' does not match a node within %s", pathSelection, rootCid)
rootCid, err = retrievehelper.ResolvePath(ctx, cm.Blockstore, rootCid, pathSelection)
if err != nil {
return cid.Undef, err
}
}

Expand Down

0 comments on commit 699ea2f

Please sign in to comment.