From 3faaa31f00bd4d42847f22b7d66f2d007e17b6dd Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Thu, 23 Dec 2021 14:44:56 +0800 Subject: [PATCH] fix retrieval/merge retrieve cmd change/remove unused code --- api/api.go | 10 +- api/impl/market_clent.go | 3 + api/proxy_gen.go | 34 +- client/client.go | 527 ++++++++++++------ client/client_test.go | 6 +- client/types.go | 54 +- cmd/market-client/retrieval.go | 216 +------ cmd/venus-market/solo-run.go | 1 - go.mod | 3 + go.sum | 30 +- models/badger/retrieval_deal.go | 8 +- models/badger/storage_deal.go | 1 + retrievalprovider/client.go | 4 - .../provider_datatransfer_sub.go | 2 - retrievalprovider/revalidator.go | 2 - utils/selectors.go | 89 +++ 16 files changed, 528 insertions(+), 462 deletions(-) create mode 100644 utils/selectors.go diff --git a/api/api.go b/api/api.go index ad6578a0..8cf3b97c 100644 --- a/api/api.go +++ b/api/api.go @@ -27,7 +27,6 @@ import ( "github.com/filecoin-project/venus-market/client" "github.com/filecoin-project/venus-market/imports" "github.com/filecoin-project/venus-market/types" - "github.com/filecoin-project/venus-market/utils" ) //mock for gen @@ -180,10 +179,11 @@ type MarketClientNode interface { // ClientMinerQueryOffer returns a QueryOffer for the specific miner and file. ClientMinerQueryOffer(ctx context.Context, miner address.Address, root cid.Cid, piece *cid.Cid) (client.QueryOffer, error) //perm:read // ClientRetrieve initiates the retrieval of a file, as specified in the order. - ClientRetrieve(ctx context.Context, order client.RetrievalOrder, ref *client.FileRef) error //perm:admin - // ClientRetrieveWithEvents initiates the retrieval of a file, as specified in the order, and provides a channel - // of status updates. - ClientRetrieveWithEvents(ctx context.Context, order client.RetrievalOrder, ref *client.FileRef) (<-chan utils.RetrievalEvent, error) //perm:admin + ClientRetrieve(ctx context.Context, params client.RetrievalOrder) (*client.RestrievalRes, error) //perm:admin + // ClientRetrieveWait waits for retrieval to be complete + ClientRetrieveWait(ctx context.Context, deal retrievalmarket.DealID) error //perm:admin + // ClientExport exports a file stored in the local filestore to a system file + ClientExport(ctx context.Context, exportRef client.ExportRef, fileRef client.FileRef) error //perm:admin // ClientListRetrievals returns information about retrievals made by the local client ClientListRetrievals(ctx context.Context) ([]client.RetrievalInfo, error) //perm:write // ClientGetRetrievalUpdates returns status of updated retrieval deals diff --git a/api/impl/market_clent.go b/api/impl/market_clent.go index 7c205fa8..c7242098 100644 --- a/api/impl/market_clent.go +++ b/api/impl/market_clent.go @@ -2,6 +2,7 @@ package impl import ( "context" + "github.com/filecoin-project/venus-market/api" clients2 "github.com/filecoin-project/venus-market/api/clients" "github.com/filecoin-project/venus-market/client" mTypes "github.com/filecoin-project/venus-messager/types" @@ -11,6 +12,8 @@ import ( "github.com/ipfs/go-cid" ) +var _ api.MarketClientNode = (*MarketClientNodeImpl)(nil) + type MarketClientNodeImpl struct { client.API FundAPI diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 2b2314bd..ecc1e6b8 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -4,8 +4,6 @@ package api import ( "context" - "time" - "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/piecestore" @@ -16,7 +14,6 @@ import ( "github.com/filecoin-project/venus-market/client" "github.com/filecoin-project/venus-market/imports" "github.com/filecoin-project/venus-market/types" - "github.com/filecoin-project/venus-market/utils" mTypes "github.com/filecoin-project/venus-messager/types" "github.com/filecoin-project/venus/app/submodule/apitypes" vTypes "github.com/filecoin-project/venus/pkg/types" @@ -25,6 +22,7 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p-core/peer" "golang.org/x/xerrors" + "time" ) type MarketClientNodeStruct struct { @@ -41,6 +39,8 @@ type MarketClientNodeStruct struct { ClientDealSize func(p0 context.Context, p1 cid.Cid) (client.DataSize, error) `perm:"read"` + ClientExport func(p0 context.Context, p1 client.ExportRef, p2 client.FileRef) error `perm:"admin"` + ClientFindData func(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]client.QueryOffer, error) `perm:"read"` ClientGenCar func(p0 context.Context, p1 client.FileRef, p2 string) error `perm:"write"` @@ -73,11 +73,11 @@ type MarketClientNodeStruct struct { ClientRestartDataTransfer func(p0 context.Context, p1 datatransfer.TransferID, p2 peer.ID, p3 bool) error `perm:"write"` - ClientRetrieve func(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) error `perm:"admin"` + ClientRetrieve func(p0 context.Context, p1 client.RetrievalOrder) (*client.RestrievalRes, error) `perm:"admin"` ClientRetrieveTryRestartInsufficientFunds func(p0 context.Context, p1 address.Address) error `perm:"write"` - ClientRetrieveWithEvents func(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) (<-chan utils.RetrievalEvent, error) `perm:"admin"` + ClientRetrieveWait func(p0 context.Context, p1 retrievalmarket.DealID) error `perm:"admin"` ClientStartDeal func(p0 context.Context, p1 *client.StartDealParams) (*cid.Cid, error) `perm:"admin"` @@ -293,6 +293,14 @@ func (s *MarketClientNodeStub) ClientDealSize(p0 context.Context, p1 cid.Cid) (c return *new(client.DataSize), xerrors.New("method not supported") } +func (s *MarketClientNodeStruct) ClientExport(p0 context.Context, p1 client.ExportRef, p2 client.FileRef) error { + return s.Internal.ClientExport(p0, p1, p2) +} + +func (s *MarketClientNodeStub) ClientExport(p0 context.Context, p1 client.ExportRef, p2 client.FileRef) error { + return xerrors.New("method not supported") +} + func (s *MarketClientNodeStruct) ClientFindData(p0 context.Context, p1 cid.Cid, p2 *cid.Cid) ([]client.QueryOffer, error) { return s.Internal.ClientFindData(p0, p1, p2) } @@ -421,12 +429,12 @@ func (s *MarketClientNodeStub) ClientRestartDataTransfer(p0 context.Context, p1 return xerrors.New("method not supported") } -func (s *MarketClientNodeStruct) ClientRetrieve(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) error { - return s.Internal.ClientRetrieve(p0, p1, p2) +func (s *MarketClientNodeStruct) ClientRetrieve(p0 context.Context, p1 client.RetrievalOrder) (*client.RestrievalRes, error) { + return s.Internal.ClientRetrieve(p0, p1) } -func (s *MarketClientNodeStub) ClientRetrieve(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) error { - return xerrors.New("method not supported") +func (s *MarketClientNodeStub) ClientRetrieve(p0 context.Context, p1 client.RetrievalOrder) (*client.RestrievalRes, error) { + return nil, xerrors.New("method not supported") } func (s *MarketClientNodeStruct) ClientRetrieveTryRestartInsufficientFunds(p0 context.Context, p1 address.Address) error { @@ -437,12 +445,12 @@ func (s *MarketClientNodeStub) ClientRetrieveTryRestartInsufficientFunds(p0 cont return xerrors.New("method not supported") } -func (s *MarketClientNodeStruct) ClientRetrieveWithEvents(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) (<-chan utils.RetrievalEvent, error) { - return s.Internal.ClientRetrieveWithEvents(p0, p1, p2) +func (s *MarketClientNodeStruct) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error { + return s.Internal.ClientRetrieveWait(p0, p1) } -func (s *MarketClientNodeStub) ClientRetrieveWithEvents(p0 context.Context, p1 client.RetrievalOrder, p2 *client.FileRef) (<-chan utils.RetrievalEvent, error) { - return nil, xerrors.New("method not supported") +func (s *MarketClientNodeStub) ClientRetrieveWait(p0 context.Context, p1 retrievalmarket.DealID) error { + return xerrors.New("method not supported") } func (s *MarketClientNodeStruct) ClientStartDeal(p0 context.Context, p1 *client.StartDealParams) (*cid.Cid, error) { diff --git a/client/client.go b/client/client.go index 17410410..7105bb7b 100644 --- a/client/client.go +++ b/client/client.go @@ -6,10 +6,17 @@ import ( "context" "fmt" "github.com/filecoin-project/venus-auth/log" + format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-car/util" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/traversal" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "io" "os" "sort" + "strings" "time" bstore "github.com/ipfs/go-ipfs-blockstore" @@ -41,7 +48,6 @@ import ( "github.com/filecoin-project/go-commp-utils/ffiwrapper" "github.com/filecoin-project/go-commp-utils/writer" datatransfer "github.com/filecoin-project/go-data-transfer" - "github.com/filecoin-project/go-fil-markets/discovery" "github.com/filecoin-project/go-fil-markets/retrievalmarket" rm "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -50,6 +56,7 @@ import ( "github.com/filecoin-project/go-fil-markets/stores" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/specs-actors/v3/actors/builtin/market" + textselector "github.com/ipld/go-ipld-selector-text-lite" "github.com/filecoin-project/venus-market/config" "github.com/filecoin-project/venus-market/imports" @@ -748,249 +755,405 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmar } } -func (a *API) ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error { - events := make(chan utils.RetrievalEvent) - go a.clientRetrieve(ctx, order, ref, events) +func (a *API) ClientRetrieve(ctx context.Context, params RetrievalOrder) (*RestrievalRes, error) { + sel, err := getDataSelector(params.DataSelector, false) + if err != nil { + return nil, err + } - for { - select { - case evt, ok := <-events: - if !ok { // done successfully - return nil - } + di, err := a.doRetrieval(ctx, params, sel) + if err != nil { + return nil, err + } - if evt.Err != "" { - return xerrors.Errorf("retrieval failed: %s", evt.Err) - } - case <-ctx.Done(): - return xerrors.Errorf("retrieval timed out") + return &RestrievalRes{ + DealID: di, + }, nil +} + +func (a *API) doRetrieval(ctx context.Context, order RetrievalOrder, sel datamodel.Node) (rm.DealID, error) { + if order.MinerPeer == nil || order.MinerPeer.ID == "" { + mi, err := a.Full.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) + if err != nil { + return 0, err + } + + order.MinerPeer = &rm.RetrievalPeer{ + ID: *mi.PeerId, + Address: order.Miner, } } -} -func (a *API) ClientRetrieveWithEvents(ctx context.Context, order RetrievalOrder, ref *FileRef) (<-chan utils.RetrievalEvent, error) { - events := make(chan utils.RetrievalEvent) - go a.clientRetrieve(ctx, order, ref, events) - return events, nil -} + if order.Total.Int == nil { + return 0, xerrors.Errorf("cannot make retrieval deal for null total") + } + + if order.Size == 0 { + return 0, xerrors.Errorf("cannot make retrieval deal for zero bytes") + } -type retrievalSubscribeEvent struct { - event rm.ClientEvent - state rm.ClientDealState + ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) + + params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, sel, order.Piece, order.UnsealPrice) + if err != nil { + return 0, xerrors.Errorf("Error in retrieval params: %s", err) + } + + id := a.Retrieval.NextID() + id, err = a.Retrieval.Retrieve( + ctx, + id, + order.Root, + params, + order.Total, + *order.MinerPeer, + order.Client, + order.Miner, + ) + + if err != nil { + return 0, xerrors.Errorf("Retrieve failed: %w", err) + } + + return id, nil } -func consumeAllEvents(ctx context.Context, dealID retrievalmarket.DealID, subscribeEvents chan retrievalSubscribeEvent, events chan utils.RetrievalEvent) error { - for { - var subscribeEvent retrievalSubscribeEvent +func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + subscribeEvents := make(chan rm.ClientDealState, 1) + + unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { + // We'll check the deal IDs inside consumeAllEvents. + if state.ID != deal { + return + } select { case <-ctx.Done(): - return xerrors.New("Retrieval Timed Out") - case subscribeEvent = <-subscribeEvents: - if subscribeEvent.state.ID != dealID { - // we can't check the deal ID ahead of time because: - // 1. We need to subscribe before retrieving. - // 2. We won't know the deal ID until after retrieving. - continue - } + case subscribeEvents <- state: } + }) + defer unsubscribe() + { + state, err := a.Retrieval.GetDeal(deal) + if err != nil { + return xerrors.Errorf("getting deal state: %w", err) + } select { - case <-ctx.Done(): - return xerrors.New("Retrieval Timed Out") - case events <- utils.RetrievalEvent{ - Event: subscribeEvent.event, - Status: subscribeEvent.state.Status, - BytesReceived: subscribeEvent.state.TotalReceived, - FundsSpent: subscribeEvent.state.FundsSpent, - }: + case subscribeEvents <- state: + default: // already have an event queued from the subscription } + } - state := subscribeEvent.state - switch state.Status { - case rm.DealStatusCompleted: - return nil - case rm.DealStatusRejected: - return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) - case rm.DealStatusCancelled: - return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message) - case - rm.DealStatusDealNotFound, - rm.DealStatusErrored: - return xerrors.Errorf("Retrieval Error: %s", state.Message) + for { + select { + case <-ctx.Done(): + return xerrors.New("Retrieval Timed Out") + case state := <-subscribeEvents: + switch state.Status { + case rm.DealStatusCompleted: + return nil + case rm.DealStatusRejected: + return xerrors.Errorf("Retrieval Proposal Rejected: %s", state.Message) + case rm.DealStatusCancelled: + return xerrors.Errorf("Retrieval was cancelled externally: %s", state.Message) + case + rm.DealStatusDealNotFound, + rm.DealStatusErrored: + return xerrors.Errorf("Retrieval Error: %s", state.Message) + } } } } -func (a *API) clientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef, events chan utils.RetrievalEvent) { - defer close(events) +type ExportDest struct { + Writer io.Writer + Path string +} - finish := func(e error) { - if e != nil { - events <- utils.RetrievalEvent{Err: e.Error(), FundsSpent: big.Zero()} - } +func (ed *ExportDest) doWrite(cb func(io.Writer) error) error { + if ed.Writer != nil { + return cb(ed.Writer) + } + + f, err := os.OpenFile(ed.Path, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err } - // summary: - // 1. if we're retrieving from an import, FromLocalCAR will be informed. - // Open as a Filestore and populate the target CAR or UnixFS export from it. - // (cannot use ExtractV1File because user wants a dense CAR, not a ref CAR/filestore) - // 2. if we're using an IPFS blockstore for retrieval, retrieve into it, - // then extract the CAR or UnixFS export from it. - // 3. if we have to retrieve, perform a CARv2 retrieval, then extract - // the CARv1 (with ExtractV1File) or UnixFS export from it. + if err := cb(f); err != nil { + _ = f.Close() + return err + } - // this indicates we're proxying to IPFS. + return f.Close() +} + +func (a *API) ClientExport(ctx context.Context, exportRef ExportRef, ref FileRef) error { + return a.ClientExportInto(ctx, exportRef, ref.IsCAR, ExportDest{Path: ref.Path}) +} + +func (a *API) ClientExportInto(ctx context.Context, exportRef ExportRef, car bool, dest ExportDest) error { proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievalprovider.ProxyBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievalprovider.CARBlockstoreAccessor) + carPath := exportRef.FromLocalCAR - carPath := order.FromLocalCAR if carPath == "" { if !retrieveIntoIPFS && !retrieveIntoCAR { - // we actually need to retrieve from the network, but we don't - // recognize the blockstore accessor. - finish(xerrors.Errorf("unsupported retrieval blockstore accessor")) - return + return xerrors.Errorf("unsupported retrieval blockstore accessor") } - if order.MinerPeer == nil || order.MinerPeer.ID == "" { - mi, err := a.Full.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) - if err != nil { - finish(err) - return - } - - order.MinerPeer = &retrievalmarket.RetrievalPeer{ - ID: *mi.PeerId, - Address: order.Miner, - } + if retrieveIntoCAR { + carPath = carBss.PathFor(exportRef.DealID) } + } - if order.Total.Int == nil { - finish(xerrors.Errorf("cannot make retrieval deal for null total")) - return + var retrievalBs bstore.Blockstore + if retrieveIntoIPFS { + retrievalBs = proxyBss.Blockstore + } else { + cbs, err := stores.ReadOnlyFilestore(carPath) + if err != nil { + return err } + defer cbs.Close() //nolint:errcheck + retrievalBs = cbs + } - if order.Size == 0 { - finish(xerrors.Errorf("cannot make retrieval deal for zero bytes")) - return + dserv := merkledag.NewDAGService(blockservice.New(retrievalBs, offline.Exchange(retrievalBs))) + + // Are we outputting a CAR? + if car { + // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in + if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil { + return carv2.ExtractV1File(carPath, dest.Path) } + } - ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) + roots, err := parseDagSpec(ctx, exportRef.Root, exportRef.DAGs, dserv, car) + if err != nil { + return xerrors.Errorf("parsing dag spec: %w", err) + } + if car { + return a.outputCAR(ctx, dserv, retrievalBs, exportRef.Root, roots, dest) + } - params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, selectorparse.CommonSelector_ExploreAllRecursively, order.Piece, order.UnsealPrice) - if err != nil { - finish(xerrors.Errorf("Error in retrieval params: %s", err)) - return + if len(roots) != 1 { + return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots)) + } + + return a.outputUnixFS(ctx, roots[0].root, dserv, dest) +} + +func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blockstore, root cid.Cid, dags []dagSpec, dest ExportDest) error { + // generating a CARv1 from the configured blockstore + roots := make([]cid.Cid, len(dags)) + for i, dag := range dags { + roots[i] = dag.root + } + + return dest.doWrite(func(w io.Writer) error { + + if err := car.WriteHeader(&car.CarHeader{ + Roots: roots, + Version: 1, + }, w); err != nil { + return fmt.Errorf("failed to write car header: %s", err) } - // Subscribe to events before retrieving to avoid losing events. - subscribeEvents := make(chan retrievalSubscribeEvent, 1) - subscribeCtx, cancel := context.WithCancel(ctx) - defer cancel() - unsubscribe := a.Retrieval.SubscribeToEvents(func(event rm.ClientEvent, state rm.ClientDealState) { - // We'll check the deal IDs inside consumeAllEvents. - if state.PayloadCID.Equals(order.Root) { - select { - case <-subscribeCtx.Done(): - case subscribeEvents <- retrievalSubscribeEvent{event, state}: - } + cs := cid.NewSet() + + for _, dagSpec := range dags { + if err := utils.TraverseDag( + ctx, + ds, + root, + dagSpec.selector, + func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { + if r == traversal.VisitReason_SelectionMatch { + var c cid.Cid + if p.LastBlock.Link == nil { + c = root + } else { + cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) + if !castOK { + return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link) + } + + c = cidLnk.Cid + } + + if cs.Visit(c) { + nb, err := bs.Get(c) + if err != nil { + return xerrors.Errorf("getting block data: %w", err) + } + + err = util.LdWrite(w, c.Bytes(), nb.RawData()) + if err != nil { + return xerrors.Errorf("writing block data: %w", err) + } + } + + return nil + } + return nil + }, + ); err != nil { + return xerrors.Errorf("error while traversing car dag: %w", err) } - }) + } - id := a.Retrieval.NextID() - id, err = a.Retrieval.Retrieve( - ctx, - id, - order.Root, - params, - order.Total, - *order.MinerPeer, - order.Client, - order.Miner, - ) + return nil + }) +} - if err != nil { - unsubscribe() - finish(xerrors.Errorf("Retrieve failed: %w", err)) - return - } +func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error { + nd, err := ds.Get(ctx, root) + if err != nil { + return xerrors.Errorf("ClientRetrieve: %w", err) + } + file, err := unixfile.NewUnixfsFile(ctx, ds, nd) + if err != nil { + return xerrors.Errorf("ClientRetrieve: %w", err) + } - err = consumeAllEvents(ctx, id, subscribeEvents, events) + if dest.Writer == nil { + return files.WriteTo(file, dest.Path) + } - unsubscribe() + switch f := file.(type) { + case files.File: + _, err = io.Copy(dest.Writer, f) if err != nil { - finish(xerrors.Errorf("Retrieve: %w", err)) - return - } - - if retrieveIntoCAR { - carPath = carBss.PathFor(id) + return err } + return nil + default: + return fmt.Errorf("file type %T is not supported", nd) } +} + +type dagSpec struct { + root cid.Cid + selector ipld.Node +} - if ref == nil { - // If ref is nil, it only fetches the data into the configured blockstore - // (if fetching from network). - finish(nil) - return +func parseDagSpec(ctx context.Context, root cid.Cid, dsp []DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) { + if len(dsp) == 0 { + return []dagSpec{ + { + root: root, + selector: nil, + }, + }, nil } - // Are we outputting a CAR? - if ref.IsCAR { - if retrieveIntoIPFS { - // generating a CARv1 from IPFS. - f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - finish(err) - return - } + out := make([]dagSpec, len(dsp)) + for i, spec := range dsp { + + if spec.DataSelector == nil { + return nil, xerrors.Errorf("invalid DagSpec at position %d: `DataSelector` can not be nil", i) + } + + // reify selector + var err error + out[i].selector, err = getDataSelector(spec.DataSelector, car && spec.ExportMerkleProof) + if err != nil { + return nil, err + } + + // find the pointed-at root node within the containing ds + var rsn ipld.Node - bs := proxyBss.Blockstore - dags := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) - err = car.WriteCar(ctx, dags, []cid.Cid{order.Root}, f) + if strings.HasPrefix(string(*spec.DataSelector), "{") { + var err error + rsn, err = selectorparse.ParseJSONSelector(string(*spec.DataSelector)) if err != nil { - finish(err) - return + return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *spec.DataSelector, err) } - finish(f.Close()) - return + } else { + selspec, _ := textselector.SelectorSpecFromPath(textselector.Expression(*spec.DataSelector), car && spec.ExportMerkleProof, nil) //nolint:errcheck + rsn = selspec.Node() } - // generating a CARv1 from the CARv2 where we stored the retrieval. - err := carv2.ExtractV1File(carPath, ref.Path) - finish(err) - return - } + var newRoot cid.Cid + var errHalt = xerrors.New("halt walk") + if err := utils.TraverseDag( + ctx, + ds, + root, + rsn, + func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { + if r == traversal.VisitReason_SelectionMatch { + if !car && p.LastBlock.Path.String() != p.Path.String() { + return xerrors.Errorf("unsupported selection path '%s' does not correspond to a block boundary (a.k.a. CID link)", p.Path.String()) + } + + if p.LastBlock.Link == nil { + // this is likely the root node that we've matched here + newRoot = root + return errHalt + } + + cidLnk, castOK := p.LastBlock.Link.(cidlink.Link) + if !castOK { + return xerrors.Errorf("cidlink cast unexpectedly failed on '%s'", p.LastBlock.Link) + } + + newRoot = cidLnk.Cid + + return errHalt + } + return nil + }, + ); err != nil && err != errHalt { + return nil, xerrors.Errorf("error while locating partial retrieval sub-root: %w", err) + } - // we are extracting a UnixFS file. - var bs bstore.Blockstore - if retrieveIntoIPFS { - bs = proxyBss.Blockstore - } else { - cbs, err := stores.ReadOnlyFilestore(carPath) - if err != nil { - finish(err) - return + if newRoot == cid.Undef { + return nil, xerrors.Errorf("path selection does not match a node within %s", root) } - defer cbs.Close() //nolint:errcheck - bs = cbs + + out[i].root = newRoot } - bsvc := blockservice.New(bs, offline.Exchange(bs)) - dag := merkledag.NewDAGService(bsvc) + return out, nil +} - nd, err := dag.Get(ctx, order.Root) - if err != nil { - finish(xerrors.Errorf("ClientRetrieve: %w", err)) - return - } - file, err := unixfile.NewUnixfsFile(ctx, dag, nd) - if err != nil { - finish(xerrors.Errorf("ClientRetrieve: %w", err)) - return +func getDataSelector(dps *Selector, matchPath bool) (datamodel.Node, error) { + sel := selectorparse.CommonSelector_ExploreAllRecursively + if dps != nil { + + if strings.HasPrefix(string(*dps), "{") { + var err error + sel, err = selectorparse.ParseJSONSelector(string(*dps)) + if err != nil { + return nil, xerrors.Errorf("failed to parse json-selector '%s': %w", *dps, err) + } + } else { + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + + selspec, err := textselector.SelectorSpecFromPath( + textselector.Expression(*dps), matchPath, + + ssb.ExploreRecursive( + selector.RecursionLimitNone(), + ssb.ExploreUnion(ssb.Matcher(), ssb.ExploreAll(ssb.ExploreRecursiveEdge())), + ), + ) + if err != nil { + return nil, xerrors.Errorf("failed to parse text-selector '%s': %w", *dps, err) + } + + sel = selspec.Node() + log.Infof("partial retrieval of datamodel-path-selector %s/*", *dps) + } } - finish(files.WriteTo(file, ref.Path)) + return sel, nil } func (a *API) ClientListRetrievals(ctx context.Context) ([]RetrievalInfo, error) { diff --git a/client/client_test.go b/client/client_test.go index 6f4061d9..fd8ba1f1 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -59,7 +59,7 @@ func TestImportLocal(t *testing.T) { require.NoError(t, err) require.True(t, local) - order := RetrievalOrder{ + order := ExportRef{ Root: root, FromLocalCAR: it.CARPath, } @@ -67,7 +67,7 @@ func TestImportLocal(t *testing.T) { // retrieve as UnixFS. out1 := filepath.Join(dir, "retrieval1.data") // as unixfs out2 := filepath.Join(dir, "retrieval2.data") // as car - err = a.ClientRetrieve(ctx, order, &FileRef{ + err = a.ClientExport(ctx, order, FileRef{ Path: out1, }) require.NoError(t, err) @@ -76,7 +76,7 @@ func TestImportLocal(t *testing.T) { require.NoError(t, err) require.Equal(t, b, outBytes) - err = a.ClientRetrieve(ctx, order, &FileRef{ + err = a.ClientExport(ctx, order, FileRef{ Path: out2, IsCAR: true, }) diff --git a/client/types.go b/client/types.go index 74a5b3bb..211fe38d 100644 --- a/client/types.go +++ b/client/types.go @@ -15,6 +15,10 @@ import ( "time" ) +type RestrievalRes struct { + DealID retrievalmarket.DealID +} + type ClientImportMgr *imports.Manager type DealInfo struct { @@ -59,13 +63,13 @@ type ImportRes struct { type RetrievalOrder struct { // TODO: make this less unixfs specific - Root cid.Cid - Piece *cid.Cid + Root cid.Cid + Piece *cid.Cid + DataSelector *Selector + Size uint64 + Total types.BigInt - FromLocalCAR string // if specified, get data from a local CARv2 file. - // TODO: support offset - Total types.BigInt UnsealPrice types.BigInt PaymentInterval uint64 PaymentIntervalIncrease uint64 @@ -74,6 +78,42 @@ type RetrievalOrder struct { MinerPeer *retrievalmarket.RetrievalPeer } +// Selector specifies ipld selector string +// - if the string starts with '{', it's interpreted as json selector string +// see https://ipld.io/specs/selectors/ and https://ipld.io/specs/selectors/fixtures/selector-fixtures-1/ +// - otherwise the string is interpreted as ipld-selector-text-lite (simple ipld path) +// see https://github.com/ipld/go-ipld-selector-text-lite +type Selector string + +type DagSpec struct { + // DataSelector matches data to be retrieved + // - when using textselector, the path specifies subtree + // - the matched graph must have a single root + DataSelector *Selector + + // ExportMerkleProof is applicable only when exporting to a CAR file via a path textselector + // When true, in addition to the selection target, the resulting CAR will contain every block along the + // path back to, and including the original root + // When false the resulting CAR contains only the blocks of the target subdag + ExportMerkleProof bool +} + +type ExportRef struct { + Root cid.Cid + + // DAGs array specifies a list of DAGs to export + // - If exporting into unixfs files, only one DAG is supported, DataSelector is only used to find the targeted root node + // - If exporting into a car file + // - When exactly one text-path DataSelector is specified exports the subgraph and its full merkle-path from the original root + // - Otherwise ( multiple paths and/or JSON selector specs) determines each individual subroot and exports the subtrees as a multi-root car + // - When not specified defaults to a single DAG: + // - Data - the entire DAG: `{"R":{"l":{"none":{}},":>":{"a":{">":{"@":{}}}}}}` + DAGs []DagSpec + + FromLocalCAR string // if specified, get data from a local CARv2 file. + DealID retrievalmarket.DealID +} + type FileRef struct { Path string IsCAR bool @@ -93,6 +133,7 @@ type DataCIDSize struct { PieceSize abi.PaddedPieceSize PieceCID cid.Cid } + type RetrievalInfo struct { PayloadCID cid.Cid ID retrievalmarket.DealID @@ -109,6 +150,9 @@ type RetrievalInfo struct { TransferChannelID *datatransfer.ChannelID DataTransfer *types2.DataTransferChannel + + // optional event if part of ClientGetRetrievalUpdates + Event *retrievalmarket.ClientEvent } type QueryOffer struct { diff --git a/cmd/market-client/retrieval.go b/cmd/market-client/retrieval.go index 1426557f..9f103bb2 100644 --- a/cmd/market-client/retrieval.go +++ b/cmd/market-client/retrieval.go @@ -2,25 +2,17 @@ package main import ( "context" - "encoding/json" "errors" "fmt" - "io" - "sort" - tm "github.com/buger/goterm" "github.com/docker/go-units" "github.com/fatih/color" - "github.com/ipfs/go-cid" - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" - "github.com/filecoin-project/go-state-types/big" - "github.com/filecoin-project/venus/pkg/types" + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" + "io" cli2 "github.com/filecoin-project/venus-market/cli" "github.com/filecoin-project/venus-market/cli/tablewriter" @@ -32,7 +24,7 @@ var retrievalCmd = &cli.Command{ Usage: "manage retrieval deals", Subcommands: []*cli.Command{ retrievalFindCmd, - retrievalRetrieveCmd, + clientRetrieveCmd, retrievalCancelCmd, retrievalListCmd, }, @@ -103,206 +95,6 @@ var retrievalFindCmd = &cli.Command{ }, } -const DefaultMaxRetrievePrice = "0.01" - -var retrievalRetrieveCmd = &cli.Command{ - Name: "retrieve", - Usage: "Retrieve data from network", - ArgsUsage: "[dataCid outputPath]", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "from", - Usage: "address to send transactions from", - }, - &cli.BoolFlag{ - Name: "car", - Usage: "export to a car file instead of a regular file", - }, - &cli.StringFlag{ - Name: "miner", - Usage: "miner address for retrieval, if not present it'll use local discovery", - }, - &cli.StringFlag{ - Name: "maxPrice", - Usage: fmt.Sprintf("maximum price the client is willing to consider (default: %s FIL)", DefaultMaxRetrievePrice), - }, - &cli.StringFlag{ - Name: "pieceCid", - Usage: "require data to be retrieved from a specific Piece CID", - }, - &cli.BoolFlag{ - Name: "allow-local", - }, - }, - Action: func(cctx *cli.Context) error { - if cctx.NArg() != 2 { - return cli2.ShowHelp(cctx, fmt.Errorf("incorrect number of arguments")) - } - mapi, closer, err := cli2.NewMarketClientNode(cctx) - if err != nil { - return err - } - defer closer() - ctx := cli2.ReqContext(cctx) - afmt := cli2.NewAppFmt(cctx.App) - - var payer address.Address - if cctx.String("from") != "" { - payer, err = address.NewFromString(cctx.String("from")) - } else { - payer, err = mapi.DefaultAddress(ctx) - } - if err != nil { - return err - } - - file, err := cid.Parse(cctx.Args().Get(0)) - if err != nil { - return err - } - - var pieceCid *cid.Cid - if cctx.String("pieceCid") != "" { - parsed, err := cid.Parse(cctx.String("pieceCid")) - if err != nil { - return err - } - pieceCid = &parsed - } - - var order *client.RetrievalOrder - if cctx.Bool("allow-local") { - imports, err := mapi.ClientListImports(ctx) - if err != nil { - return err - } - - for _, i := range imports { - if i.Root != nil && i.Root.Equals(file) { - order = &client.RetrievalOrder{ - Root: file, - FromLocalCAR: i.CARPath, - - Total: big.Zero(), - UnsealPrice: big.Zero(), - } - break - } - } - } - - if order == nil { - var offer client.QueryOffer - minerStrAddr := cctx.String("miner") - if minerStrAddr == "" { // Local discovery - offers, err := mapi.ClientFindData(ctx, file, pieceCid) - - var cleaned []client.QueryOffer - // filter out offers that errored - for _, o := range offers { - if o.Err == "" { - cleaned = append(cleaned, o) - } - } - - offers = cleaned - - // sort by price low to high - sort.Slice(offers, func(i, j int) bool { - return offers[i].MinPrice.LessThan(offers[j].MinPrice) - }) - if err != nil { - return err - } - - // TODO: parse offer strings from `client find`, make this smarter - if len(offers) < 1 { - fmt.Println("Failed to find file") - return nil - } - offer = offers[0] - } else { // Directed retrieval - minerAddr, err := address.NewFromString(minerStrAddr) - if err != nil { - return err - } - offer, err = mapi.ClientMinerQueryOffer(ctx, minerAddr, file, pieceCid) - if err != nil { - return err - } - } - if offer.Err != "" { - return fmt.Errorf("The received offer errored: %s", offer.Err) - } - - maxPrice := types.MustParseFIL(DefaultMaxRetrievePrice) - - if cctx.String("maxPrice") != "" { - maxPrice, err = types.ParseFIL(cctx.String("maxPrice")) - if err != nil { - return xerrors.Errorf("parsing maxPrice: %w", err) - } - } - - if offer.MinPrice.GreaterThan(big.Int(maxPrice)) { - return xerrors.Errorf("failed to find offer satisfying maxPrice: %s", maxPrice) - } - - o := offer.Order(payer) - order = &o - } - ref := &client.FileRef{ - Path: cctx.Args().Get(1), - IsCAR: cctx.Bool("car"), - } - - fmt.Println("Size:", order.Size) - fmt.Println("Unseal Price:", order.UnsealPrice) - fmt.Println("Total Fee:", order.Total) - - data, _ := json.MarshalIndent(order, " ", "\t") - fmt.Println(string(data)) - updates, err := mapi.ClientRetrieveWithEvents(ctx, *order, ref) - if err != nil { - return xerrors.Errorf("error setting up retrieval: %w", err) - } - - var prevStatus retrievalmarket.DealStatus - - for { - select { - case evt, ok := <-updates: - if ok { - afmt.Printf("> Recv: %s, Paid %s, %s (%s)\n", - types.SizeStr(types.NewInt(evt.BytesReceived)), - types.FIL(evt.FundsSpent), - retrievalmarket.ClientEvents[evt.Event], - retrievalmarket.DealStatuses[evt.Status], - ) - prevStatus = evt.Status - } - - if evt.Err != "" { - return xerrors.Errorf("retrieval failed: %s", evt.Err) - } - - if !ok { - if prevStatus == retrievalmarket.DealStatusCompleted { - afmt.Println("Success") - } else { - afmt.Printf("saw final deal state %s instead of expected success state DealStatusCompleted\n", - retrievalmarket.DealStatuses[prevStatus]) - } - return nil - } - - case <-ctx.Done(): - return xerrors.Errorf("retrieval timed out") - } - } - }, -} - func retrievalStatusString(status retrievalmarket.DealStatus) string { s := retrievalmarket.DealStatuses[status] diff --git a/cmd/venus-market/solo-run.go b/cmd/venus-market/solo-run.go index 51c971fe..dda555e5 100644 --- a/cmd/venus-market/solo-run.go +++ b/cmd/venus-market/solo-run.go @@ -33,7 +33,6 @@ import ( "github.com/filecoin-project/venus-market/utils" ) - var soloRunCmd = &cli.Command{ Name: "solo-run", Usage: "Run the market daemon in solo mode", diff --git a/go.mod b/go.mod index 1ce1f886..cc36edab 100644 --- a/go.mod +++ b/go.mod @@ -65,7 +65,9 @@ require ( github.com/ipfs/go-unixfs v0.2.6 github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 github.com/ipld/go-car/v2 v2.0.3-0.20210811121346-c514a30114d7 + github.com/ipld/go-codec-dagpb v1.3.0 github.com/ipld/go-ipld-prime v0.12.3 + github.com/ipld/go-ipld-selector-text-lite v0.0.1 github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.14.2 github.com/libp2p/go-libp2p-core v0.8.6 @@ -103,6 +105,7 @@ require ( replace ( github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi + github.com/filecoin-project/go-fil-markets => /Users/lijunlong/code/go-fil-markets github.com/filecoin-project/go-jsonrpc => github.com/ipfs-force-community/go-jsonrpc v0.1.4-0.20210721095535-a67dff16de21 github.com/ipfs/go-ipfs-cmds => github.com/ipfs-force-community/go-ipfs-cmds v0.6.1-0.20210521090123-4587df7fa0ab ) diff --git a/go.sum b/go.sum index f13c267a..c6d8c612 100644 --- a/go.sum +++ b/go.sum @@ -301,15 +301,12 @@ github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQj github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-cbor-util v0.0.0-20201016124514-d0bbec7bfcc4 h1:YmE80qPn5K0txSqxnRNiCRAWyXI1LTO//I4c4H0QwbM= github.com/filecoin-project/go-cbor-util v0.0.0-20201016124514-d0bbec7bfcc4/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= -github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= github.com/filecoin-project/go-commp-utils v0.1.0/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= github.com/filecoin-project/go-commp-utils v0.1.3 h1:rTxbkNXZU7FLgdkBk8RsQIEOuPONHykEoX3xGk41Fkw= github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9ANQrY3fDFoXdqyX04J+dWpK30= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-data-transfer v1.0.1/go.mod h1:UxvfUAY9v3ub0a21BSK9u3pB2aq30Y0KMsG+w9/ysyo= -github.com/filecoin-project/go-data-transfer v1.2.7/go.mod h1:mvjZ+C3NkBX10JP4JMu27DCjUouHFjHwUGh+Xc4yvDA= github.com/filecoin-project/go-data-transfer v1.5.0/go.mod h1:E3WW4mCEYwU2y65swPEajSZoFWFmfXt7uwGduoACZQc= github.com/filecoin-project/go-data-transfer v1.10.0/go.mod h1:uQtqy6vUAY5v70ZHdkF5mJ8CjVtjj/JA3aOoaqzWTVw= github.com/filecoin-project/go-data-transfer v1.10.1/go.mod h1:CSDMCrPK2lVGodNB1wPEogjFvM9nVGyiL1GNbBRTSdw= @@ -323,10 +320,6 @@ github.com/filecoin-project/go-fil-commcid v0.1.0 h1:3R4ds1A9r6cr8mvZBfMYxTS88Oq github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo= github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8= -github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.1.9/go.mod h1:0yQu5gvrjFoAIyzPSSJ+xUdCG83vjInAFbTswIB5/hk= -github.com/filecoin-project/go-fil-markets v1.12.0 h1:RpU5bLaMADVrU4CgLxKMGHC2ZUocNV35uINxogQCf00= -github.com/filecoin-project/go-fil-markets v1.12.0/go.mod h1:XuuZFaFujI47nrgfQJiq7jWB+6rRya6nm7Sj6uXQ80U= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= @@ -357,14 +350,10 @@ github.com/filecoin-project/go-statemachine v1.0.1/go.mod h1:jZdXXiHa61n4NmgWFG4 github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-statestore v0.1.1 h1:ufMFq00VqnT2CAuDpcGnwLnCX1I/c3OROw/kXVNSTZk= github.com/filecoin-project/go-statestore v0.1.1/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= -github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4= -github.com/filecoin-project/specs-actors v0.9.12/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors v0.9.14 h1:68PVstg2UB3ZsMLF+DKFTAs/YKsqhKWynkr0IqmVRQY= github.com/filecoin-project/specs-actors v0.9.14/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= -github.com/filecoin-project/specs-actors/v2 v2.0.1/go.mod h1:v2NZVYinNIKA9acEMBm5wWXxqv5+frFEbekBFemYghY= -github.com/filecoin-project/specs-actors/v2 v2.3.2/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y= github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc= github.com/filecoin-project/specs-actors/v2 v2.3.5 h1:PbT4tPlSXZ8sRgajhb4D8AOEmiaaZ+jg6tc6BBv8VQc= github.com/filecoin-project/specs-actors/v2 v2.3.5/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc= @@ -788,9 +777,6 @@ github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPi github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0= github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM= github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= -github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0= -github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY= -github.com/ipfs/go-graphsync v0.5.2/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-graphsync v0.6.1/go.mod h1:e2ZxnClqBBYAtd901g9vXMJzS47labjAtOzsWtOzKNk= github.com/ipfs/go-graphsync v0.9.0/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw= @@ -840,7 +826,6 @@ github.com/ipfs/go-ipfs-util v0.0.2/go.mod h1:CbPtkWJzjLdEcezDns2XYaehFVNXG9zrdr github.com/ipfs/go-ipld-cbor v0.0.2/go.mod h1:wTBtrQZA3SoFKMVkp6cn6HMRteIB1VsmHA0AQFOn7Nc= github.com/ipfs/go-ipld-cbor v0.0.3/go.mod h1:wTBtrQZA3SoFKMVkp6cn6HMRteIB1VsmHA0AQFOn7Nc= github.com/ipfs/go-ipld-cbor v0.0.4/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= -github.com/ipfs/go-ipld-cbor v0.0.5-0.20200204214505-252690b78669/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-cbor v0.0.5 h1:ovz4CHKogtG2KB/h1zUp5U0c/IzZrL435rCh5+K/5G8= github.com/ipfs/go-ipld-cbor v0.0.5/go.mod h1:BkCduEx3XBCO6t2Sfo5BaHzuok7hbhdMm9Oh8B2Ftq4= github.com/ipfs/go-ipld-format v0.0.1/go.mod h1:kyJtbkDALmFHv3QR6et67i35QzO3S0dCDnkOJhcZkms= @@ -889,7 +874,6 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2 github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0= github.com/ipfs/iptb v1.4.0/go.mod h1:1rzHpCYtNp87/+hTxG5TfCVn/yMY3dKnLn8tBiMfdmg= github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g= -github.com/ipld/go-car v0.1.1-0.20200923150018-8cdef32e2da4/go.mod h1:xrMEcuSq+D1vEwl+YAXsg/JfA98XGpXDwnkIL4Aimqw= github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d/go.mod h1:2Gys8L8MJ6zkh1gktTSXreY63t4UbyvNp5JaudTyxHQ= github.com/ipld/go-car v0.3.1-0.20210601190600-f512dac51e8e/go.mod h1:wUxBdwOLA9/0HZBi3fnTBzla0MuwlqgJLyrhOg1XaKI= github.com/ipld/go-car v0.3.2-0.20211001225732-32d0d9933823 h1:8JMSJ0k71fU9lIUrpVwEdoX4KoxiTEX8cZG97v/hTDw= @@ -906,6 +890,7 @@ github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e/go.mod h1:uVI github.com/ipld/go-ipld-prime v0.5.1-0.20200828233916-988837377a7f/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018/go.mod h1:0xEgdD6MKbZ1vF0GC+YcR/C4SQCAlRuOjIJ2i0HxqzM= github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= +github.com/ipld/go-ipld-prime v0.10.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8= github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc= github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8= @@ -915,6 +900,8 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1 github.com/ipld/go-ipld-prime-proto v0.0.0-20200428191222-c1ffdadc01e1/go.mod h1:OAV6xBmuTLsPZ+epzKkPB1e25FHk/vCtyatkdHcArLs= github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1:3pHYooM9Ea65jewRwrb2u5uHZCNkNTe9ABsVB+SrkH0= github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= +github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y= +github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= @@ -1063,7 +1050,6 @@ github.com/libp2p/go-libp2p v0.7.4/go.mod h1:oXsBlTLF1q7pxr+9w6lqzS1ILpyHsaBPniV github.com/libp2p/go-libp2p v0.8.1/go.mod h1:QRNH9pwdbEBpx5DTJYg+qxcVaDMAz3Ee/qDKwXujH5o= github.com/libp2p/go-libp2p v0.8.3/go.mod h1:EsH1A+8yoWK+L4iKcbPYu6MPluZ+CHWI9El8cTaefiM= github.com/libp2p/go-libp2p v0.9.2/go.mod h1:cunHNLDVus66Ct9iXXcjKRLdmHdFdHVe1TAnbubJQqQ= -github.com/libp2p/go-libp2p v0.10.0/go.mod h1:yBJNpb+mGJdgrwbKAKrhPU0u3ogyNFTfjJ6bdM+Q/G8= github.com/libp2p/go-libp2p v0.12.0/go.mod h1:FpHZrfC1q7nA8jitvdjKBDF31hguaC676g/nT9PgQM0= github.com/libp2p/go-libp2p v0.13.0/go.mod h1:pM0beYdACRfHO1WcJlp65WXyG2A6NqYM+t2DTVAJxMo= github.com/libp2p/go-libp2p v0.14.0/go.mod h1:dsQrWLAoIn+GkHPN/U+yypizkHiB9tnv79Os+kSgQ4Q= @@ -1092,7 +1078,6 @@ github.com/libp2p/go-libp2p-circuit v0.1.0/go.mod h1:Ahq4cY3V9VJcHcn1SBXjr78AbFk github.com/libp2p/go-libp2p-circuit v0.1.4/go.mod h1:CY67BrEjKNDhdTk8UgBX1Y/H5c3xkAcs3gnksxY7osU= github.com/libp2p/go-libp2p-circuit v0.2.1/go.mod h1:BXPwYDN5A8z4OEY9sOfr2DUQMLQvKt/6oku45YUmjIo= github.com/libp2p/go-libp2p-circuit v0.2.2/go.mod h1:nkG3iE01tR3FoQ2nMm06IUrCpCyJp1Eo4A1xYdpjfs4= -github.com/libp2p/go-libp2p-circuit v0.2.3/go.mod h1:nkG3iE01tR3FoQ2nMm06IUrCpCyJp1Eo4A1xYdpjfs4= github.com/libp2p/go-libp2p-circuit v0.4.0 h1:eqQ3sEYkGTtybWgr6JLqJY6QLtPWRErvFjFDfAOO1wc= github.com/libp2p/go-libp2p-circuit v0.4.0/go.mod h1:t/ktoFIUzM6uLQ+o1G6NuBl2ANhBKN9Bc8jRIk31MoA= github.com/libp2p/go-libp2p-connmgr v0.2.3/go.mod h1:Gqjg29zI8CwXX21zRxy6gOg8VYu3zVerJRt2KyktzH4= @@ -1197,7 +1182,6 @@ github.com/libp2p/go-libp2p-pubsub v0.3.2-0.20200527132641-c0712c6e92cf/go.mod h github.com/libp2p/go-libp2p-pubsub v0.4.2-0.20210212194758-6c1addf493eb/go.mod h1:izkeMLvz6Ht8yAISXjx60XUQZMq9ZMe5h2ih4dLIBIQ= github.com/libp2p/go-libp2p-pubsub v0.5.4 h1:rHl9/Xok4zX3zgi0pg0XnUj9Xj2OeXO8oTu85q2+YA8= github.com/libp2p/go-libp2p-pubsub v0.5.4/go.mod h1:gVOzwebXVdSMDQBTfH8ACO5EJ4SQrvsHqCmYsCZpD0E= -github.com/libp2p/go-libp2p-quic-transport v0.5.0/go.mod h1:IEcuC5MLxvZ5KuHKjRu+dr3LjCT1Be3rcD/4d8JrX8M= github.com/libp2p/go-libp2p-quic-transport v0.10.0/go.mod h1:RfJbZ8IqXIhxBRm5hqUEJqjiiY8xmEuq3HUDS993MkA= github.com/libp2p/go-libp2p-quic-transport v0.11.2 h1:p1YQDZRHH4Cv2LPtHubqlQ9ggz4CKng/REZuXZbZMhM= github.com/libp2p/go-libp2p-quic-transport v0.11.2/go.mod h1:wlanzKtIh6pHrq+0U3p3DY9PJfGqxMgPaGKaK5LifwQ= @@ -1219,7 +1203,6 @@ github.com/libp2p/go-libp2p-swarm v0.1.0/go.mod h1:wQVsCdjsuZoc730CgOvh5ox6K8evl github.com/libp2p/go-libp2p-swarm v0.2.2/go.mod h1:fvmtQ0T1nErXym1/aa1uJEyN7JzaTNyBcHImCxRpPKU= github.com/libp2p/go-libp2p-swarm v0.2.3/go.mod h1:P2VO/EpxRyDxtChXz/VPVXyTnszHvokHKRhfkEgFKNM= github.com/libp2p/go-libp2p-swarm v0.2.4/go.mod h1:/xIpHFPPh3wmSthtxdGbkHZ0OET1h/GGZes8Wku/M5Y= -github.com/libp2p/go-libp2p-swarm v0.2.7/go.mod h1:ZSJ0Q+oq/B1JgfPHJAT2HTall+xYRNYp1xs4S2FBWKA= github.com/libp2p/go-libp2p-swarm v0.2.8/go.mod h1:JQKMGSth4SMqonruY0a8yjlPVIkb0mdNSwckW7OYziM= github.com/libp2p/go-libp2p-swarm v0.3.0/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= github.com/libp2p/go-libp2p-swarm v0.3.1/go.mod h1:hdv95GWCTmzkgeJpP+GK/9D9puJegb7H57B5hWQR5Kk= @@ -1349,7 +1332,6 @@ github.com/libp2p/go-yamux/v2 v2.2.0/go.mod h1:3So6P6TV6r75R9jiBpiIKgU/66lOarCZj github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= -github.com/lucas-clemente/quic-go v0.16.0/go.mod h1:I0+fcNTdb9eS1ZcjQZbDVPGchJ86chcIxPALn9lEJqE= github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8= github.com/lucas-clemente/quic-go v0.21.2 h1:8LqqL7nBQFDUINadW0fHV/xSaCQJgmJC0Gv+qUnjd78= github.com/lucas-clemente/quic-go v0.21.2/go.mod h1:vF5M1XqhBAHgbjKcJOXY3JZz3GP0T3FQhz/uyOUS38Q= @@ -1365,9 +1347,7 @@ github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/maratori/testpackage v1.0.1/go.mod h1:ddKdw+XG0Phzhx8BFDTKgpWP4i7MpApTE5fXSKAqwDU= -github.com/marten-seemann/qpack v0.1.0/go.mod h1:LFt1NU/Ptjip0C2CPkhimBz5CGE3WGDAUWqna+CNTrI= github.com/marten-seemann/qpack v0.2.1/go.mod h1:F7Gl5L1jIgN1D11ucXefiuJS9UMVP2opoCp2jDKb7wc= -github.com/marten-seemann/qtls v0.9.1/go.mod h1:T1MmAdDPyISzxlK6kjRr0pcZFBVd1OZbBb/j3cvzHhk= github.com/marten-seemann/qtls v0.10.0/go.mod h1:UvMd1oaYDACI99/oZUYLzMCkBXQVT0aGm99sJhbT8hs= github.com/marten-seemann/qtls-go1-15 v0.1.1/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= github.com/marten-seemann/qtls-go1-15 v0.1.4/go.mod h1:GyFwywLKkRt+6mfU99csTEY1joMZz5vmB1WNZH3P81I= @@ -1575,7 +1555,6 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= @@ -1587,7 +1566,6 @@ github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= @@ -1831,7 +1809,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -2132,7 +2109,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20190228165749-92fc7df08ae7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190313220215-9f648a60d977/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/models/badger/retrieval_deal.go b/models/badger/retrieval_deal.go index ed9d6cf5..0fa3299d 100644 --- a/models/badger/retrieval_deal.go +++ b/models/badger/retrieval_deal.go @@ -2,7 +2,6 @@ package badger import ( "bytes" - "fmt" cborrpc "github.com/filecoin-project/go-cbor-util" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-fil-markets/retrievalmarket" @@ -29,9 +28,6 @@ func (r retrievalDealRepo) SaveDeal(deal *types.ProviderDealState) error { if err != nil { return err } - - fmt.Println("save deal ", deal.Identifier(), deal.Status.String()) - return r.ds.Put(statestore.ToKey(deal.Identifier()), b) } @@ -50,7 +46,6 @@ func (r retrievalDealRepo) GetDeal(id peer.ID, id2 retrievalmarket.DealID) (*typ return nil, err } - fmt.Println("get deal ", key.String(), retrievalDeal.Status.String()) return &retrievalDeal, nil } @@ -59,8 +54,9 @@ func (r retrievalDealRepo) GetDealByTransferId(chid datatransfer.ChannelID) (*ty err := travelDeals(r.ds, func(deal *types.ProviderDealState) (stop bool, err error) { if deal.ChannelID != nil && deal.ChannelID.Initiator == chid.Initiator && deal.ChannelID.Responder == chid.Responder && deal.ChannelID.ID == chid.ID { result = deal + return true, nil } - return true, nil + return false, nil }) if err != nil { return nil, err diff --git a/models/badger/storage_deal.go b/models/badger/storage_deal.go index 7dc513c1..3c2d84c4 100644 --- a/models/badger/storage_deal.go +++ b/models/badger/storage_deal.go @@ -263,6 +263,7 @@ func (sdr *storageDealRepo) GetPieceSize(pieceCID cid.Cid) (abi.UnpaddedPieceSiz err := travelDeals(sdr.ds, func(inDeal *types.MinerDeal) (stop bool, err error) { if inDeal.ClientDealProposal.Proposal.PieceCID == pieceCID { deal = inDeal + return true, nil } return false, nil }) diff --git a/retrievalprovider/client.go b/retrievalprovider/client.go index 8e70e120..df88a323 100644 --- a/retrievalprovider/client.go +++ b/retrievalprovider/client.go @@ -2,8 +2,6 @@ package retrievalprovider import ( "context" - "fmt" - "github.com/ipfs/go-cid" "github.com/multiformats/go-multiaddr" @@ -57,7 +55,6 @@ func (rcn *retrievalClientNode) AllocateLane(ctx context.Context, paymentChannel func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymentChannel address.Address, amount abi.TokenAmount, lane uint64, tok shared.TipSetToken) (*paych.SignedVoucher, error) { // TODO: respect the provided TipSetToken (a serialized TipSetKey) when // querying the chain - fmt.Println("PaychVoucherCreate start") voucher, err := rcn.payAPI.PaychVoucherCreate(ctx, paymentChannel, amount, lane) if err != nil { return nil, err @@ -65,7 +62,6 @@ func (rcn *retrievalClientNode) CreatePaymentVoucher(ctx context.Context, paymen if voucher.Voucher == nil { return nil, retrievalmarket.NewShortfallError(voucher.Shortfall) } - fmt.Println("CreatePaymentVoucher finish") return voucher.Voucher, nil } diff --git a/retrievalprovider/provider_datatransfer_sub.go b/retrievalprovider/provider_datatransfer_sub.go index 34e6cb29..76b887ad 100644 --- a/retrievalprovider/provider_datatransfer_sub.go +++ b/retrievalprovider/provider_datatransfer_sub.go @@ -24,8 +24,6 @@ func ProviderDataTransferSubscriber(deals IDatatransferHandler) datatransfer.Sub identify := rm.ProviderDealIdentifier{DealID: dealProposal.ID, Receiver: channelState.Recipient()} if channelState.Status() == datatransfer.Completed { - //rm.ProviderEventComplete - log.Errorf("receive datatransfer completed status") err := deals.HandleCompleteFor(ctx, identify) if err != nil { log.Errorf("processing dt event: %s", err) diff --git a/retrievalprovider/revalidator.go b/retrievalprovider/revalidator.go index 0c8c67f4..6b0a64d4 100644 --- a/retrievalprovider/revalidator.go +++ b/retrievalprovider/revalidator.go @@ -218,8 +218,6 @@ func (pr *ProviderRevalidator) OnPullDataSent(chid datatransfer.ChannelID, addit case rm.DealStatusBlocksComplete: deal.Status = rm.DealStatusFundsNeededLastPayment case rm.DealStatusNew: - //todo will come here? - log.Errorf("receive status new on data pull sent") deal.Status = rm.DealStatusFundsNeededUnseal } diff --git a/utils/selectors.go b/utils/selectors.go new file mode 100644 index 00000000..7d40ba6d --- /dev/null +++ b/utils/selectors.go @@ -0,0 +1,89 @@ +package utils + +import ( + "bytes" + "context" + "fmt" + "io" + + // must be imported to init() raw-codec support + _ "github.com/ipld/go-ipld-prime/codec/raw" + + "github.com/ipfs/go-cid" + mdagipld "github.com/ipfs/go-ipld-format" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" +) + +func TraverseDag( + ctx context.Context, + ds mdagipld.DAGService, + startFrom cid.Cid, + optionalSelector ipld.Node, + visitCallback traversal.AdvVisitFn, +) error { + + if optionalSelector == nil { + optionalSelector = selectorparse.CommonSelector_MatchAllRecursively + } + + parsedSelector, err := selector.ParseSelector(optionalSelector) + if err != nil { + return err + } + + // not sure what this is for TBH: we also provide ctx in &traversal.Config{} + linkContext := ipld.LinkContext{Ctx: ctx} + + // this is what allows us to understand dagpb + nodePrototypeChooser := dagpb.AddSupportToChooser( + func(ipld.Link, ipld.LinkContext) (ipld.NodePrototype, error) { + return basicnode.Prototype.Any, nil + }, + ) + + // this is how we implement GETs + linkSystem := cidlink.DefaultLinkSystem() + linkSystem.StorageReadOpener = func(lctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) { + cl, isCid := lnk.(cidlink.Link) + if !isCid { + return nil, fmt.Errorf("unexpected link type %#v", lnk) + } + + node, err := ds.Get(lctx.Ctx, cl.Cid) + if err != nil { + return nil, err + } + + return bytes.NewBuffer(node.RawData()), nil + } + + // this is how we pull the start node out of the DS + startLink := cidlink.Link{Cid: startFrom} + startNodePrototype, err := nodePrototypeChooser(startLink, linkContext) + if err != nil { + return err + } + startNode, err := linkSystem.Load( + linkContext, + startLink, + startNodePrototype, + ) + if err != nil { + return err + } + + // this is the actual execution, invoking the supplied callback + return traversal.Progress{ + Cfg: &traversal.Config{ + Ctx: ctx, + LinkSystem: linkSystem, + LinkTargetNodePrototypeChooser: nodePrototypeChooser, + }, + }.WalkAdv(startNode, parsedSelector, visitCallback) +}