Skip to content

Commit

Permalink
change the Pinner interface to have async pin listing
Browse files Browse the repository at this point in the history
The rational is that if the pin list get big, a synchronous call to get the complete list can delay handling unnecessarily. For example, when listing indirect pins, you can start walking the DAGs immediately with the first recursive pin instead of waiting for the full list.

This matters even more on low power device, of if the pin list is stored remotely.

Replace:
- ipfs/go-ipfs-pinner#24
- ipfs/go-ipfs-provider#48
  • Loading branch information
MichaelMure authored and Jorropo committed Jun 2, 2023
1 parent 20e2aae commit 071cd58
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 80 deletions.
90 changes: 43 additions & 47 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"path"
"sync"

"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/dagutils"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand All @@ -20,6 +18,9 @@ import (
"github.com/polydawn/refmt/cbor"
"github.com/polydawn/refmt/obj/atlas"

"github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/boxo/ipld/merkledag/dagutils"

ipfspinner "github.com/ipfs/boxo/pinning/pinner"
"github.com/ipfs/boxo/pinning/pinner/dsindex"
)
Expand Down Expand Up @@ -665,61 +666,56 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()
var e error
err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}

return cidSet.Keys(), nil
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidRIndex)
}

cidSet := cid.NewSet()
var e error
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)

go func() {
defer close(out)

p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
return false
}
if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedCid{C: c}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}
}()

return cidSet.Keys(), nil
return out
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
return nil, nil
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
c := make(chan ipfspinner.StreamedCid)
close(c)
return c
}

// Update updates a recursive pin from one cid to another. This is equivalent
Expand Down
33 changes: 21 additions & 12 deletions pinning/pinner/dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
bs "github.com/ipfs/boxo/blockservice"
mdag "github.com/ipfs/boxo/ipld/merkledag"

blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
util "github.com/ipfs/boxo/util"
cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"

blockstore "github.com/ipfs/boxo/blockstore"
offline "github.com/ipfs/boxo/exchange/offline"
util "github.com/ipfs/boxo/util"

ipfspin "github.com/ipfs/boxo/pinning/pinner"
)

Expand Down Expand Up @@ -198,10 +199,21 @@ func TestPinnerBasic(t *testing.T) {
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

cids, err := p.RecursiveKeys(ctx)
if err != nil {
t.Fatal(err)
allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
for {
val, ok := <-ch
if !ok {
break
}
if val.Err != nil {
t.Fatal(val.Err)
}
cids = append(cids, val.C)
}
return cids
}

cids := allCids(p.RecursiveKeys(ctx))
if len(cids) != 2 {
t.Error("expected 2 recursive pins")
}
Expand Down Expand Up @@ -243,20 +255,17 @@ func TestPinnerBasic(t *testing.T) {
}
}

cids, err = p.DirectKeys(ctx)
if err != nil {
t.Fatal(err)
}
cids = allCids(p.DirectKeys(ctx))
if len(cids) != 1 {
t.Error("expected 1 direct pin")
}
if cids[0] != ak {
t.Error("wrong direct pin")
}

cids, _ = p.InternalPins(ctx)
cids = allCids(p.InternalPins(ctx))
if len(cids) != 0 {
t.Error("shound not have internal keys")
t.Error("should not have internal keys")
}

err = p.Unpin(ctx, dk, false)
Expand Down
16 changes: 11 additions & 5 deletions pinning/pinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// Internal pins are cids used to keep the internal state of the pinner.
Internal

// NotPinned
// NotPinned is a value to indicated that a cid is not pinned.
NotPinned

// Any refers to any pinned cid
Expand Down Expand Up @@ -80,7 +80,7 @@ var ErrNotPinned = fmt.Errorf("not pinned or pinned indirectly")

// A Pinner provides the necessary methods to keep track of Nodes which are
// to be kept locally, according to a pin mode. In practice, a Pinner is in
// in charge of keeping the list of items from the local storage that should
// charge of keeping the list of items from the local storage that should
// not be garbage-collected.
type Pinner interface {
// IsPinned returns whether or not the given cid is pinned
Expand Down Expand Up @@ -119,14 +119,14 @@ type Pinner interface {
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) <-chan StreamedCid

// RecursiveKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) <-chan StreamedCid

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) ([]cid.Cid, error)
InternalPins(ctx context.Context) <-chan StreamedCid
}

// Pinned represents CID which has been pinned with a pinning strategy.
Expand Down Expand Up @@ -156,3 +156,9 @@ func (p Pinned) String() string {
return fmt.Sprintf("pinned: %s", modeStr)
}
}

// StreamedCid encapsulate a Cid and an error for a function to return a channel of Cids.
type StreamedCid struct {
C cid.Cid
Err error
}
31 changes: 15 additions & 16 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,26 +79,25 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
defer cancel()
defer close(set.New)

dkeys, err := pinning.DirectKeys(ctx)
if err != nil {
logR.Errorf("reprovide direct pins: %s", err)
return
}
for _, key := range dkeys {
set.Visitor(ctx)(key)
}

rkeys, err := pinning.RecursiveKeys(ctx)
if err != nil {
logR.Errorf("reprovide indirect pins: %s", err)
return
dkeys := pinning.DirectKeys(ctx)
for wrapper := range dkeys {
if wrapper.Err != nil {
logR.Errorf("reprovide direct pins: %s", wrapper.Err)
return
}
set.Visitor(ctx)(wrapper.C)
}

rkeys := pinning.RecursiveKeys(ctx)
session := fetchConfig.NewSession(ctx)
for _, key := range rkeys {
set.Visitor(ctx)(key)
for wrapper := range rkeys {
if wrapper.Err != nil {
logR.Errorf("reprovide indirect pins: %s", wrapper.Err)
return
}
set.Visitor(ctx)(wrapper.C)
if !onlyRoots {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: wrapper.C}, func(res fetcher.FetchResult) error {
clink, ok := res.LastBlockLink.(cidlink.Link)
if ok {
set.Visitor(ctx)(clink.Cid)
Expand Down

0 comments on commit 071cd58

Please sign in to comment.