From e2fc7f2fd0237afad200d7b0eec8b7a60bdc6644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Fri, 2 Jun 2023 16:49:03 +0200 Subject: [PATCH] pinner: change the interface to have async pin listing The rational is that if the pin list get big, a synchronous call to get the complete list can delay handling unnecessarily. For example, when listing indirect pins, you can start walking the DAGs immediately with the first recursive pin instead of waiting for the full list. This matters even more on low power device, of if the pin list is stored remotely. * coreiface: allow to return an error not linked to a specific Cid * merkledag/test: add a DAG generator Rationale is that generating a test DAG is quite difficult, and anything that helps writing better tests is helpful. --- coreiface/pin.go | 3 + coreiface/tests/pin.go | 5 +- ipld/merkledag/test/dag_generator.go | 86 +++++++++++++++++++++ ipld/merkledag/test/dag_generator_test.go | 92 +++++++++++++++++++++++ pinning/pinner/dspinner/pin.go | 89 +++++++++++----------- pinning/pinner/dspinner/pin_test.go | 29 ++++--- pinning/pinner/pin.go | 16 ++-- provider/simple/reprovide.go | 45 +++++------ provider/simple/reprovide_test.go | 44 ++++++++--- 9 files changed, 312 insertions(+), 97 deletions(-) create mode 100644 ipld/merkledag/test/dag_generator.go create mode 100644 ipld/merkledag/test/dag_generator_test.go diff --git a/coreiface/pin.go b/coreiface/pin.go index ba5df5354..6b97c6ca5 100644 --- a/coreiface/pin.go +++ b/coreiface/pin.go @@ -27,6 +27,9 @@ type PinStatus interface { // BadNodes returns any bad (usually missing) nodes from the pin BadNodes() []BadPinNode + + // if not nil, an error happened. Everything else should be ignored. + Err() error } // BadPinNode is a node that has been marked as bad by Pin.Verify diff --git a/coreiface/tests/pin.go b/coreiface/tests/pin.go index bbf602994..4b0fea01d 100644 --- a/coreiface/tests/pin.go +++ b/coreiface/tests/pin.go @@ -198,6 +198,9 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { } n := 0 for r := range res { + if err := r.Err(); err != nil { + t.Error(err) + } if !r.Ok() { t.Error("expected pin to be ok") } @@ -208,7 +211,7 @@ func (tp *TestSuite) TestPinRecursive(t *testing.T) { t.Errorf("unexpected verify result count: %d", n) } - //TODO: figure out a way to test verify without touching IpfsNode + // TODO: figure out a way to test verify without touching IpfsNode /* err = api.Block().Rm(ctx, p0, opt.Block.Force(true)) if err != nil { diff --git a/ipld/merkledag/test/dag_generator.go b/ipld/merkledag/test/dag_generator.go new file mode 100644 index 000000000..ec6fba091 --- /dev/null +++ b/ipld/merkledag/test/dag_generator.go @@ -0,0 +1,86 @@ +package mdutils + +import ( + "context" + "fmt" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + + "github.com/ipfs/boxo/ipld/merkledag" +) + +// NewDAGGenerator returns an object capable of +// producing IPLD DAGs. +func NewDAGGenerator() *DAGGenerator { + return &DAGGenerator{} +} + +// DAGGenerator generates BasicBlocks on demand. +// For each instance of DAGGenerator, each new DAG is different from the +// previous, although two different instances will produce the same, given the +// same parameters. +type DAGGenerator struct { + seq int +} + +// MakeDagBlock generate a balanced DAG with the given fanout and depth, and add the blocks to the adder. +// This adder can be for example a blockstore.Put or a blockservice.AddBlock. +func (dg *DAGGenerator) MakeDagBlock(adder func(ctx context.Context, block blocks.Block) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) { + return dg.MakeDagNode(func(ctx context.Context, node format.Node) error { + return adder(ctx, node.(blocks.Block)) + }, fanout, depth) +} + +// MakeDagNode generate a balanced DAG with the given fanout and depth, and add the blocks to the adder. +// This adder can be for example a DAGService.Add. +func (dg *DAGGenerator) MakeDagNode(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, allCids []cid.Cid, err error) { + c, _, allCids, err = dg.generate(adder, fanout, depth) + return c, allCids, err +} + +func (dg *DAGGenerator) generate(adder func(ctx context.Context, node format.Node) error, fanout uint, depth uint) (c cid.Cid, size uint64, allCids []cid.Cid, err error) { + if depth == 0 { + panic("depth should be at least 1") + } + if depth == 1 { + c, size, err = dg.encodeBlock(adder) + if err != nil { + return cid.Undef, 0, nil, err + } + return c, size, []cid.Cid{c}, nil + } + links := make([]*format.Link, fanout) + for i := uint(0); i < fanout; i++ { + root, size, children, err := dg.generate(adder, fanout, depth-1) + if err != nil { + return cid.Undef, 0, nil, err + } + links[i] = &format.Link{Cid: root, Size: size} + allCids = append(allCids, children...) + } + c, size, err = dg.encodeBlock(adder, links...) + if err != nil { + return cid.Undef, 0, nil, err + } + return c, size, append([]cid.Cid{c}, allCids...), nil +} + +func (dg *DAGGenerator) encodeBlock(adder func(ctx context.Context, node format.Node) error, links ...*format.Link) (cid.Cid, uint64, error) { + dg.seq++ + nd := &merkledag.ProtoNode{} + nd.SetData([]byte(fmt.Sprint(dg.seq))) + for i, link := range links { + err := nd.AddRawLink(fmt.Sprintf("link-%d", i), link) + if err != nil { + return cid.Undef, 0, err + } + } + err := adder(context.Background(), nd) + if err != nil { + return cid.Undef, 0, err + } + size, err := nd.Size() + return nd.Cid(), size, err +} diff --git a/ipld/merkledag/test/dag_generator_test.go b/ipld/merkledag/test/dag_generator_test.go new file mode 100644 index 000000000..0b900f8b1 --- /dev/null +++ b/ipld/merkledag/test/dag_generator_test.go @@ -0,0 +1,92 @@ +package mdutils + +import ( + "context" + "sync" + "testing" + + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" +) + +type testDagServ struct { + mu sync.Mutex + nodes map[string]format.Node +} + +func newTestDagServ() *testDagServ { + return &testDagServ{nodes: make(map[string]format.Node)} +} + +func (d *testDagServ) Get(_ context.Context, cid cid.Cid) (format.Node, error) { + d.mu.Lock() + defer d.mu.Unlock() + if n, ok := d.nodes[cid.KeyString()]; ok { + return n, nil + } + return nil, format.ErrNotFound{Cid: cid} +} + +func (d *testDagServ) Add(_ context.Context, node format.Node) error { + d.mu.Lock() + defer d.mu.Unlock() + d.nodes[node.Cid().KeyString()] = node + return nil +} + +func TestNodesAreDifferent(t *testing.T) { + dserv := newTestDagServ() + gen := NewDAGGenerator() + + var allCids []cid.Cid + var allNodes []format.Node + + const nbDag = 5 + + for i := 0; i < nbDag; i++ { + c, cids, err := gen.MakeDagNode(dserv.Add, 5, 3) + if err != nil { + t.Fatal(err) + } + + allCids = append(allCids, cids...) + + // collect all nodes + var getChildren func(n format.Node) + getChildren = func(n format.Node) { + for _, link := range n.Links() { + n, err = dserv.Get(context.Background(), link.Cid) + if err != nil { + t.Fatal(err) + } + allNodes = append(allNodes, n) + getChildren(n) + } + } + n, err := dserv.Get(context.Background(), c) + if err != nil { + t.Fatal(err) + } + allNodes = append(allNodes, n) + getChildren(n) + + // make sure they are all different + for i, node1 := range allNodes { + for j, node2 := range allNodes { + if i != j { + if node1.Cid().String() == node2.Cid().String() { + t.Error("Found duplicate node") + } + } + } + } + } + + // expected count + if len(allNodes) != nbDag*31 { + t.Error("expected nbDag*31 nodes (1+5+5*5)") + } + if len(allCids) != nbDag*31 { + t.Error("expected nbDag*31 cids (1+5+5*5)") + } +} diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 7441ca65b..bad23c693 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -10,8 +10,6 @@ import ( "path" "sync" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/merkledag/dagutils" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -20,6 +18,8 @@ import ( "github.com/polydawn/refmt/cbor" "github.com/polydawn/refmt/obj/atlas" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/merkledag/dagutils" ipfspinner "github.com/ipfs/boxo/pinning/pinner" "github.com/ipfs/boxo/pinning/pinner/dsindex" ) @@ -665,61 +665,56 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() - - cidSet := cid.NewSet() - var e error - err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false - } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } - - return cidSet.Keys(), nil +func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - p.lock.RLock() - defer p.lock.RUnlock() +func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { + return p.streamIndex(ctx, p.cidRIndex) +} - cidSet := cid.NewSet() - var e error - err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool { - var c cid.Cid - c, e = cid.Cast([]byte(key)) - if e != nil { - return false +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { + out := make(chan ipfspinner.StreamedCid) + + go func() { + defer close(out) + + p.lock.RLock() + defer p.lock.RUnlock() + + cidSet := cid.NewSet() + + err := index.ForEach(ctx, "", func(key, value string) bool { + c, err := cid.Cast([]byte(key)) + if err != nil { + out <- ipfspinner.StreamedCid{Err: err} + return false + } + if !cidSet.Has(c) { + select { + case <-ctx.Done(): + return false + case out <- ipfspinner.StreamedCid{C: c}: + } + cidSet.Add(c) + } + return true + }) + if err != nil { + out <- ipfspinner.StreamedCid{Err: err} } - cidSet.Add(c) - return true - }) - if err != nil { - return nil, err - } - if e != nil { - return nil, e - } + }() - return cidSet.Keys(), nil + return out } // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) { - return nil, nil +func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { + c := make(chan ipfspinner.StreamedCid) + close(c) + return c } // Update updates a recursive pin from one cid to another. This is equivalent diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index c6164651a..bc9588395 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -12,9 +12,6 @@ import ( bs "github.com/ipfs/boxo/blockservice" mdag "github.com/ipfs/boxo/ipld/merkledag" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - util "github.com/ipfs/boxo/util" cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/query" @@ -22,6 +19,10 @@ import ( ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + util "github.com/ipfs/boxo/util" + ipfspin "github.com/ipfs/boxo/pinning/pinner" ) @@ -198,10 +199,17 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - cids, err := p.RecursiveKeys(ctx) - if err != nil { - t.Fatal(err) + allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) { + for val := range ch { + if val.Err != nil { + t.Fatal(val.Err) + } + cids = append(cids, val.C) + } + return cids } + + cids := allCids(p.RecursiveKeys(ctx)) if len(cids) != 2 { t.Error("expected 2 recursive pins") } @@ -243,10 +251,7 @@ func TestPinnerBasic(t *testing.T) { } } - cids, err = p.DirectKeys(ctx) - if err != nil { - t.Fatal(err) - } + cids = allCids(p.DirectKeys(ctx)) if len(cids) != 1 { t.Error("expected 1 direct pin") } @@ -254,9 +259,9 @@ func TestPinnerBasic(t *testing.T) { t.Error("wrong direct pin") } - cids, _ = p.InternalPins(ctx) + cids = allCids(p.InternalPins(ctx)) if len(cids) != 0 { - t.Error("shound not have internal keys") + t.Error("should not have internal keys") } err = p.Unpin(ctx, dk, false) diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index fcf7d764a..5151b7e64 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -38,7 +38,7 @@ const ( // Internal pins are cids used to keep the internal state of the pinner. Internal - // NotPinned + // NotPinned is a value to indicated that a cid is not pinned. NotPinned // Any refers to any pinned cid @@ -80,7 +80,7 @@ var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly") // A Pinner provides the necessary methods to keep track of Nodes which are // to be kept locally, according to a pin mode. In practice, a Pinner is in -// in charge of keeping the list of items from the local storage that should +// charge of keeping the list of items from the local storage that should // not be garbage-collected. type Pinner interface { // IsPinned returns whether or not the given cid is pinned @@ -119,14 +119,14 @@ type Pinner interface { Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) <-chan StreamedCid // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) <-chan StreamedCid // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) ([]cid.Cid, error) + InternalPins(ctx context.Context) <-chan StreamedCid } // Pinned represents CID which has been pinned with a pinning strategy. @@ -156,3 +156,9 @@ func (p Pinned) String() string { return fmt.Sprintf("pinned: %s", modeStr) } } + +// StreamedCid encapsulate a Cid and an error for a function to return a channel of Cids. +type StreamedCid struct { + C cid.Cid + Err error +} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go index a29b484fc..51056ad6a 100644 --- a/provider/simple/reprovide.go +++ b/provider/simple/reprovide.go @@ -7,15 +7,17 @@ import ( "time" "github.com/cenkalti/backoff" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" logging "github.com/ipfs/go-log" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/libp2p/go-libp2p/core/routing" + + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" + pin "github.com/ipfs/boxo/pinning/pinner" + "github.com/ipfs/boxo/verifcid" ) var logR = logging.Logger("reprovider.simple") @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { // Pinner interface defines how the simple.Reprovider wants to interact // with a Pinning service type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) <-chan pin.StreamedCid + RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid } // NewPinnedProvider returns provider supplying pinned keys @@ -217,26 +219,25 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on defer cancel() defer close(set.New) - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return + dkeys := pinning.DirectKeys(ctx) + for wrapper := range dkeys { + if wrapper.Err != nil { + logR.Errorf("reprovide direct pins: %s", wrapper.Err) + return + } + set.Visitor(ctx)(wrapper.C) } + rkeys := pinning.RecursiveKeys(ctx) session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) + for wrapper := range rkeys { + if wrapper.Err != nil { + logR.Errorf("reprovide indirect pins: %s", wrapper.Err) + return + } + set.Visitor(ctx)(wrapper.C) if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: wrapper.C}, func(res fetcher.FetchResult) error { clink, ok := res.LastBlockLink.(cidlink.Link) if ok { set.Visitor(ctx)(clink.Cid) diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go index 8b521ae56..6641a3315 100644 --- a/provider/simple/reprovide_test.go +++ b/provider/simple/reprovide_test.go @@ -6,12 +6,6 @@ import ( "testing" "time" - bsrv "github.com/ipfs/boxo/blockservice" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" - "github.com/ipfs/boxo/internal/test" - mock "github.com/ipfs/boxo/routing/mock" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" @@ -25,6 +19,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" mh "github.com/multiformats/go-multihash" + bsrv "github.com/ipfs/boxo/blockservice" + blockstore "github.com/ipfs/boxo/blockstore" + offline "github.com/ipfs/boxo/exchange/offline" + bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" + "github.com/ipfs/boxo/internal/test" + pin "github.com/ipfs/boxo/pinning/pinner" + mock "github.com/ipfs/boxo/routing/mock" + . "github.com/ipfs/boxo/provider/simple" ) @@ -224,12 +226,34 @@ type mockPinner struct { direct []cid.Cid } -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil +func (mp *mockPinner) DirectKeys(ctx context.Context) <-chan pin.StreamedCid { + out := make(chan pin.StreamedCid) + go func() { + defer close(out) + for _, c := range mp.direct { + select { + case <-ctx.Done(): + return + case out <- pin.StreamedCid{C: c}: + } + } + }() + return out } -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil +func (mp *mockPinner) RecursiveKeys(ctx context.Context) <-chan pin.StreamedCid { + out := make(chan pin.StreamedCid) + go func() { + defer close(out) + for _, c := range mp.recursive { + select { + case <-ctx.Done(): + return + case out <- pin.StreamedCid{C: c}: + } + } + }() + return out } func TestReprovidePinned(t *testing.T) {