From a197125b8f3f85cc5070678f19b4d5853f3ff4d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 4 May 2023 13:50:02 +0200 Subject: [PATCH 1/2] pin: follow async pinner changes See https://github.com/ipfs/boxo/pull/290 This PR follow the changes in the Pinner to make listing recursive and direct pins asynchronous, which in turns allow pin/ls to build and emit results without having to wait anything, or accumulate too much in memory. Note: there is a tradeoff for pin/ls?type=all: - keep the recursive pins in memory (which I chose) - ask the pinner twice for the recursive pins, and limit memory usage Also, follow the changes in the GC with similar benefit of not having to wait the full pin list. Add a test. Also, follow the changes in pin.Verify. --- core/commands/pin/pin.go | 14 ++--- core/coreapi/pin.go | 132 ++++++++++++++++++++++----------------- gc/gc.go | 71 ++++++++++++--------- gc/gc_test.go | 96 ++++++++++++++++++++++++++++ go.mod | 3 + go.sum | 4 +- 6 files changed, 224 insertions(+), 96 deletions(-) create mode 100644 gc/gc_test.go diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 6650fb93dd7..94072864c0e 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -675,10 +675,6 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci bs := n.Blocks.Blockstore() DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := dag.GetLinksWithDAG(DAG) - recPins, err := n.Pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) PinStatus checkPin = func(root cid.Cid) PinStatus { @@ -722,11 +718,15 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci out := make(chan interface{}) go func() { defer close(out) - for _, cid := range recPins { - pinStatus := checkPin(cid) + for p := range n.Pinning.RecursiveKeys(ctx) { + if p.Err != nil { + out <- p.Err + return + } + pinStatus := checkPin(p.C) if !pinStatus.Ok || opts.includeOk { select { - case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}: + case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}: case <-ctx.Done(): return } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 4aea8dfd387..98be600b2e4 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -12,9 +12,10 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" pin "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" - "github.com/ipfs/kubo/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + "github.com/ipfs/kubo/tracing" ) type PinAPI CoreAPI @@ -156,6 +157,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt } type pinStatus struct { + err error cid cid.Cid ok bool badNodes []coreiface.BadPinNode @@ -175,6 +177,10 @@ func (s *pinStatus) BadNodes() []coreiface.BadPinNode { return s.badNodes } +func (s *pinStatus) Err() error { + return s.err +} + func (n *badNode) Path() path.Resolved { return n.path } @@ -191,10 +197,6 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro bs := api.blockstore DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := merkledag.GetLinksWithDAG(DAG) - recPins, err := api.pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) *pinStatus checkPin = func(root cid.Cid) *pinStatus { @@ -229,8 +231,18 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro out := make(chan coreiface.PinStatus) go func() { defer close(out) - for _, c := range recPins { - out <- checkPin(c) + for p := range api.pinning.RecursiveKeys(ctx) { + var res *pinStatus + if p.Err != nil { + res = &pinStatus{err: p.Err} + } else { + res = checkPin(p.C) + } + select { + case <-ctx.Done(): + return + case out <- res: + } } }() @@ -262,63 +274,57 @@ func (p *pinInfo) Err() error { func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin { out := make(chan coreiface.Pin, 1) - keys := cid.NewSet() - - AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { - for _, c := range keyList { - if keys.Visit(c) { - select { - case out <- &pinInfo{ - pinType: typeStr, - path: path.IpldPath(c), - }: - case <-ctx.Done(): - return ctx.Err() - } + emittedSet := cid.NewSet() + + AddToResultKeys := func(c cid.Cid, typeStr string) error { + if emittedSet.Visit(c) { + select { + case out <- &pinInfo{ + pinType: typeStr, + path: path.IpldPath(c), + }: + case <-ctx.Done(): + return ctx.Err() } } return nil } - VisitKeys := func(keyList []cid.Cid) { - for _, c := range keyList { - keys.Visit(c) - } - } - go func() { defer close(out) - var dkeys, rkeys []cid.Cid + var rkeys []cid.Cid var err error if typeStr == "recursive" || typeStr == "all" { - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(rkeys, "recursive"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "recursive"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "direct" || typeStr == "all" { - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(dkeys, "direct"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "direct"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "all" { - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -326,7 +332,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = walkingSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } @@ -335,25 +344,27 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac // We need to first visit the direct pins that have priority // without emitting them - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(dkeys) - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(rkeys) - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -361,7 +372,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = emittedSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } diff --git a/gc/gc.go b/gc/gc.go index 7e81ef557e7..6df59874157 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -154,7 +154,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn // Descendants recursively finds all the descendants of the given roots and // adds them to the given cid.Set, using the provided dag.GetLinks function // to walk the tree. -func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []cid.Cid) error { +func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedCid) error { verifyGetLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) { err := verifcid.ValidateCid(c) if err != nil { @@ -167,7 +167,7 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots verboseCidError := func(err error) error { if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) || strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) { - err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ //nolint + err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ // nolint " to list insecure hashes. If you want to read them,"+ " please downgrade your go-ipfs to 0.4.13\n", err) log.Error(err) @@ -175,19 +175,29 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots return err } - for _, c := range roots { - // Walk recursively walks the dag and adds the keys to the given set - err := dag.Walk(ctx, verifyGetLinks, c, func(k cid.Cid) bool { - return set.Visit(toCidV1(k)) - }, dag.Concurrent()) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case wrapper, ok := <-roots: + if !ok { + return nil + } + if wrapper.Err != nil { + return wrapper.Err + } - if err != nil { - err = verboseCidError(err) - return err + // Walk recursively walks the dag and adds the keys to the given set + err := dag.Walk(ctx, verifyGetLinks, wrapper.C, func(k cid.Cid) bool { + return set.Visit(toCidV1(k)) + }, dag.Concurrent()) + + if err != nil { + err = verboseCidError(err) + return err + } } } - - return nil } // toCidV1 converts any CIDv0s to CIDv1s. @@ -217,11 +227,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - rkeys, err := pn.RecursiveKeys(ctx) - if err != nil { - return nil, err - } - err = Descendants(ctx, getLinks, gcs, rkeys) + rkeys := pn.RecursiveKeys(ctx) + err := Descendants(ctx, getLinks, gcs, rkeys) if err != nil { errors = true select { @@ -243,7 +250,18 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots) + bestEffortRootsChan := make(chan pin.StreamedCid) + go func() { + defer close(bestEffortRootsChan) + for _, root := range bestEffortRoots { + select { + case <-ctx.Done(): + return + case bestEffortRootsChan <- pin.StreamedCid{C: root}: + } + } + }() + err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRootsChan) if err != nil { errors = true select { @@ -253,18 +271,15 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } } - dkeys, err := pn.DirectKeys(ctx) - if err != nil { - return nil, err - } - for _, k := range dkeys { - gcs.Add(toCidV1(k)) + dkeys := pn.DirectKeys(ctx) + for k := range dkeys { + if k.Err != nil { + return nil, k.Err + } + gcs.Add(toCidV1(k.C)) } - ikeys, err := pn.InternalPins(ctx) - if err != nil { - return nil, err - } + ikeys := pn.InternalPins(ctx) err = Descendants(ctx, getLinks, gcs, ikeys) if err != nil { errors = true diff --git a/gc/gc_test.go b/gc/gc_test.go new file mode 100644 index 00000000000..4fb6dbf09be --- /dev/null +++ b/gc/gc_test.go @@ -0,0 +1,96 @@ +package gc + +import ( + "context" + "testing" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange/offline" + "github.com/ipfs/boxo/ipld/merkledag" + mdutils "github.com/ipfs/boxo/ipld/merkledag/test" + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/pinning/pinner/dspinner" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestGC(t *testing.T) { + ctx := context.Background() + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker()) + bserv := blockservice.New(bs, offline.Exchange(bs)) + dserv := merkledag.NewDAGService(bserv) + pinner, err := dspinner.New(ctx, ds, dserv) + require.NoError(t, err) + + daggen := mdutils.NewDAGGenerator() + + var expectedKept []multihash.Multihash + var expectedDiscarded []multihash.Multihash + + // add some pins + for i := 0; i < 5; i++ { + // direct + root, _, err := daggen.MakeDagNode(dserv.Add, 0, 1) + require.NoError(t, err) + err = pinner.PinWithMode(ctx, root, pin.Direct) + require.NoError(t, err) + expectedKept = append(expectedKept, root.Hash()) + + // recursive + root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + err = pinner.PinWithMode(ctx, root, pin.Recursive) + require.NoError(t, err) + expectedKept = append(expectedKept, toMHs(allCids)...) + } + + err = pinner.Flush(ctx) + require.NoError(t, err) + + // add more dags to be GCed + for i := 0; i < 5; i++ { + _, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + expectedDiscarded = append(expectedDiscarded, toMHs(allCids)...) + } + + // and some other as "best effort roots" + var bestEffortRoots []cid.Cid + for i := 0; i < 5; i++ { + root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + bestEffortRoots = append(bestEffortRoots, root) + expectedKept = append(expectedKept, toMHs(allCids)...) + } + + ch := GC(ctx, bs, ds, pinner, bestEffortRoots) + var discarded []multihash.Multihash + for res := range ch { + require.NoError(t, res.Error) + discarded = append(discarded, res.KeyRemoved.Hash()) + } + + allKeys, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + var kept []multihash.Multihash + for key := range allKeys { + kept = append(kept, key.Hash()) + } + + require.ElementsMatch(t, expectedDiscarded, discarded) + require.ElementsMatch(t, expectedKept, kept) +} + +func toMHs(cids []cid.Cid) []multihash.Multihash { + res := make([]multihash.Multihash, len(cids)) + for i, c := range cids { + res[i] = c.Hash() + } + return res +} diff --git a/go.mod b/go.mod index 159bbd3e4fd..d2b1f834f2b 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,8 @@ module github.com/ipfs/kubo +// https://github.com/ipfs/boxo/pull/290 +replace github.com/ipfs/boxo => github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f + require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc contrib.go.opencensus.io/exporter/prometheus v0.4.2 diff --git a/go.sum b/go.sum index 16d312ef5d7..eb9a48dc04b 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f h1:2UbpOJ6cIC43V/hIDxgvP0VLbJIk+cBofPAWmXBlSrg= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -356,8 +358,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866 h1:ThRTXD/EyoLb/jz+YW+ZlOLbjX9FyaxP0dEpgUp3cCE= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= From a2c66abc52864bcd7bbb099f5959ec66557c73cd Mon Sep 17 00:00:00 2001 From: Jorropo Date: Fri, 2 Jun 2023 18:16:46 +0200 Subject: [PATCH 2/2] pinning: fix pin listings --- core/commands/pin/pin.go | 23 ++++++++++--------- core/coreapi/pin.go | 48 +++++++++++++++------------------------- 2 files changed, 31 insertions(+), 40 deletions(-) diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 626c536dbd2..8312c1fb4ce 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -342,14 +342,17 @@ Example: } // For backward compatibility, we accumulate the pins in the same output type as before. - emit := res.Emit + var emit func(PinLsOutputWrapper) error lgcList := map[string]PinLsType{} if !stream { - emit = func(v interface{}) error { - obj := v.(*PinLsOutputWrapper) - lgcList[obj.PinLsObject.Cid] = PinLsType{Type: obj.PinLsObject.Type} + emit = func(v PinLsOutputWrapper) error { + lgcList[v.PinLsObject.Cid] = PinLsType{Type: v.PinLsObject.Type} return nil } + } else { + emit = func(v PinLsOutputWrapper) error { + return res.Emit(v) + } } if len(req.Arguments) > 0 { @@ -371,7 +374,7 @@ Example: }, Type: &PinLsOutputWrapper{}, Encoders: cmds.EncoderMap{ - cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error { + cmds.JSON: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error { stream, _ := req.Options[pinStreamOptionName].(bool) enc := json.NewEncoder(w) @@ -382,7 +385,7 @@ Example: return enc.Encode(out.PinLsList) }), - cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *PinLsOutputWrapper) error { + cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out PinLsOutputWrapper) error { quiet, _ := req.Options[pinQuietOptionName].(bool) stream, _ := req.Options[pinStreamOptionName].(bool) @@ -432,7 +435,7 @@ type PinLsObject struct { Type string `json:",omitempty"` } -func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { +func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err @@ -470,7 +473,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu pinType = "indirect through " + pinType } - err = emit(&PinLsOutputWrapper{ + err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: pinType, Cid: enc.Encode(rp.Cid()), @@ -484,7 +487,7 @@ func pinLsKeys(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fu return nil } -func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value interface{}) error) error { +func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit func(value PinLsOutputWrapper) error) error { enc, err := cmdenv.GetCidEncoder(req) if err != nil { return err @@ -511,7 +514,7 @@ func pinLsAll(req *cmds.Request, typeStr string, api coreiface.CoreAPI, emit fun if err := p.Err(); err != nil { return err } - err = emit(&PinLsOutputWrapper{ + err = emit(PinLsOutputWrapper{ PinLsObject: PinLsObject{ Type: p.Type(), Cid: enc.Encode(p.Path().Cid()), diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 98be600b2e4..ec2cedb83fd 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -305,6 +305,7 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac out <- &pinInfo{err: err} return } + rkeys = append(rkeys, streamedCid.C) } } if typeStr == "direct" || typeStr == "all" { @@ -319,27 +320,6 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac } } } - if typeStr == "all" { - walkingSet := cid.NewSet() - for _, k := range rkeys { - err = merkledag.Walk( - ctx, merkledag.GetLinksWithDAG(api.dag), k, - walkingSet.Visit, - merkledag.SkipRoot(), merkledag.Concurrent(), - ) - if err != nil { - out <- &pinInfo{err: err} - return - } - } - err = walkingSet.ForEach(func(c cid.Cid) error { - return AddToResultKeys(c, "indirect") - }) - if err != nil { - out <- &pinInfo{err: err} - return - } - } if typeStr == "indirect" { // We need to first visit the direct pins that have priority // without emitting them @@ -358,13 +338,28 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } emittedSet.Add(streamedCid.C) + rkeys = append(rkeys, streamedCid.C) } - + } + if typeStr == "indirect" || typeStr == "all" { walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - walkingSet.Visit, + func(c cid.Cid) bool { + if !walkingSet.Visit(c) { + return false + } + if emittedSet.Has(c) { + return true // skipped + } + err := AddToResultKeys(c, "indirect") + if err != nil { + out <- &pinInfo{err: err} + return false + } + return true + }, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -372,13 +367,6 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - err = emittedSet.ForEach(func(c cid.Cid) error { - return AddToResultKeys(c, "indirect") - }) - if err != nil { - out <- &pinInfo{err: err} - return - } } }()