From 27dab231d372cf162c1f157be9448d6a1d92cbad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Thu, 4 May 2023 13:50:02 +0200 Subject: [PATCH] pin: follow async pinner changes See https://github.com/ipfs/boxo/pull/290 This PR follows the changes in the Pinner to make listing recursive and direct pins asynchronous, which in turn allows pin/ls to build and emit results without having to wait for anything, or accumulate too much in memory. Note: there is a tradeoff for pin/ls?type=all: - keep the recursive pins in memory (which I chose) - ask the pinner twice for the recursive pins, and limit memory usage Also, follow the changes in the GC with the similar benefit of not having to wait for the full pin list. Add a test. Also, follow the changes in pin.Verify. --- core/commands/pin/pin.go | 14 ++--- core/coreapi/pin.go | 132 ++++++++++++++++++++++----------------- gc/gc.go | 71 ++++++++++++--------- go.mod | 3 + go.sum | 4 +- 5 files changed, 128 insertions(+), 96 deletions(-) diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 6650fb93dd72..94072864c0eb 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -675,10 +675,6 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci bs := n.Blocks.Blockstore() DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := dag.GetLinksWithDAG(DAG) - recPins, err := n.Pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) PinStatus checkPin = func(root cid.Cid) PinStatus { @@ -722,11 +718,15 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci out := make(chan interface{}) go func() { defer close(out) - for _, cid := range recPins { - pinStatus := checkPin(cid) + for p := range n.Pinning.RecursiveKeys(ctx) { + if p.Err != nil { + out <- p.Err + return + } + pinStatus := checkPin(p.C) if !pinStatus.Ok || opts.includeOk { select { - case out <- 
&PinVerifyRes{enc.Encode(cid), pinStatus}: + case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}: case <-ctx.Done(): return } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 4aea8dfd387d..98be600b2e4d 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -12,9 +12,10 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" pin "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" - "github.com/ipfs/kubo/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + "github.com/ipfs/kubo/tracing" ) type PinAPI CoreAPI @@ -156,6 +157,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt } type pinStatus struct { + err error cid cid.Cid ok bool badNodes []coreiface.BadPinNode @@ -175,6 +177,10 @@ func (s *pinStatus) BadNodes() []coreiface.BadPinNode { return s.badNodes } +func (s *pinStatus) Err() error { + return s.err +} + func (n *badNode) Path() path.Resolved { return n.path } @@ -191,10 +197,6 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro bs := api.blockstore DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := merkledag.GetLinksWithDAG(DAG) - recPins, err := api.pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) *pinStatus checkPin = func(root cid.Cid) *pinStatus { @@ -229,8 +231,18 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro out := make(chan coreiface.PinStatus) go func() { defer close(out) - for _, c := range recPins { - out <- checkPin(c) + for p := range api.pinning.RecursiveKeys(ctx) { + var res *pinStatus + if p.Err != nil { + res = &pinStatus{err: p.Err} + } else { + res = checkPin(p.C) + } + select { + case <-ctx.Done(): + return + case out <- res: + } } }() @@ -262,63 +274,57 @@ func (p *pinInfo) Err() error { func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin { out := make(chan coreiface.Pin, 
1) - keys := cid.NewSet() - - AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { - for _, c := range keyList { - if keys.Visit(c) { - select { - case out <- &pinInfo{ - pinType: typeStr, - path: path.IpldPath(c), - }: - case <-ctx.Done(): - return ctx.Err() - } + emittedSet := cid.NewSet() + + AddToResultKeys := func(c cid.Cid, typeStr string) error { + if emittedSet.Visit(c) { + select { + case out <- &pinInfo{ + pinType: typeStr, + path: path.IpldPath(c), + }: + case <-ctx.Done(): + return ctx.Err() } } return nil } - VisitKeys := func(keyList []cid.Cid) { - for _, c := range keyList { - keys.Visit(c) - } - } - go func() { defer close(out) - var dkeys, rkeys []cid.Cid + var rkeys []cid.Cid var err error if typeStr == "recursive" || typeStr == "all" { - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(rkeys, "recursive"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "recursive"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "direct" || typeStr == "all" { - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(dkeys, "direct"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "direct"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "all" { - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ 
-326,7 +332,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = walkingSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } @@ -335,25 +344,27 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac // We need to first visit the direct pins that have priority // without emitting them - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(dkeys) - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(rkeys) - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -361,7 +372,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = emittedSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } diff --git a/gc/gc.go b/gc/gc.go index 7e81ef557e7b..6df59874157d 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -154,7 +154,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn // Descendants recursively finds all the descendants of the given roots and // adds them to the given cid.Set, using the 
provided dag.GetLinks function // to walk the tree. -func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []cid.Cid) error { +func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedCid) error { verifyGetLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) { err := verifcid.ValidateCid(c) if err != nil { @@ -167,7 +167,7 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots verboseCidError := func(err error) error { if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) || strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) { - err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ //nolint + err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ // nolint " to list insecure hashes. If you want to read them,"+ " please downgrade your go-ipfs to 0.4.13\n", err) log.Error(err) @@ -175,19 +175,29 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots return err } - for _, c := range roots { - // Walk recursively walks the dag and adds the keys to the given set - err := dag.Walk(ctx, verifyGetLinks, c, func(k cid.Cid) bool { - return set.Visit(toCidV1(k)) - }, dag.Concurrent()) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case wrapper, ok := <-roots: + if !ok { + return nil + } + if wrapper.Err != nil { + return wrapper.Err + } - if err != nil { - err = verboseCidError(err) - return err + // Walk recursively walks the dag and adds the keys to the given set + err := dag.Walk(ctx, verifyGetLinks, wrapper.C, func(k cid.Cid) bool { + return set.Visit(toCidV1(k)) + }, dag.Concurrent()) + + if err != nil { + err = verboseCidError(err) + return err + } } } - - return nil } // toCidV1 converts any CIDv0s to CIDv1s. 
@@ -217,11 +227,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - rkeys, err := pn.RecursiveKeys(ctx) - if err != nil { - return nil, err - } - err = Descendants(ctx, getLinks, gcs, rkeys) + rkeys := pn.RecursiveKeys(ctx) + err := Descendants(ctx, getLinks, gcs, rkeys) if err != nil { errors = true select { @@ -243,7 +250,18 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots) + bestEffortRootsChan := make(chan pin.StreamedCid) + go func() { + defer close(bestEffortRootsChan) + for _, root := range bestEffortRoots { + select { + case <-ctx.Done(): + return + case bestEffortRootsChan <- pin.StreamedCid{C: root}: + } + } + }() + err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRootsChan) if err != nil { errors = true select { @@ -253,18 +271,15 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } } - dkeys, err := pn.DirectKeys(ctx) - if err != nil { - return nil, err - } - for _, k := range dkeys { - gcs.Add(toCidV1(k)) + dkeys := pn.DirectKeys(ctx) + for k := range dkeys { + if k.Err != nil { + return nil, k.Err + } + gcs.Add(toCidV1(k.C)) } - ikeys, err := pn.InternalPins(ctx) - if err != nil { - return nil, err - } + ikeys := pn.InternalPins(ctx) err = Descendants(ctx, getLinks, gcs, ikeys) if err != nil { errors = true diff --git a/go.mod b/go.mod index 159bbd3e4fd3..d2b1f834f2b6 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,8 @@ module github.com/ipfs/kubo +// https://github.com/ipfs/boxo/pull/290 +replace github.com/ipfs/boxo => github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f + require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc contrib.go.opencensus.io/exporter/prometheus v0.4.2 diff --git a/go.sum b/go.sum index 16d312ef5d71..eb9a48dc04b6 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/BurntSushi/toml 
v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f h1:2UbpOJ6cIC43V/hIDxgvP0VLbJIk+cBofPAWmXBlSrg= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -356,8 +358,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866 h1:ThRTXD/EyoLb/jz+YW+ZlOLbjX9FyaxP0dEpgUp3cCE= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=