Skip to content

Commit

Permalink
provider: refactor to only maintain one batched implementation and ad…
Browse files Browse the repository at this point in the history
…d throughput callback

This code does a few things:
1. Remove the simple provider to avoid duplicating features.
2. Add the support for single providing on the batched provider.
3. Fix a bugs in the batched provider.
4. Add support for a throughputCallback in the batched provider.
6. Add support for an offline mode of the batched provider (stuff is exclusively pushed onto the queue).
5. Move the batched provider to be the only provider and make the queue implementation private.
  • Loading branch information
Jorropo committed May 17, 2023
1 parent ff20a4d commit 0827951
Show file tree
Hide file tree
Showing 16 changed files with 555 additions and 1,172 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/benbjohnson/clock v1.3.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cespare/xxhash/v2 v2.2.0
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3
github.com/cskr/pubsub v1.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
30 changes: 0 additions & 30 deletions provider/README.md

This file was deleted.

119 changes: 0 additions & 119 deletions provider/batched/system_test.go

This file was deleted.

11 changes: 5 additions & 6 deletions provider/queue/queue.go → provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
Expand All @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue")
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
Expand All @@ -32,11 +32,10 @@ type Queue struct {
}

// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
cancelCtx, cancel := context.WithCancel(ctx)
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
q := &Queue{
name: name,
ctx: cancelCtx,
ds: namespaced,
dequeue: make(chan cid.Cid),
Expand All @@ -45,7 +44,7 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
return q
}

// Close stops the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)

Expand All @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
err := queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[:5], queue, t)

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
}
Expand All @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

queue.Close()

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids, queue, t)
}
35 changes: 35 additions & 0 deletions provider/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package provider

import (
"context"

"github.com/ipfs/go-cid"
)

type noopProvider struct{}

var _ System = (*noopProvider)(nil)

// NewNoopProvider creates a ProviderSystem that does nothing.
func NewNoopProvider() System {
return &noopProvider{}
}

func (op *noopProvider) Run() {
}

func (op *noopProvider) Close() error {
return nil
}

func (op *noopProvider) Provide(cid.Cid) error {
return nil
}

func (op *noopProvider) Reprovide(context.Context) error {
return nil
}

func (op *noopProvider) Stat() (ReproviderStats, error) {
return ReproviderStats{}, nil
}
29 changes: 0 additions & 29 deletions provider/offline.go

This file was deleted.

Loading

0 comments on commit 0827951

Please sign in to comment.