diff --git a/core/core.go b/core/core.go index 1a87c0123ca..99379b85c01 100644 --- a/core/core.go +++ b/core/core.go @@ -675,6 +675,10 @@ func (n *IpfsNode) teardown() error { // needs to use another during its shutdown/cleanup process, it should be // closed before that other object + if n.Provider != nil { + closers = append(closers, n.Provider) + } + if n.FilesRoot != nil { closers = append(closers, n.FilesRoot) } diff --git a/provider/offline.go b/provider/offline.go index 029ddfa9889..0c91ed2af77 100644 --- a/provider/offline.go +++ b/provider/offline.go @@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {} func (op *offlineProvider) Provide(cid cid.Cid) error { return nil } + +func (op *offlineProvider) Close() error { + return nil +} diff --git a/provider/provider.go b/provider/provider.go index f9aa4ed7820..67c5c6b6b92 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -20,6 +20,8 @@ type Provider interface { Run() // Provide takes a cid and makes an attempt to announce it to the network Provide(cid.Cid) error + // Close stops the provider + Close() error } type provider struct { @@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte } } +// Close stops the provider +func (p *provider) Close() error { + p.queue.Close() + return nil +} + // Start workers to handle provide requests. func (p *provider) Run() { p.handleAnnouncements() diff --git a/provider/queue.go b/provider/queue.go index b1d899cbf6e..8fdfca81521 100644 --- a/provider/queue.go +++ b/provider/queue.go @@ -27,6 +27,8 @@ type Queue struct { ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid enqueue chan cid.Cid + close context.CancelFunc + closed chan struct{} } // NewQueue creates a queue for cids @@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue, if err != nil { return nil, err } + cancelCtx, cancel := context.WithCancel(ctx) q := &Queue{ name: name, - ctx: ctx, + ctx: cancelCtx, head: head, tail: tail, ds: namespaced, dequeue: make(chan cid.Cid), enqueue: make(chan cid.Cid), + close: cancel, + closed: make(chan struct{}, 1), } q.work() return q, nil } +// Close stops the queue +func (q *Queue) Close() error { + q.close() + <-q.closed + return nil +} + // Enqueue puts a cid in the queue func (q *Queue) Enqueue(cid cid.Cid) { select { @@ -103,6 +115,10 @@ func (q *Queue) work() { var k datastore.Key = datastore.Key{} var c cid.Cid = cid.Undef + defer func() { + close(q.closed) + }() + for { if c == cid.Undef { k, c = q.nextEntry()