diff --git a/go.mod b/go.mod index 2867420..bf25c31 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.18 retract [v1.0.0, v1.0.1] require ( + github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60 github.com/cenkalti/backoff v2.2.1+incompatible github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-blockservice v0.4.0 diff --git a/go.sum b/go.sum index 5c69367..ee2b6da 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60 h1:/PGSDln05TokWwDCltOgcf/uYfuVPK8JV2hcQ3Nz2zo= +github.com/Jorropo/channel v0.0.0-20230302184439-7ec509945d60/go.mod h1:mI95Zfa5HM2woyGuaxl2tTnfZKKzPAyjwcbvmMk7hwI= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= diff --git a/simple/reprovide.go b/simple/reprovide.go index 0225340..3b6e469 100644 --- a/simple/reprovide.go +++ b/simple/reprovide.go @@ -4,8 +4,10 @@ import ( "context" "errors" "fmt" + "io" "time" + "github.com/Jorropo/channel" "github.com/cenkalti/backoff" "github.com/ipfs/go-cid" "github.com/ipfs/go-cidutil" @@ -180,8 +182,8 @@ func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { // Pinner interface defines how the simple.Reprovider wants to interact // with a Pinning service type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) + DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] + RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] } // NewPinnedProvider returns provider supplying pinned keys @@ -217,23 +219,31 @@ func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, on defer cancel() defer close(set.New) - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { + dkeys := pinning.DirectKeys(ctx) + for { + key, err := dkeys.ReadContext(ctx) + if err == io.EOF { + break + } + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } set.Visitor(ctx)(key) } - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } + rkeys := pinning.RecursiveKeys(ctx) session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { + for { + key, err := rkeys.ReadContext(ctx) + if err == io.EOF { + break + } + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } set.Visitor(ctx)(key) if !onlyRoots { err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { diff --git a/simple/reprovide_test.go b/simple/reprovide_test.go index e29d6e4..9642ad6 100644 --- a/simple/reprovide_test.go +++ b/simple/reprovide_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/Jorropo/channel" blocks "github.com/ipfs/go-block-format" bsrv "github.com/ipfs/go-blockservice" "github.com/ipfs/go-cid" @@ -220,12 +221,32 @@ type mockPinner struct { direct []cid.Cid } -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil +func (mp *mockPinner) DirectKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { + c := channel.New[cid.Cid]() + go func() { + defer c.Close() + for _, p := range mp.direct { + err := c.WriteContext(ctx, p) + if err != nil { + return + } + } + }() + return c.ReadOnly() } -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil +func (mp *mockPinner) RecursiveKeys(ctx context.Context) channel.ReadOnly[cid.Cid] { + c := channel.New[cid.Cid]() + go func() { + defer c.Close() + for _, p := range mp.recursive { + err := c.WriteContext(ctx, p) + if err != nil { + return + } + } + }() + return c.ReadOnly() } func TestReprovidePinned(t *testing.T) {