Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
change the Pinner interface to have async Direct, Recursive and Inter…
Browse files Browse the repository at this point in the history
…nal pin listing
  • Loading branch information
MichaelMure committed Jan 20, 2023
1 parent 7e1406b commit 8a58bcf
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 56 deletions.
92 changes: 47 additions & 45 deletions dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,61 +677,63 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()
var e error
err := p.cidDIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}

return cidSet.Keys(), nil
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) {
p.lock.RLock()
defer p.lock.RUnlock()
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
return p.streamIndex(ctx, p.cidRIndex)
}

cidSet := cid.NewSet()
var e error
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var c cid.Cid
c, e = cid.Cast([]byte(key))
if e != nil {
return false
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)

go func() {
defer close(out)

p.lock.RLock()
defer p.lock.RUnlock()

cidSet := cid.NewSet()

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
select {
case <-ctx.Done():
case out <- ipfspinner.StreamedCid{Err: err}:
}
return false
}
if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedCid{Cid: c}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
select {
case <-ctx.Done():
case out <- ipfspinner.StreamedCid{Err: err}:
}
return
}
cidSet.Add(c)
return true
})
if err != nil {
return nil, err
}
if e != nil {
return nil, e
}
}()

return cidSet.Keys(), nil
return out
}

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) ([]cid.Cid, error) {
return nil, nil
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)
close(out)
return out
}

// Update updates a recursive pin from one cid to another. This is equivalent
Expand Down
20 changes: 12 additions & 8 deletions dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,17 @@ func TestPinnerBasic(t *testing.T) {
dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

cids, err := p.RecursiveKeys(ctx)
if err != nil {
t.Fatal(err)
allCids := func(c <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
for streamedCid := range c {
if streamedCid.Err != nil {
t.Fatal(streamedCid.Err)
}
cids = append(cids, streamedCid.Cid)
}
return cids
}

cids := allCids(p.RecursiveKeys(ctx))
if len(cids) != 2 {
t.Error("expected 2 recursive pins")
}
Expand Down Expand Up @@ -243,18 +250,15 @@ func TestPinnerBasic(t *testing.T) {
}
}

cids, err = p.DirectKeys(ctx)
if err != nil {
t.Fatal(err)
}
cids = allCids(p.DirectKeys(ctx))
if len(cids) != 1 {
t.Error("expected 1 direct pin")
}
if cids[0] != ak {
t.Error("wrong direct pin")
}

cids, _ = p.InternalPins(ctx)
cids = allCids(p.InternalPins(ctx))
if len(cids) != 0 {
t.Error("shound not have internal keys")
}
Expand Down
13 changes: 10 additions & 3 deletions pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,21 @@ type Pinner interface {
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) ([]cid.Cid, error)
DirectKeys(ctx context.Context) <-chan StreamedCid

// DirectKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) ([]cid.Cid, error)
RecursiveKeys(ctx context.Context) <-chan StreamedCid

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) ([]cid.Cid, error)
InternalPins(ctx context.Context) <-chan StreamedCid
}

// StreamedCid is a cid.Cid that carries an error, to be sent through a channel.
type StreamedCid struct {
// if not nil, an error happened. Everything else should be ignored.
Err error
Cid cid.Cid
}

// Pinned represents CID which has been pinned with a pinning strategy.
Expand Down

0 comments on commit 8a58bcf

Please sign in to comment.