From 80d8715dbaba34a13c561beaa069d7b142ca56be Mon Sep 17 00:00:00 2001 From: Jorropo Date: Wed, 5 Apr 2023 15:18:33 +0200 Subject: [PATCH] provider: refactor to only maintain one batched implementation and add throughput callback --- go.mod | 1 - go.sum | 2 - provider/README.md | 30 -- provider/batched/system_test.go | 119 -------- provider/{ => internal}/queue/queue.go | 11 +- provider/{ => internal}/queue/queue_test.go | 39 +-- provider/noop.go | 35 +++ provider/offline.go | 29 -- provider/provider.go | 114 ++++++- provider/{batched/system.go => reprovider.go} | 225 ++++++++++---- provider/reprovider_test.go | 211 +++++++++++++ provider/simple/provider.go | 116 ------- provider/simple/provider_test.go | 166 ---------- provider/simple/reprovide.go | 255 ---------------- provider/simple/reprovide_test.go | 289 ------------------ provider/system.go | 60 ---- 16 files changed, 534 insertions(+), 1168 deletions(-) delete mode 100644 provider/README.md delete mode 100644 provider/batched/system_test.go rename provider/{ => internal}/queue/queue.go (93%) rename provider/{ => internal}/queue/queue_test.go (80%) create mode 100644 provider/noop.go delete mode 100644 provider/offline.go rename provider/{batched/system.go => reprovider.go} (58%) create mode 100644 provider/reprovider_test.go delete mode 100644 provider/simple/provider.go delete mode 100644 provider/simple/provider_test.go delete mode 100644 provider/simple/reprovide.go delete mode 100644 provider/simple/reprovide_test.go delete mode 100644 provider/system.go diff --git a/go.mod b/go.mod index 5be828081..2d08290fc 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.19 require ( github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a github.com/benbjohnson/clock v1.3.0 - github.com/cenkalti/backoff v2.2.1+incompatible github.com/cespare/xxhash/v2 v2.2.0 github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3 github.com/cskr/pubsub v1.0.2 diff --git a/go.sum b/go.sum index 86980317b..7b4a7f276 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= -github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= -github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/provider/README.md b/provider/README.md deleted file mode 100644 index 0e4f4650d..000000000 --- a/provider/README.md +++ /dev/null @@ -1,30 +0,0 @@ -## Usage - -Here's how you create, start, interact with, and stop the provider system: - -```golang -import ( - "context" - "time" - - "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" -) - -rsys := (your routing system here) -dstore := (your datastore here) -cid := (your cid to provide here) - -q := queue.NewQueue(context.Background(), "example", dstore) - -reprov := 
simple.NewReprovider(context.Background(), time.Hour * 12, rsys, simple.NewBlockstoreProvider(dstore)) -prov := simple.NewProvider(context.Background(), q, rsys) -sys := provider.NewSystem(prov, reprov) - -sys.Run() - -sys.Provide(cid) - -sys.Close() -``` diff --git a/provider/batched/system_test.go b/provider/batched/system_test.go deleted file mode 100644 index c8a7d7b84..000000000 --- a/provider/batched/system_test.go +++ /dev/null @@ -1,119 +0,0 @@ -package batched - -import ( - "context" - "strconv" - "sync" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - mh "github.com/multiformats/go-multihash" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" -) - -type mockProvideMany struct { - lk sync.Mutex - keys []mh.Multihash -} - -func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { - m.lk.Lock() - defer m.lk.Unlock() - m.keys = keys - return nil -} - -func (m *mockProvideMany) Ready() bool { - return true -} - -func (m *mockProvideMany) GetKeys() []mh.Multihash { - m.lk.Lock() - defer m.lk.Unlock() - return m.keys[:] -} - -var _ provideMany = (*mockProvideMany)(nil) - -func TestBatched(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - provider := &mockProvideMany{} - - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - - const numProvides = 100 - keysToProvide := make(map[cid.Cid]int) - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - c := cid.NewCidV1(cid.Raw, h) - keysToProvide[c] = i - } - - batchSystem, err := New(provider, queue, KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - go func() { - for k := range keysToProvide { - select { - case ch <- k: - case <-ctx.Done(): - return - } - } - }() - return ch, nil - }), initialReprovideDelay(0)) - if err != nil { - t.Fatal(err) - } - - batchSystem.Run() - - var keys []mh.Multihash - for { - if ctx.Err() != nil { - t.Fatal("test hung") - } - keys = provider.GetKeys() - if len(keys) != 0 { - break - } - time.Sleep(time.Millisecond * 100) - } - - if len(keys) != numProvides { - t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) - } - - provMap := make(map[string]struct{}) - for _, k := range keys { - provMap[string(k)] = struct{}{} - } - - for i := 0; i < numProvides; i++ { - h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) - if err != nil { - panic(err) - } - if _, found := provMap[string(h)]; !found { - t.Fatalf("could not find provider with value %d", i) - } - } -} diff --git a/provider/queue/queue.go b/provider/internal/queue/queue.go similarity index 93% rename from provider/queue/queue.go rename to provider/internal/queue/queue.go index 618256bbe..97dae36a0 100644 --- a/provider/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "context" "fmt" + cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue") type Queue struct { // used to differentiate queues in datastore // e.g. 
provider vs reprovider - name string ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid @@ -32,11 +32,10 @@ type Queue struct { } // NewQueue creates a queue for cids -func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) { - namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/")) - cancelCtx, cancel := context.WithCancel(ctx) +func NewQueue(ds datastore.Datastore) *Queue { + namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) + cancelCtx, cancel := context.WithCancel(context.Background()) q := &Queue{ - name: name, ctx: cancelCtx, ds: namespaced, dequeue: make(chan cid.Cid), @@ -45,7 +44,7 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, closed: make(chan struct{}, 1), } q.work() - return q, nil + return q } // Close stops the queue diff --git a/provider/queue/queue_test.go b/provider/internal/queue/queue_test.go similarity index 80% rename from provider/queue/queue_test.go rename to provider/internal/queue/queue_test.go index 9eacf4349..a2d9f0be4 100644 --- a/provider/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) for _, c := range cids { @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) { // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err = queue.ds.Put(ctx, queueKey, []byte("borked")) + err := queue.ds.Put(ctx, queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) + defer queue.Close() cids := makeCids(10) for _, c := range cids { @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) { assertOrdered(cids[:5], queue, t) // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids[5:], queue, t) } @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) { defer ctx.Done() ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue := NewQueue(ds) cids := makeCids(25) for _, c := range cids { queue.Enqueue(c) } + queue.Close() + // make a new queue, same data - queue, err = NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } + queue = NewQueue(ds) + defer queue.Close() assertOrdered(cids, queue, t) } diff --git a/provider/noop.go b/provider/noop.go new file mode 100644 index 000000000..109e62065 --- /dev/null +++ b/provider/noop.go @@ -0,0 +1,35 @@ +package provider + +import ( + "context" + + "github.com/ipfs/go-cid" +) + +type noopProvider struct{} + +var _ System = (*noopProvider)(nil) + +// NewNoopProvider creates a ProviderSystem that does nothing. 
+func NewNoopProvider() System { + return &noopProvider{} +} + +func (op *noopProvider) Run() { +} + +func (op *noopProvider) Close() error { + return nil +} + +func (op *noopProvider) Provide(cid.Cid) error { + return nil +} + +func (op *noopProvider) Reprovide(context.Context) error { + return nil +} + +func (op *noopProvider) Stat() (ReproviderStats, error) { + return ReproviderStats{}, nil +} diff --git a/provider/offline.go b/provider/offline.go deleted file mode 100644 index 030a70ab1..000000000 --- a/provider/offline.go +++ /dev/null @@ -1,29 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -type offlineProvider struct{} - -// NewOfflineProvider creates a ProviderSystem that does nothing -func NewOfflineProvider() System { - return &offlineProvider{} -} - -func (op *offlineProvider) Run() { -} - -func (op *offlineProvider) Close() error { - return nil -} - -func (op *offlineProvider) Provide(cid.Cid) error { - return nil -} - -func (op *offlineProvider) Reprovide(context.Context) error { - return nil -} diff --git a/provider/provider.go b/provider/provider.go index 3b9c6ba3e..9d84e9fa7 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -3,25 +3,121 @@ package provider import ( "context" + blocks "github.com/ipfs/boxo/blockstore" + "github.com/ipfs/boxo/fetcher" + fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" "github.com/ipfs/go-cid" + "github.com/ipfs/go-cidutil" + logging "github.com/ipfs/go-log" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) +var logR = logging.Logger("reprovider.simple") + // Provider announces blocks to the network type Provider interface { - // Run is used to begin processing the provider work - Run() // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error - // Close stops the provider - Close() error } // Reprovider reannounces blocks to the network type Reprovider interface { - // Run is used to begin processing the reprovider work and waiting for reprovide triggers - Run() - // Trigger a reprovide - Trigger(context.Context) error - // Close stops the reprovider + // Reprovide starts a new reprovide if one isn't running already. 
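+	// In this package's implementation the CIDs to announce are streamed from the
+	// configured KeyChanFunc and queued; the call returns once queueing is done and
+	// the actual network announcements happen asynchronously.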
+ Reprovide(context.Context) error +} + +// System defines the interface for interacting with the value +// provider system +type System interface { Close() error + Stat() (ReproviderStats, error) + Provider + Reprovider +} + +// KeyChanFunc is function streaming CIDs to pass to content routing +type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) + +// NewBlockstoreProvider returns key provider using bstore.AllKeysChan +func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + return bstore.AllKeysChan(ctx) + } +} + +// Pinner interface defines how the simple.Reprovider wants to interact +// with a Pinning service +type Pinner interface { + DirectKeys(ctx context.Context) ([]cid.Cid, error) + RecursiveKeys(ctx context.Context) ([]cid.Cid, error) +} + +// NewPinnedProvider returns provider supplying pinned keys +func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { + return func(ctx context.Context) (<-chan cid.Cid, error) { + set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) + if err != nil { + return nil, err + } + + outCh := make(chan cid.Cid) + go func() { + defer close(outCh) + for c := range set.New { + select { + case <-ctx.Done(): + return + case outCh <- c: + } + } + + }() + + return outCh, nil + } +} + +func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() + + go func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) + + dkeys, err := pinning.DirectKeys(ctx) + if err != nil { + logR.Errorf("reprovide direct pins: %s", err) + return + } + for _, key := range dkeys { + set.Visitor(ctx)(key) + } + + rkeys, err := pinning.RecursiveKeys(ctx) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + + session := fetchConfig.NewSession(ctx) + for _, key := range rkeys { + set.Visitor(ctx)(key) + if !onlyRoots { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { + clink, ok := res.LastBlockLink.(cidlink.Link) + if ok { + set.Visitor(ctx)(clink.Cid) + } + return nil + }) + if err != nil { + logR.Errorf("reprovide indirect pins: %s", err) + return + } + } + } + }() + + return set, nil } diff --git a/provider/batched/system.go b/provider/reprovider.go similarity index 58% rename from provider/batched/system.go rename to provider/reprovider.go index e3cb0325a..af8b85c8d 100644 --- a/provider/batched/system.go +++ b/provider/reprovider.go @@ -1,26 +1,26 @@ -package batched +package provider import ( "context" "errors" "fmt" + "math" "strconv" "sync" "time" - provider "github.com/ipfs/boxo/provider" - "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/boxo/provider/simple" + "github.com/ipfs/boxo/provider/internal/queue" "github.com/ipfs/boxo/verifcid" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + namespace "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log" "github.com/multiformats/go-multihash" ) var log = logging.Logger("provider.batched") -type BatchProvidingSystem struct { +type reprovider struct { ctx context.Context close context.CancelFunc closewg sync.WaitGroup @@ -29,39 +29,64 @@ type BatchProvidingSystem struct { initalReprovideDelay time.Duration initialReprovideDelaySet bool - rsys provideMany - keyProvider simple.KeyChanFunc + rsys Provide + keyProvider KeyChanFunc q *queue.Queue ds 
datastore.Batching reprovideCh chan cid.Cid - totalProvides, lastReprovideBatchSize int + maxReprovideBatchSize uint + + statLk sync.Mutex + totalProvides, lastReprovideBatchSize uint64 avgProvideDuration, lastReprovideDuration time.Duration + + throughputCallback ThroughputCallback + // throughputProvideCurrentCount counts how many provides has been done since the last call to throughputCallback + throughputProvideCurrentCount uint + // throughputDurationSum sums up durations between two calls to the throughputCallback + throughputDurationSum time.Duration + throughputMinimumProvides uint + + keyPrefix datastore.Key } -var _ provider.System = (*BatchProvidingSystem)(nil) +var _ System = (*reprovider)(nil) -type provideMany interface { +type Provide interface { + Provide(context.Context, cid.Cid, bool) error +} + +type ProvideMany interface { ProvideMany(ctx context.Context, keys []multihash.Multihash) error +} + +type Ready interface { Ready() bool } // Option defines the functional option type that can be used to configure // BatchProvidingSystem instances -type Option func(system *BatchProvidingSystem) error - -var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide") - -func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) { - s := &BatchProvidingSystem{ - reprovideInterval: time.Hour * 24, - rsys: provider, - keyProvider: nil, - q: q, - ds: datastore.NewMapDatastore(), - reprovideCh: make(chan cid.Cid), +type Option func(system *reprovider) error + +var lastReprovideKey = datastore.NewKey("/reprovide/lastreprovide") +var DefaultKeyPrefix = datastore.NewKey("/provider") + +// New creates a new [System]. By default it is offline, that means it will +// enqueue tasks in ds. +// To have it publish records in the network use the [Online] option. +// If provider casts to [ProvideMany] the [ProvideMany.ProvideMany] method will +// be called instead. +// +// If provider casts to [Ready], it will wait until [Ready.Ready] is true. +func New(ds datastore.Batching, opts ...Option) (System, error) { + s := &reprovider{ + reprovideInterval: time.Hour * 24, + maxReprovideBatchSize: math.MaxUint, + keyPrefix: DefaultKeyPrefix, + reprovideCh: make(chan cid.Cid), } for _, o := range opts { @@ -89,50 +114,91 @@ func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingS } } + s.ds = namespace.Wrap(ds, s.keyPrefix) + s.q = queue.NewQueue(s.ds) + // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options ctx, cancel := context.WithCancel(context.Background()) s.ctx = ctx s.close = cancel - return s, nil -} + if s.rsys != nil { + if _, ok := s.rsys.(ProvideMany); !ok { + s.maxReprovideBatchSize = 1 + } -func Datastore(batching datastore.Batching) Option { - return func(system *BatchProvidingSystem) error { - system.ds = batching - return nil + s.run() } + + return s, nil } func ReproviderInterval(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { + return func(system *reprovider) error { system.reprovideInterval = duration return nil } } -func KeyProvider(fn simple.KeyChanFunc) Option { - return func(system *BatchProvidingSystem) error { +func KeyProvider(fn KeyChanFunc) Option { + return func(system *reprovider) error { system.keyProvider = fn return nil } } +// DatastorePrefix sets a prefix for internal state stored in the Datastore. +// Defaults to [DefaultKeyPrefix]. 
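+// Both the provide queue and the last-reprovide timestamp are persisted under this prefix.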
+func DatastorePrefix(k datastore.Key) Option { + return func(system *reprovider) error { + system.keyPrefix = k + return nil + } +} + +// ThroughputReport will fire the callback synchronously once at least limit +// multihashes have been advertised, it will then wait until a new set of at least +// limit multihashes has been advertised. +func ThroughputReport(f ThroughputCallback, minimumProvides uint) Option { + return func(system *reprovider) error { + system.throughputCallback = f + system.throughputMinimumProvides = minimumProvides + return nil + } +} + +type ThroughputCallback = func(reprovide bool, complete bool, totalKeysProvided uint, totalDuration time.Duration) + +// Online will enable the router and make it send publishes online. +// nil can be used to turn the router offline. +// You can't register multiple providers, if this option is passed multiple times +// it will error. +func Online(rsys Provide) Option { + return func(system *reprovider) error { + if system.rsys != nil { + return fmt.Errorf("trying to register two provider on the same reprovider") + } + system.rsys = rsys + return nil + } +} + func initialReprovideDelay(duration time.Duration) Option { - return func(system *BatchProvidingSystem) error { + return func(system *reprovider) error { system.initialReprovideDelaySet = true system.initalReprovideDelay = duration return nil } } -func (s *BatchProvidingSystem) Run() { - // how long we wait between the first provider we hear about and batching up the provides to send out - const pauseDetectionThreshold = time.Millisecond * 500 - // how long we are willing to collect providers for the batch after we receive the first one - const maxCollectionDuration = time.Minute * 10 +// how long we wait between the first provider we hear about and batching up the provides to send out +const pauseDetectionThreshold = time.Millisecond * 500 + +// how long we are willing to collect providers for the batch after we receive the first one +const maxCollectionDuration = time.Minute * 10 +func (s *reprovider) run() { provCh := s.q.Dequeue() s.closewg.Add(1) @@ -166,11 +232,12 @@ func (s *BatchProvidingSystem) Run() { for { performedReprovide := false + complete := false // at the start of every loop the maxCollectionDurationTimer and pauseDetectTimer should be already be // stopped and have empty channels loop: - for { + for uint(len(m)) < s.maxReprovideBatchSize { select { case <-maxCollectionDurationTimer.C: // if this timer has fired then the pause timer has started so let's stop and empty it @@ -198,6 +265,7 @@ func (s *BatchProvidingSystem) Run() { case <-pauseDetectTimer.C: // if this timer has fired then the max collection timer has started so let's stop and empty it stopAndEmptyTimer(maxCollectionDurationTimer) + complete = true break loop case <-maxCollectionDurationTimer.C: // if this timer has fired then the pause timer has started so let's stop and empty it @@ -230,41 +298,61 @@ func (s *BatchProvidingSystem) Run() { continue } - for !s.rsys.Ready() { - log.Debugf("reprovider system not ready") - select { - case <-time.After(time.Minute): - case <-s.ctx.Done(): - return + if r, ok := s.rsys.(Ready); ok { + ticker := time.NewTicker(time.Minute) + for !r.Ready() { + log.Debugf("reprovider system not ready") + select { + case <-ticker.C: + case <-s.ctx.Done(): + return + } } + ticker.Stop() } log.Debugf("starting provide of %d keys", len(keys)) start := time.Now() - err := s.rsys.ProvideMany(s.ctx, keys) + err := doProvideMany(s.ctx, s.rsys, keys) if err != nil { 
log.Debugf("providing failed %v", err) continue } dur := time.Since(start) - totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration) - recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys))) - s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys))) - s.totalProvides += len(keys) + totalProvideTime := time.Duration(s.totalProvides) * s.avgProvideDuration + recentAvgProvideDuration := dur / time.Duration(len(keys)) + + s.statLk.Lock() + s.avgProvideDuration = time.Duration((totalProvideTime + dur) / (time.Duration(s.totalProvides) + time.Duration(len(keys)))) + s.totalProvides += uint64(len(keys)) log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration) if performedReprovide { - s.lastReprovideBatchSize = len(keys) + s.lastReprovideBatchSize = uint64(len(keys)) s.lastReprovideDuration = dur + s.statLk.Unlock() + + // Don't hold the lock while writing to disk, consumers don't need to wait on IO to read thoses fields. + if err := s.ds.Put(s.ctx, lastReprovideKey, storeTime(time.Now())); err != nil { log.Errorf("could not store last reprovide time: %v", err) } if err := s.ds.Sync(s.ctx, lastReprovideKey); err != nil { log.Errorf("could not perform sync of last reprovide time: %v", err) } + } else { + s.statLk.Unlock() + } + + s.throughputDurationSum += dur + s.throughputProvideCurrentCount += uint(len(keys)) + if s.throughputCallback != nil && s.throughputProvideCurrentCount >= s.throughputMinimumProvides { + s.throughputCallback(performedReprovide, complete, s.throughputProvideCurrentCount, s.throughputDurationSum) + s.throughputProvideCurrentCount = 0 + s.throughputDurationSum = 0 } } }() @@ -327,22 +415,22 @@ func parseTime(b []byte) (time.Time, error) { return time.Unix(0, tns), nil } -func (s *BatchProvidingSystem) Close() error { +func (s *reprovider) Close() error { s.close() err := s.q.Close() s.closewg.Wait() return err } -func (s *BatchProvidingSystem) Provide(cid cid.Cid) error { +func (s *reprovider) Provide(cid cid.Cid) error { return s.q.Enqueue(cid) } -func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error { +func (s *reprovider) Reprovide(ctx context.Context) error { return s.reprovide(ctx, true) } -func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error { +func (s *reprovider) reprovide(ctx context.Context, force bool) error { if !s.shouldReprovide() && !force { return nil } @@ -373,7 +461,7 @@ reprovideCidLoop: return nil } -func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { +func (s *reprovider) getLastReprovideTime() (time.Time, error) { val, err := s.ds.Get(s.ctx, lastReprovideKey) if errors.Is(err, datastore.ErrNotFound) { return time.Time{}, nil @@ -390,31 +478,46 @@ func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) { return t, nil } -func (s *BatchProvidingSystem) shouldReprovide() bool { +func (s *reprovider) shouldReprovide() bool { t, err := s.getLastReprovideTime() if err != nil { log.Debugf("getting last reprovide time failed: %s", err) return false } - if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) { + if time.Since(t) < s.reprovideInterval { return false } return true } -type BatchedProviderStats struct { - TotalProvides, LastReprovideBatchSize int +type ReproviderStats struct { + TotalProvides, LastReprovideBatchSize uint64 AvgProvideDuration, LastReprovideDuration time.Duration } // Stat returns various 
stats about this provider system -func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) { +func (s *reprovider) Stat() (ReproviderStats, error) { // TODO: Does it matter that there is no locking around the total+average values? - return BatchedProviderStats{ + s.statLk.Lock() + defer s.statLk.Unlock() + return ReproviderStats{ TotalProvides: s.totalProvides, LastReprovideBatchSize: s.lastReprovideBatchSize, AvgProvideDuration: s.avgProvideDuration, LastReprovideDuration: s.lastReprovideDuration, }, nil } + +func doProvideMany(ctx context.Context, r Provide, keys []multihash.Multihash) error { + if many, ok := r.(ProvideMany); ok { + return many.ProvideMany(ctx, keys) + } + + for _, k := range keys { + if err := r.Provide(ctx, cid.NewCidV1(cid.Raw, k), true); err != nil { + return err + } + } + return nil +} diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go new file mode 100644 index 000000000..0465c7034 --- /dev/null +++ b/provider/reprovider_test.go @@ -0,0 +1,211 @@ +package provider + +import ( + "bytes" + "context" + "strconv" + "sync" + "testing" + "time" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +type allFeatures interface { + Provide + ProvideMany + Ready +} + +type mockProvideMany struct { + delay time.Duration + lk sync.Mutex + keys []mh.Multihash + calls uint +} + +func (m *mockProvideMany) ProvideMany(ctx context.Context, keys []mh.Multihash) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, keys...) + m.calls++ + time.Sleep(time.Duration(len(keys)) * m.delay) + return nil +} + +func (m *mockProvideMany) Provide(ctx context.Context, key cid.Cid, _ bool) error { + m.lk.Lock() + defer m.lk.Unlock() + m.keys = append(m.keys, key.Hash()) + m.calls++ + time.Sleep(m.delay) + return nil +} + +func (m *mockProvideMany) Ready() bool { + return true +} + +func (m *mockProvideMany) GetKeys() (keys []mh.Multihash, calls uint) { + m.lk.Lock() + defer m.lk.Unlock() + return append([]mh.Multihash(nil), m.keys...), m.calls +} + +var _ allFeatures = (*mockProvideMany)(nil) + +type allButMany interface { + Provide + Ready +} + +type singleMockWrapper struct { + allButMany +} + +func TestReprovider(t *testing.T) { + t.Parallel() + t.Run("many", func(t *testing.T) { + t.Parallel() + testProvider(t, false) + }) + t.Run("single", func(t *testing.T) { + t.Parallel() + testProvider(t, true) + }) +} + +func testProvider(t *testing.T, singleProvide bool) { + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // It has to be so big because the combo of noisy CI runners + OSes that don't + // have scheduler as good as linux's one add a whole lot of jitter. 
+ const provideDelay = time.Millisecond * 25 + orig := &mockProvideMany{ + delay: provideDelay, + } + var provider Provide = orig + if singleProvide { + provider = singleMockWrapper{orig} + } + + const numProvides = 100 + keysToProvide := make(map[cid.Cid]int) + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + c := cid.NewCidV1(cid.Raw, h) + keysToProvide[c] = i + } + + var keyWait sync.Mutex + keyWait.Lock() + batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + defer keyWait.Unlock() + ch := make(chan cid.Cid) + go func() { + for k := range keysToProvide { + select { + case ch <- k: + case <-ctx.Done(): + return + } + } + }() + return ch, nil + }), + initialReprovideDelay(0), + ThroughputReport(func(_, complete bool, n uint, d time.Duration) { + if !singleProvide && !complete { + t.Errorf("expected a complete report but got an incomplete one") + } + + const twentyFivePercent = provideDelay / 4 + const seventyNinePercent = provideDelay - twentyFivePercent + const hundredTwentyFivePercent = provideDelay + twentyFivePercent + + avg := d / time.Duration(n) + + if !(seventyNinePercent <= avg && avg <= hundredTwentyFivePercent) { + t.Errorf("average computed duration is not within bounds, expected between %v and %v but got %v.", seventyNinePercent, hundredTwentyFivePercent, avg) + } + }, numProvides), + ) + if err != nil { + t.Fatal(err) + } + defer batchSystem.Close() + + keyWait.Lock() + time.Sleep(pauseDetectionThreshold + numProvides*provideDelay + time.Millisecond*10) // give it time to call provider after that + + keys, calls := orig.GetKeys() + if len(keys) != numProvides { + t.Fatalf("expected %d provider keys, got %d", numProvides, len(keys)) + } + if !singleProvide { + if calls != 1 { + t.Fatalf("expected 1 call batched provide call, got %d", calls) + } + } + + provMap := make(map[string]struct{}) + for _, k := range keys { + provMap[string(k)] = struct{}{} + } + + for i := 0; i < numProvides; i++ { + h, err := mh.Sum([]byte(strconv.Itoa(i)), mh.SHA2_256, -1) + if err != nil { + panic(err) + } + if _, found := provMap[string(h)]; !found { + t.Fatalf("could not find provider with value %d", i) + } + } +} + +func TestOfflineRecordsThenOnlineRepublish(t *testing.T) { + // Don't run in Parallel as this test is time sensitive. + + someHash, err := mh.Sum([]byte("Vires in Numeris!"), mh.BLAKE3, -1) + assert.NoError(t, err) + c := cid.NewCidV1(cid.Raw, someHash) + + ds := dssync.MutexWrap(datastore.NewMapDatastore()) + + // First public using an offline system to enqueue in the datastore. + sys, err := New(ds) + assert.NoError(t, err) + + err = sys.Provide(c) + assert.NoError(t, err) + + err = sys.Close() + assert.NoError(t, err) + + // Secondly restart an online datastore and we want to see this previously provided cid published. 
+ prov := &mockProvideMany{} + sys, err = New(ds, Online(prov), initialReprovideDelay(0)) + assert.NoError(t, err) + + time.Sleep(pauseDetectionThreshold + time.Millisecond*10) // give it time to call provider after that + + err = sys.Close() + assert.NoError(t, err) + + prov.lk.Lock() + defer prov.lk.Unlock() + if len(prov.keys) != 1 { + t.Fatalf("expected to see 1 provide; got %d", len(prov.keys)) + } + if !bytes.Equal(prov.keys[0], someHash) { + t.Fatalf("keys are not equal expected %v, got %v", someHash, prov.keys[0]) + } +} diff --git a/provider/simple/provider.go b/provider/simple/provider.go deleted file mode 100644 index 63de031ad..000000000 --- a/provider/simple/provider.go +++ /dev/null @@ -1,116 +0,0 @@ -// Package simple implements structures and methods to provide blocks, -// keep track of which blocks are provided, and to allow those blocks to -// be reprovided. -package simple - -import ( - "context" - "time" - - q "github.com/ipfs/boxo/provider/queue" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log" - "github.com/libp2p/go-libp2p/core/routing" -) - -var logP = logging.Logger("provider.simple") - -// Provider announces blocks to the network -type Provider struct { - ctx context.Context - // the CIDs for which provide announcements should be made - queue *q.Queue - // used to announce providing to the network - contentRouting routing.ContentRouting - // how long to wait for announce to complete before giving up - timeout time.Duration - // how many workers concurrently work through thhe queue - workerLimit int -} - -// Option defines the functional option type that can be used to configure -// provider instances -type Option func(*Provider) - -// WithTimeout is an option to set a timeout on a provider -func WithTimeout(timeout time.Duration) Option { - return func(p *Provider) { - p.timeout = timeout - } -} - -// MaxWorkers is an option to set the max workers on a provider -func MaxWorkers(count int) Option { - return func(p *Provider) { - p.workerLimit = count - } -} - -// NewProvider creates a provider that announces blocks to the network using a content router -func NewProvider(ctx context.Context, queue *q.Queue, contentRouting routing.ContentRouting, options ...Option) *Provider { - p := &Provider{ - ctx: ctx, - queue: queue, - contentRouting: contentRouting, - workerLimit: 8, - } - - for _, option := range options { - option(p) - } - - return p -} - -// Close stops the provider -func (p *Provider) Close() error { - return p.queue.Close() -} - -// Run workers to handle provide requests. -func (p *Provider) Run() { - p.handleAnnouncements() -} - -// Provide the given cid using specified strategy. -func (p *Provider) Provide(root cid.Cid) error { - return p.queue.Enqueue(root) -} - -// Handle all outgoing cids by providing (announcing) them -func (p *Provider) handleAnnouncements() { - for workers := 0; workers < p.workerLimit; workers++ { - go func() { - for p.ctx.Err() == nil { - select { - case <-p.ctx.Done(): - return - case c, ok := <-p.queue.Dequeue(): - if !ok { - // queue closed. 
- return - } - - p.doProvide(c) - } - } - }() - } -} - -func (p *Provider) doProvide(c cid.Cid) { - ctx := p.ctx - if p.timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, p.timeout) - defer cancel() - } else { - ctx = p.ctx - } - - logP.Info("announce - start - ", c) - if err := p.contentRouting.Provide(ctx, c, true); err != nil { - logP.Warnf("Unable to provide entry: %s, %s", c, err) - } - logP.Info("announce - end - ", c) -} diff --git a/provider/simple/provider_test.go b/provider/simple/provider_test.go deleted file mode 100644 index 8734c8ff6..000000000 --- a/provider/simple/provider_test.go +++ /dev/null @@ -1,166 +0,0 @@ -package simple_test - -import ( - "context" - "math/rand" - "testing" - "time" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/sync" - blocksutil "github.com/ipfs/go-ipfs-blocksutil" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/ipfs/boxo/internal/test" - q "github.com/ipfs/boxo/provider/queue" - - . "github.com/ipfs/boxo/provider/simple" -) - -var blockGenerator = blocksutil.NewBlockGenerator() - -type mockRouting struct { - provided chan cid.Cid -} - -func (r *mockRouting) Provide(ctx context.Context, cid cid.Cid, recursive bool) error { - select { - case r.provided <- cid: - case <-ctx.Done(): - panic("context cancelled, but shouldn't have") - } - return nil -} - -func (r *mockRouting) FindProvidersAsync(ctx context.Context, cid cid.Cid, timeout int) <-chan peer.AddrInfo { - return nil -} - -func mockContentRouting() *mockRouting { - r := mockRouting{} - r.provided = make(chan cid.Cid) - return &r -} - -func TestAnnouncement(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - cids := cid.NewSet() - - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide CID: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestClose(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r) - prov.Run() - - prov.Close() - - select { - case cp := <-r.provided: - t.Fatal("did not expect to provide anything, provided: ", cp) - case <-time.After(time.Second * 1): - } -} - -func TestAnnouncementTimeout(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue, err := q.NewQueue(ctx, "test", ds) - if err != nil { - t.Fatal(err) - } - - r := mockContentRouting() - - prov := NewProvider(ctx, queue, r, WithTimeout(1*time.Second)) - prov.Run() - - cids := cid.NewSet() 
- - for i := 0; i < 100; i++ { - c := blockGenerator.Next().Cid() - cids.Add(c) - } - - go func() { - for _, c := range cids.Keys() { - err = prov.Provide(c) - // A little goroutine stirring to exercise some different states - r := rand.Intn(10) - time.Sleep(time.Microsecond * time.Duration(r)) - } - }() - - for cids.Len() > 0 { - select { - case cp := <-r.provided: - if !cids.Has(cp) { - t.Fatal("Wrong CID provided") - } - cids.Remove(cp) - case <-time.After(time.Second * 5): - t.Fatal("Timeout waiting for cids to be provided.") - } - } -} diff --git a/provider/simple/reprovide.go b/provider/simple/reprovide.go deleted file mode 100644 index a29b484fc..000000000 --- a/provider/simple/reprovide.go +++ /dev/null @@ -1,255 +0,0 @@ -package simple - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/cenkalti/backoff" - blocks "github.com/ipfs/boxo/blockstore" - "github.com/ipfs/boxo/fetcher" - fetcherhelpers "github.com/ipfs/boxo/fetcher/helpers" - "github.com/ipfs/boxo/verifcid" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-cidutil" - logging "github.com/ipfs/go-log" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/libp2p/go-libp2p/core/routing" -) - -var logR = logging.Logger("reprovider.simple") - -// ErrClosed is returned by Trigger when operating on a closed reprovider. -var ErrClosed = errors.New("reprovider service stopped") - -// KeyChanFunc is function streaming CIDs to pass to content routing -type KeyChanFunc func(context.Context) (<-chan cid.Cid, error) - -// Reprovider reannounces blocks to the network -type Reprovider struct { - // Reprovider context. Cancel to stop, then wait on closedCh. - ctx context.Context - cancel context.CancelFunc - closedCh chan struct{} - - // Trigger triggers a reprovide. - trigger chan chan<- error - - // The routing system to provide values through - rsys routing.ContentRouting - - keyProvider KeyChanFunc - - tick time.Duration -} - -// NewReprovider creates new Reprovider instance. -func NewReprovider(ctx context.Context, reprovideInterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider { - ctx, cancel := context.WithCancel(ctx) - return &Reprovider{ - ctx: ctx, - cancel: cancel, - closedCh: make(chan struct{}), - trigger: make(chan chan<- error), - - rsys: rsys, - keyProvider: keyProvider, - tick: reprovideInterval, - } -} - -// Close the reprovider -func (rp *Reprovider) Close() error { - rp.cancel() - <-rp.closedCh - return nil -} - -// Run re-provides keys with 'tick' interval or when triggered -func (rp *Reprovider) Run() { - defer close(rp.closedCh) - - var initialReprovideCh, reprovideCh <-chan time.Time - - // If reproviding is enabled (non-zero) - if rp.tick > 0 { - reprovideTicker := time.NewTicker(rp.tick) - defer reprovideTicker.Stop() - reprovideCh = reprovideTicker.C - - // If the reprovide ticker is larger than a minute (likely), - // provide once after we've been up a minute. - // - // Don't provide _immediately_ as we might be just about to stop. 
- if rp.tick > time.Minute { - initialReprovideTimer := time.NewTimer(time.Minute) - defer initialReprovideTimer.Stop() - - initialReprovideCh = initialReprovideTimer.C - } - } - - var done chan<- error - for rp.ctx.Err() == nil { - select { - case <-initialReprovideCh: - case <-reprovideCh: - case done = <-rp.trigger: - case <-rp.ctx.Done(): - return - } - - err := rp.Reprovide() - - // only log if we've hit an actual error, otherwise just tell the client we're shutting down - if rp.ctx.Err() != nil { - err = ErrClosed - } else if err != nil { - logR.Errorf("failed to reprovide: %s", err) - } - - if done != nil { - if err != nil { - done <- err - } - close(done) - } - } -} - -// Reprovide registers all keys given by rp.keyProvider to libp2p content routing -func (rp *Reprovider) Reprovide() error { - keychan, err := rp.keyProvider(rp.ctx) - if err != nil { - return fmt.Errorf("failed to get key chan: %s", err) - } - for c := range keychan { - // hash security - if err := verifcid.ValidateCid(c); err != nil { - logR.Errorf("insecure hash in reprovider, %s (%s)", c, err) - continue - } - op := func() error { - err := rp.rsys.Provide(rp.ctx, c, true) - if err != nil { - logR.Debugf("Failed to provide key: %s", err) - } - return err - } - - err := backoff.Retry(op, backoff.WithContext(backoff.NewExponentialBackOff(), rp.ctx)) - if err != nil { - logR.Debugf("Providing failed after number of retries: %s", err) - return err - } - } - return nil -} - -// Trigger starts the reprovision process in rp.Run and waits for it to finish. -// -// Returns an error if a reprovide is already in progress. -func (rp *Reprovider) Trigger(ctx context.Context) error { - resultCh := make(chan error, 1) - select { - case rp.trigger <- resultCh: - default: - return fmt.Errorf("reprovider is already running") - } - - select { - case err := <-resultCh: - return err - case <-rp.ctx.Done(): - return ErrClosed - case <-ctx.Done(): - return ctx.Err() - } -} - -// Strategies - -// NewBlockstoreProvider returns key provider using bstore.AllKeysChan -func NewBlockstoreProvider(bstore blocks.Blockstore) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - return bstore.AllKeysChan(ctx) - } -} - -// Pinner interface defines how the simple.Reprovider wants to interact -// with a Pinning service -type Pinner interface { - DirectKeys(ctx context.Context) ([]cid.Cid, error) - RecursiveKeys(ctx context.Context) ([]cid.Cid, error) -} - -// NewPinnedProvider returns provider supplying pinned keys -func NewPinnedProvider(onlyRoots bool, pinning Pinner, fetchConfig fetcher.Factory) KeyChanFunc { - return func(ctx context.Context) (<-chan cid.Cid, error) { - set, err := pinSet(ctx, pinning, fetchConfig, onlyRoots) - if err != nil { - return nil, err - } - - outCh := make(chan cid.Cid) - go func() { - defer close(outCh) - for c := range set.New { - select { - case <-ctx.Done(): - return - case outCh <- c: - } - } - - }() - - return outCh, nil - } -} - -func pinSet(ctx context.Context, pinning Pinner, fetchConfig fetcher.Factory, onlyRoots bool) (*cidutil.StreamingSet, error) { - set := cidutil.NewStreamingSet() - - go func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - defer close(set.New) - - dkeys, err := pinning.DirectKeys(ctx) - if err != nil { - logR.Errorf("reprovide direct pins: %s", err) - return - } - for _, key := range dkeys { - set.Visitor(ctx)(key) - } - - rkeys, err := pinning.RecursiveKeys(ctx) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - - 
session := fetchConfig.NewSession(ctx) - for _, key := range rkeys { - set.Visitor(ctx)(key) - if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: key}, func(res fetcher.FetchResult) error { - clink, ok := res.LastBlockLink.(cidlink.Link) - if ok { - set.Visitor(ctx)(clink.Cid) - } - return nil - }) - if err != nil { - logR.Errorf("reprovide indirect pins: %s", err) - return - } - } - } - }() - - return set, nil -} diff --git a/provider/simple/reprovide_test.go b/provider/simple/reprovide_test.go deleted file mode 100644 index 8b521ae56..000000000 --- a/provider/simple/reprovide_test.go +++ /dev/null @@ -1,289 +0,0 @@ -package simple_test - -import ( - "bytes" - "context" - "testing" - "time" - - bsrv "github.com/ipfs/boxo/blockservice" - blockstore "github.com/ipfs/boxo/blockstore" - offline "github.com/ipfs/boxo/exchange/offline" - bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice" - "github.com/ipfs/boxo/internal/test" - mock "github.com/ipfs/boxo/routing/mock" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/codec/dagcbor" - "github.com/ipld/go-ipld-prime/fluent/qp" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - basicnode "github.com/ipld/go-ipld-prime/node/basic" - testutil "github.com/libp2p/go-libp2p-testing/net" - "github.com/libp2p/go-libp2p/core/peer" - mh "github.com/multiformats/go-multihash" - - . "github.com/ipfs/boxo/provider/simple" -) - -func setupRouting(t *testing.T) (clA, clB mock.Client, idA, idB peer.ID) { - mrserv := mock.NewServer() - - iidA := testutil.RandIdentityOrFatal(t) - iidB := testutil.RandIdentityOrFatal(t) - - clA = mrserv.Client(iidA) - clB = mrserv.Client(iidB) - - return clA, clB, iidA.ID(), iidB.ID() -} - -func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) { - bstore = blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) - for _, data := range []string{"foo", "bar"} { - nb := basicnode.Prototype.Any.NewBuilder() - err := nb.AssignString(data) - if err != nil { - t.Fatal(err) - } - blk := toBlock(t, nb.Build()) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - nd, err := qp.BuildMap(basicnode.Prototype.Map, 1, func(ma ipld.MapAssembler) { - qp.MapEntry(ma, "child", qp.Link(cidlink.Link{Cid: blk.Cid()})) - }) - if err != nil { - t.Fatal(err) - } - blk = toBlock(t, nd) - err = bstore.Put(context.Background(), blk) - if err != nil { - t.Fatal(err) - } - nodes = append(nodes, blk.Cid()) - } - - return nodes, bstore -} - -func toBlock(t *testing.T, nd ipld.Node) blocks.Block { - buf := new(bytes.Buffer) - err := dagcbor.Encode(nd, buf) - if err != nil { - t.Fatal(err) - } - c, err := cid.Prefix{ - Version: 1, - Codec: cid.DagCBOR, - MhType: mh.SHA2_256, - MhLength: -1, - }.Sum(buf.Bytes()) - if err != nil { - t.Fatal(err) - } - blk, err := blocks.NewBlockWithCid(buf.Bytes(), c) - if err != nil { - t.Fatal(err) - } - return blk -} - -func TestReprovide(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - return r.Reprovide() - }) -} - -func TestTrigger(t *testing.T) { - test.Flaky(t) - testReprovide(t, func(r *Reprovider, ctx context.Context) error { - go r.Run() - time.Sleep(1 * time.Second) - defer r.Close() - err := r.Trigger(ctx) - return err - }) -} - -func testReprovide(t *testing.T, 
trigger func(r *Reprovider, ctx context.Context) error) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, clB, idA, _ := setupRouting(t) - nodes, bstore := setupDag(t) - - keyProvider := NewBlockstoreProvider(bstore) - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - reprov.Trigger(context.Background()) - err := trigger(reprov, ctx) - if err != nil { - t.Fatal(err) - } - - var providers []peer.AddrInfo - maxProvs := 100 - - for _, c := range nodes { - // We provide raw cids because of the multihash keying - // FIXME(@Jorropo): I think this change should be done in the DHT layer, probably an issue with our routing mock. - b := c.Bytes() - b[1] = 0x55 // rewrite the cid to raw - _, c, err := cid.CidFromBytes(b) - if err != nil { - t.Fatal(err) - } - provChan := clB.FindProvidersAsync(ctx, c, maxProvs) - for p := range provChan { - providers = append(providers, p) - } - - if len(providers) == 0 { - t.Fatal("Should have gotten a provider") - } - - if providers[0].ID != idA { - t.Fatal("Somehow got the wrong peer back as a provider.") - } - } -} - -func TestTriggerTwice(t *testing.T) { - test.Flaky(t) - // Ensure we can only trigger once at a time. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - clA, _, _, _ := setupRouting(t) - - keyCh := make(chan cid.Cid) - startCh := make(chan struct{}) - keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) { - <-startCh - return keyCh, nil - } - - reprov := NewReprovider(ctx, time.Hour, clA, keyFunc) - go reprov.Run() - defer reprov.Close() - - // Wait for the reprovider to start, otherwise, the reprovider will - // think a concurrent reprovide is running. - // - // We _could_ fix this race... but that would be complexity for nothing. - // 1. We start a reprovide 1 minute after startup anyways. - // 2. The window is really narrow. - time.Sleep(1 * time.Second) - - errCh := make(chan error, 2) - - // Trigger in the background - go func() { - errCh <- reprov.Trigger(ctx) - }() - - // Wait for the trigger to really start. - startCh <- struct{}{} - - start := time.Now() - // Try to trigger again, this should fail immediately. - if err := reprov.Trigger(ctx); err == nil { - t.Fatal("expected an error") - } - if time.Since(start) > 10*time.Millisecond { - t.Fatal("expected reprovide to fail instantly") - } - - // Let the trigger progress. - close(keyCh) - - // Check the result. - err := <-errCh - if err != nil { - t.Fatal(err) - } - - // Try to trigger again, this should work. 
- go func() { - errCh <- reprov.Trigger(ctx) - }() - startCh <- struct{}{} - err = <-errCh - if err != nil { - t.Fatal(err) - } -} - -type mockPinner struct { - recursive []cid.Cid - direct []cid.Cid -} - -func (mp *mockPinner) DirectKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.direct, nil -} - -func (mp *mockPinner) RecursiveKeys(ctx context.Context) ([]cid.Cid, error) { - return mp.recursive, nil -} - -func TestReprovidePinned(t *testing.T) { - test.Flaky(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - nodes, bstore := setupDag(t) - - fetchConfig := bsfetcher.NewFetcherConfig(bsrv.New(bstore, offline.Exchange(bstore))) - - for i := 0; i < 2; i++ { - clA, clB, idA, _ := setupRouting(t) - - onlyRoots := i == 0 - t.Logf("only roots: %v", onlyRoots) - - var provide, dont []cid.Cid - if onlyRoots { - provide = []cid.Cid{nodes[1], nodes[3]} - dont = []cid.Cid{nodes[0], nodes[2]} - } else { - provide = []cid.Cid{nodes[0], nodes[1], nodes[3]} - dont = []cid.Cid{nodes[2]} - } - - keyProvider := NewPinnedProvider(onlyRoots, &mockPinner{ - recursive: []cid.Cid{nodes[1]}, - direct: []cid.Cid{nodes[3]}, - }, fetchConfig) - - reprov := NewReprovider(ctx, time.Hour, clA, keyProvider) - err := reprov.Reprovide() - if err != nil { - t.Fatal(err) - } - - for i, c := range provide { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if !ok { - t.Errorf("Should have gotten a provider for %d", i) - continue - } - - if prov.ID != idA { - t.Errorf("Somehow got the wrong peer back as a provider.") - continue - } - } - for i, c := range dont { - prov, ok := <-clB.FindProvidersAsync(ctx, c, 1) - if ok { - t.Fatalf("found provider %s for %d, expected none", prov.ID, i) - } - } - } -} diff --git a/provider/system.go b/provider/system.go deleted file mode 100644 index 9fc3e8879..000000000 --- a/provider/system.go +++ /dev/null @@ -1,60 +0,0 @@ -package provider - -import ( - "context" - - "github.com/ipfs/go-cid" -) - -// System defines the interface for interacting with the value -// provider system -type System interface { - Run() - Close() error - Provide(cid.Cid) error - Reprovide(context.Context) error -} - -type system struct { - provider Provider - reprovider Reprovider -} - -// NewSystem constructs a new provider system from a provider and reprovider -func NewSystem(provider Provider, reprovider Reprovider) System { - return &system{provider, reprovider} -} - -// Run the provider system by running the provider and reprovider -func (s *system) Run() { - go s.provider.Run() - go s.reprovider.Run() -} - -// Close the provider and reprovider -func (s *system) Close() error { - var errs []error - - if err := s.provider.Close(); err != nil { - errs = append(errs, err) - } - - if err := s.reprovider.Close(); err != nil { - errs = append(errs, err) - } - - if len(errs) > 0 { - return errs[0] - } - return nil -} - -// Provide a value -func (s *system) Provide(cid cid.Cid) error { - return s.provider.Provide(cid) -} - -// Reprovide all the previously provided values -func (s *system) Reprovide(ctx context.Context) error { - return s.reprovider.Trigger(ctx) -}
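
The usage example deleted from provider/README.md above targeted the old split between the simple and batched packages. A rough equivalent on the consolidated API is sketched below; as in the removed example, `rsys`, `dstore`, `bstore` and `cid` are placeholders for your own routing system, datastore, blockstore and the CID to provide.

```golang
import (
	"context"

	"github.com/ipfs/boxo/provider"
)

rsys := (your routing system here) // anything implementing Provide(ctx context.Context, c cid.Cid, announce bool) error
dstore := (your datastore here)
bstore := (your blockstore here)
cid := (your cid to provide here)

sys, err := provider.New(dstore,
	provider.Online(rsys), // omit Online to stay offline and only enqueue provides in dstore
	provider.KeyProvider(provider.NewBlockstoreProvider(bstore)),
)
if err != nil {
	// handle the error
}

sys.Provide(cid)

sys.Reprovide(context.Background()) // force an immediate reprovide of all keys

sys.Close()
```

There is no separate Run call anymore: New starts the background loop itself whenever Online is supplied, and Stat (now without a context argument) returns the ReproviderStats that the old batched system exposed.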