diff --git a/core/commands/pin/pin.go b/core/commands/pin/pin.go index 6650fb93dd7..94072864c0e 100644 --- a/core/commands/pin/pin.go +++ b/core/commands/pin/pin.go @@ -675,10 +675,6 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci bs := n.Blocks.Blockstore() DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := dag.GetLinksWithDAG(DAG) - recPins, err := n.Pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) PinStatus checkPin = func(root cid.Cid) PinStatus { @@ -722,11 +718,15 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc ci out := make(chan interface{}) go func() { defer close(out) - for _, cid := range recPins { - pinStatus := checkPin(cid) + for p := range n.Pinning.RecursiveKeys(ctx) { + if p.Err != nil { + out <- p.Err + return + } + pinStatus := checkPin(p.C) if !pinStatus.Ok || opts.includeOk { select { - case out <- &PinVerifyRes{enc.Encode(cid), pinStatus}: + case out <- &PinVerifyRes{enc.Encode(p.C), pinStatus}: case <-ctx.Done(): return } diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 4aea8dfd387..98be600b2e4 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -12,9 +12,10 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" pin "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/go-cid" - "github.com/ipfs/kubo/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + + "github.com/ipfs/kubo/tracing" ) type PinAPI CoreAPI @@ -156,6 +157,7 @@ func (api *PinAPI) Update(ctx context.Context, from path.Path, to path.Path, opt } type pinStatus struct { + err error cid cid.Cid ok bool badNodes []coreiface.BadPinNode @@ -175,6 +177,10 @@ func (s *pinStatus) BadNodes() []coreiface.BadPinNode { return s.badNodes } +func (s *pinStatus) Err() error { + return s.err +} + func (n *badNode) Path() path.Resolved { return n.path } @@ -191,10 +197,6 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro bs := api.blockstore DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := merkledag.GetLinksWithDAG(DAG) - recPins, err := api.pinning.RecursiveKeys(ctx) - if err != nil { - return nil, err - } var checkPin func(root cid.Cid) *pinStatus checkPin = func(root cid.Cid) *pinStatus { @@ -229,8 +231,18 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro out := make(chan coreiface.PinStatus) go func() { defer close(out) - for _, c := range recPins { - out <- checkPin(c) + for p := range api.pinning.RecursiveKeys(ctx) { + var res *pinStatus + if p.Err != nil { + res = &pinStatus{err: p.Err} + } else { + res = checkPin(p.C) + } + select { + case <-ctx.Done(): + return + case out <- res: + } } }() @@ -262,63 +274,57 @@ func (p *pinInfo) Err() error { func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreiface.Pin { out := make(chan coreiface.Pin, 1) - keys := cid.NewSet() - - AddToResultKeys := func(keyList []cid.Cid, typeStr string) error { - for _, c := range keyList { - if keys.Visit(c) { - select { - case out <- &pinInfo{ - pinType: typeStr, - path: path.IpldPath(c), - }: - case <-ctx.Done(): - return ctx.Err() - } + emittedSet := cid.NewSet() + + AddToResultKeys := func(c cid.Cid, typeStr string) error { + if emittedSet.Visit(c) { + select { + case out <- &pinInfo{ + pinType: typeStr, + path: path.IpldPath(c), + }: + case <-ctx.Done(): + return ctx.Err() } } return nil } - VisitKeys := func(keyList []cid.Cid) { - for _, c := range keyList { - keys.Visit(c) - } - } - go func() { defer close(out) - var dkeys, rkeys []cid.Cid + var rkeys []cid.Cid var err error if typeStr == "recursive" || typeStr == "all" { - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(rkeys, "recursive"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "recursive"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "direct" || typeStr == "all" { - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return - } - if err = AddToResultKeys(dkeys, "direct"); err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + if err = AddToResultKeys(streamedCid.C, "direct"); err != nil { + out <- &pinInfo{err: err} + return + } } } if typeStr == "all" { - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -326,7 +332,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = walkingSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } @@ -335,25 +344,27 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac // We need to first visit the direct pins that have priority // without emitting them - dkeys, err = api.pinning.DirectKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.DirectKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(dkeys) - rkeys, err = api.pinning.RecursiveKeys(ctx) - if err != nil { - out <- &pinInfo{err: err} - return + for streamedCid := range api.pinning.RecursiveKeys(ctx) { + if streamedCid.Err != nil { + out <- &pinInfo{err: streamedCid.Err} + return + } + emittedSet.Add(streamedCid.C) } - VisitKeys(rkeys) - set := cid.NewSet() + walkingSet := cid.NewSet() for _, k := range rkeys { err = merkledag.Walk( ctx, merkledag.GetLinksWithDAG(api.dag), k, - set.Visit, + walkingSet.Visit, merkledag.SkipRoot(), merkledag.Concurrent(), ) if err != nil { @@ -361,7 +372,10 @@ func (api *PinAPI) pinLsAll(ctx context.Context, typeStr string) <-chan coreifac return } } - if err = AddToResultKeys(set.Keys(), "indirect"); err != nil { + err = emittedSet.ForEach(func(c cid.Cid) error { + return AddToResultKeys(c, "indirect") + }) + if err != nil { out <- &pinInfo{err: err} return } diff --git a/gc/gc.go b/gc/gc.go index 7e81ef557e7..6df59874157 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -154,7 +154,7 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, dstor dstore.Datastore, pn // Descendants recursively finds all the descendants of the given roots and // adds them to the given cid.Set, using the provided dag.GetLinks function // to walk the tree. -func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []cid.Cid) error { +func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots <-chan pin.StreamedCid) error { verifyGetLinks := func(ctx context.Context, c cid.Cid) ([]*ipld.Link, error) { err := verifcid.ValidateCid(c) if err != nil { @@ -167,7 +167,7 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots verboseCidError := func(err error) error { if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) || strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) { - err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ //nolint + err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ // nolint " to list insecure hashes. If you want to read them,"+ " please downgrade your go-ipfs to 0.4.13\n", err) log.Error(err) @@ -175,19 +175,29 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots return err } - for _, c := range roots { - // Walk recursively walks the dag and adds the keys to the given set - err := dag.Walk(ctx, verifyGetLinks, c, func(k cid.Cid) bool { - return set.Visit(toCidV1(k)) - }, dag.Concurrent()) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case wrapper, ok := <-roots: + if !ok { + return nil + } + if wrapper.Err != nil { + return wrapper.Err + } - if err != nil { - err = verboseCidError(err) - return err + // Walk recursively walks the dag and adds the keys to the given set + err := dag.Walk(ctx, verifyGetLinks, wrapper.C, func(k cid.Cid) bool { + return set.Visit(toCidV1(k)) + }, dag.Concurrent()) + + if err != nil { + err = verboseCidError(err) + return err + } } } - - return nil } // toCidV1 converts any CIDv0s to CIDv1s. @@ -217,11 +227,8 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - rkeys, err := pn.RecursiveKeys(ctx) - if err != nil { - return nil, err - } - err = Descendants(ctx, getLinks, gcs, rkeys) + rkeys := pn.RecursiveKeys(ctx) + err := Descendants(ctx, getLinks, gcs, rkeys) if err != nil { errors = true select { @@ -243,7 +250,18 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRoots) + bestEffortRootsChan := make(chan pin.StreamedCid) + go func() { + defer close(bestEffortRootsChan) + for _, root := range bestEffortRoots { + select { + case <-ctx.Done(): + return + case bestEffortRootsChan <- pin.StreamedCid{C: root}: + } + } + }() + err = Descendants(ctx, bestEffortGetLinks, gcs, bestEffortRootsChan) if err != nil { errors = true select { @@ -253,18 +271,15 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } } - dkeys, err := pn.DirectKeys(ctx) - if err != nil { - return nil, err - } - for _, k := range dkeys { - gcs.Add(toCidV1(k)) + dkeys := pn.DirectKeys(ctx) + for k := range dkeys { + if k.Err != nil { + return nil, k.Err + } + gcs.Add(toCidV1(k.C)) } - ikeys, err := pn.InternalPins(ctx) - if err != nil { - return nil, err - } + ikeys := pn.InternalPins(ctx) err = Descendants(ctx, getLinks, gcs, ikeys) if err != nil { errors = true diff --git a/gc/gc_test.go b/gc/gc_test.go new file mode 100644 index 00000000000..4fb6dbf09be --- /dev/null +++ b/gc/gc_test.go @@ -0,0 +1,96 @@ +package gc + +import ( + "context" + "testing" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/exchange/offline" + "github.com/ipfs/boxo/ipld/merkledag" + mdutils "github.com/ipfs/boxo/ipld/merkledag/test" + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/pinning/pinner/dspinner" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" +) + +func TestGC(t *testing.T) { + ctx := context.Background() + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + bs := blockstore.NewGCBlockstore(blockstore.NewBlockstore(ds), blockstore.NewGCLocker()) + bserv := blockservice.New(bs, offline.Exchange(bs)) + dserv := merkledag.NewDAGService(bserv) + pinner, err := dspinner.New(ctx, ds, dserv) + require.NoError(t, err) + + daggen := mdutils.NewDAGGenerator() + + var expectedKept []multihash.Multihash + var expectedDiscarded []multihash.Multihash + + // add some pins + for i := 0; i < 5; i++ { + // direct + root, _, err := daggen.MakeDagNode(dserv.Add, 0, 1) + require.NoError(t, err) + err = pinner.PinWithMode(ctx, root, pin.Direct) + require.NoError(t, err) + expectedKept = append(expectedKept, root.Hash()) + + // recursive + root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + err = pinner.PinWithMode(ctx, root, pin.Recursive) + require.NoError(t, err) + expectedKept = append(expectedKept, toMHs(allCids)...) + } + + err = pinner.Flush(ctx) + require.NoError(t, err) + + // add more dags to be GCed + for i := 0; i < 5; i++ { + _, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + expectedDiscarded = append(expectedDiscarded, toMHs(allCids)...) + } + + // and some other as "best effort roots" + var bestEffortRoots []cid.Cid + for i := 0; i < 5; i++ { + root, allCids, err := daggen.MakeDagNode(dserv.Add, 5, 2) + require.NoError(t, err) + bestEffortRoots = append(bestEffortRoots, root) + expectedKept = append(expectedKept, toMHs(allCids)...) + } + + ch := GC(ctx, bs, ds, pinner, bestEffortRoots) + var discarded []multihash.Multihash + for res := range ch { + require.NoError(t, res.Error) + discarded = append(discarded, res.KeyRemoved.Hash()) + } + + allKeys, err := bs.AllKeysChan(ctx) + require.NoError(t, err) + var kept []multihash.Multihash + for key := range allKeys { + kept = append(kept, key.Hash()) + } + + require.ElementsMatch(t, expectedDiscarded, discarded) + require.ElementsMatch(t, expectedKept, kept) +} + +func toMHs(cids []cid.Cid) []multihash.Multihash { + res := make([]multihash.Multihash, len(cids)) + for i, c := range cids { + res[i] = c.Hash() + } + return res +} diff --git a/go.mod b/go.mod index 159bbd3e4fd..d2b1f834f2b 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,8 @@ module github.com/ipfs/kubo +// https://github.com/ipfs/boxo/pull/290 +replace github.com/ipfs/boxo => github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f + require ( bazil.org/fuse v0.0.0-20200117225306-7b5117fecadc contrib.go.opencensus.io/exporter/prometheus v0.4.2 diff --git a/go.sum b/go.sum index 16d312ef5d7..eb9a48dc04b 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/Kubuxu/go-os-helper v0.0.1 h1:EJiD2VUQyh5A9hWJLmc6iWg6yIcJ7jpBcwC8GMGXfDk= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f h1:2UbpOJ6cIC43V/hIDxgvP0VLbJIk+cBofPAWmXBlSrg= +github.com/MichaelMure/boxo v0.0.0-20230505145003-9207501a615f/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= @@ -356,8 +358,6 @@ github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1: github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866 h1:ThRTXD/EyoLb/jz+YW+ZlOLbjX9FyaxP0dEpgUp3cCE= -github.com/ipfs/boxo v0.8.2-0.20230503105907-8059f183d866/go.mod h1:bORAHrH6hUtDZjbzTEaLrSpTdyhHKDIpjDRT+A14B7w= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=