Skip to content

Commit

Permalink
Close provider on ipfs shutdown
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Michael Avila <davidmichaelavila@gmail.com>
  • Loading branch information
michaelavila committed Apr 9, 2019
1 parent d0582b2 commit 7fd4f4e
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,10 @@ func (n *IpfsNode) teardown() error {
// needs to use another during its shutdown/cleanup process, it should be
// closed before that other object

if n.Provider != nil {
closers = append(closers, n.Provider)
}

if n.FilesRoot != nil {
closers = append(closers, n.FilesRoot)
}
Expand Down
4 changes: 4 additions & 0 deletions provider/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {}
func (op *offlineProvider) Provide(cid cid.Cid) error {
return nil
}

func (op *offlineProvider) Close() error {
return nil
}
8 changes: 8 additions & 0 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Provider interface {
Run()
// Provide takes a cid and makes an attempt to announce it to the network
Provide(cid.Cid) error
// Close stops the provider
Close() error
}

type provider struct {
Expand All @@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
}
}

// Close stops the provider
func (p *provider) Close() error {
p.queue.Close()
return nil
}

// Start workers to handle provide requests.
func (p *provider) Run() {
p.handleAnnouncements()
Expand Down
18 changes: 17 additions & 1 deletion provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Queue struct {
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
enqueue chan cid.Cid
close context.CancelFunc
closed chan struct{}
}

// NewQueue creates a queue for cids
Expand All @@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
if err != nil {
return nil, err
}
cancelCtx, cancel := context.WithCancel(ctx)
q := &Queue{
name: name,
ctx: ctx,
ctx: cancelCtx,
head: head,
tail: tail,
ds: namespaced,
dequeue: make(chan cid.Cid),
enqueue: make(chan cid.Cid),
close: cancel,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
}

// Close stops the queue
func (q *Queue) Close() error {
q.close()
<-q.closed
return nil
}

// Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) {
select {
Expand Down Expand Up @@ -102,6 +114,10 @@ func (q *Queue) work() {
var k datastore.Key = datastore.Key{}
var c cid.Cid = cid.Undef

defer func() {
close(q.closed)
}()

for {
if c == cid.Undef {
k, c = q.nextEntry()
Expand Down

0 comments on commit 7fd4f4e

Please sign in to comment.