Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provider: refactor to only maintain one batched implementation and add throughput callback #273

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a
github.com/benbjohnson/clock v1.3.0
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cespare/xxhash/v2 v2.2.0
github.com/crackcomm/go-gitignore v0.0.0-20170627025303-887ab5e44cc3
github.com/cskr/pubsub v1.0.2
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
Expand Down
30 changes: 0 additions & 30 deletions provider/README.md

This file was deleted.

119 changes: 0 additions & 119 deletions provider/batched/system_test.go

This file was deleted.

22 changes: 14 additions & 8 deletions provider/queue/queue.go → provider/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queue
import (
"context"
"fmt"

cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
Expand All @@ -20,7 +21,6 @@ var log = logging.Logger("provider.queue")
type Queue struct {
// used to differentiate queues in datastore
// e.g. provider vs reprovider
name string
ctx context.Context
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
Expand All @@ -32,11 +32,10 @@ type Queue struct {
}

// NewQueue creates a queue for cids
func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, error) {
namespaced := namespace.Wrap(ds, datastore.NewKey("/"+name+"/queue/"))
cancelCtx, cancel := context.WithCancel(ctx)
func NewQueue(ds datastore.Datastore) *Queue {
namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
cancelCtx, cancel := context.WithCancel(context.Background())
q := &Queue{
name: name,
ctx: cancelCtx,
ds: namespaced,
dequeue: make(chan cid.Cid),
Expand All @@ -45,13 +44,16 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
return q
}

// Close stops the queue
func (q *Queue) Close() error {
q.close()
<-q.closed
// We don't close dequeue because the provider which consume this get caught in
// an infinite loop dequeing cid.Undef if we do that.
// The provider has it's own select on top of dequeue and will handle this by itself.
return nil
}

Expand Down Expand Up @@ -79,8 +81,6 @@ func (q *Queue) work() {
defer func() {
// also cancels any in-progess enqueue tasks.
q.close()
// unblocks anyone waiting
close(q.dequeue)
Jorropo marked this conversation as resolved.
Show resolved Hide resolved
// unblocks the close call
close(q.closed)
}()
Expand Down Expand Up @@ -121,6 +121,12 @@ func (q *Queue) work() {
q.counter++
nextKey := datastore.NewKey(keyPath)

if c == cid.Undef {
// fast path, skip rereading the datastore if we don't have anything in hand yet
c = toQueue
k = nextKey
}

if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
log.Errorf("Failed to enqueue cid: %s", err)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ func TestBasicOperation(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)

Expand All @@ -63,10 +61,8 @@ func TestMangledData(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -75,7 +71,7 @@ func TestMangledData(t *testing.T) {

// put bad data in the queue
queueKey := datastore.NewKey("/test/0")
err = queue.ds.Put(ctx, queueKey, []byte("borked"))
err := queue.ds.Put(ctx, queueKey, []byte("borked"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -91,10 +87,8 @@ func TestInitialization(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)
defer queue.Close()

cids := makeCids(10)
for _, c := range cids {
Expand All @@ -104,10 +98,8 @@ func TestInitialization(t *testing.T) {
assertOrdered(cids[:5], queue, t)

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids[5:], queue, t)
}
Expand All @@ -118,21 +110,18 @@ func TestInitializationWithManyCids(t *testing.T) {
defer ctx.Done()

ds := sync.MutexWrap(datastore.NewMapDatastore())
queue, err := NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue := NewQueue(ds)

cids := makeCids(25)
for _, c := range cids {
queue.Enqueue(c)
}

queue.Close()

// make a new queue, same data
queue, err = NewQueue(ctx, "test", ds)
if err != nil {
t.Fatal(err)
}
queue = NewQueue(ds)
defer queue.Close()

assertOrdered(cids, queue, t)
}
32 changes: 32 additions & 0 deletions provider/noop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package provider

import (
"context"

"github.com/ipfs/go-cid"
)

type noopProvider struct{}

var _ System = (*noopProvider)(nil)

// NewNoopProvider creates a ProviderSystem that does nothing.
func NewNoopProvider() System {
return &noopProvider{}
}

func (op *noopProvider) Close() error {
return nil
}

func (op *noopProvider) Provide(cid.Cid) error {
return nil
}

func (op *noopProvider) Reprovide(context.Context) error {
return nil
}

func (op *noopProvider) Stat() (ReproviderStats, error) {
return ReproviderStats{}, nil
}
29 changes: 0 additions & 29 deletions provider/offline.go

This file was deleted.

Loading