Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix level db panic #6186

Merged
merged 1 commit into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,10 @@ func (n *IpfsNode) teardown() error {
// needs to use another during its shutdown/cleanup process, it should be
// closed before that other object

if n.Provider != nil {
closers = append(closers, n.Provider)
}

if n.FilesRoot != nil {
closers = append(closers, n.FilesRoot)
}
Expand Down
4 changes: 4 additions & 0 deletions provider/offline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@ func (op *offlineProvider) Run() {}
func (op *offlineProvider) Provide(cid cid.Cid) error {
return nil
}

func (op *offlineProvider) Close() error {
return nil
}
8 changes: 8 additions & 0 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Provider interface {
Run()
// Provide takes a cid and makes an attempt to announce it to the network
Provide(cid.Cid) error
// Close stops the provider
Close() error
}

type provider struct {
Expand All @@ -39,6 +41,12 @@ func NewProvider(ctx context.Context, queue *Queue, contentRouting routing.Conte
}
}

// Close stops the provider
func (p *provider) Close() error {
p.queue.Close()
return nil
}

// Start workers to handle provide requests.
func (p *provider) Run() {
p.handleAnnouncements()
Expand Down
18 changes: 17 additions & 1 deletion provider/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type Queue struct {
ds datastore.Datastore // Must be threadsafe
dequeue chan cid.Cid
enqueue chan cid.Cid
close context.CancelFunc
closed chan struct{}
}

// NewQueue creates a queue for cids
Expand All @@ -36,19 +38,29 @@ func NewQueue(ctx context.Context, name string, ds datastore.Datastore) (*Queue,
if err != nil {
return nil, err
}
cancelCtx, cancel := context.WithCancel(ctx)
q := &Queue{
name: name,
ctx: ctx,
ctx: cancelCtx,
head: head,
tail: tail,
ds: namespaced,
dequeue: make(chan cid.Cid),
enqueue: make(chan cid.Cid),
close: cancel,
closed: make(chan struct{}, 1),
}
q.work()
return q, nil
}

// Close stops the queue
func (q *Queue) Close() error {
q.close()
michaelavila marked this conversation as resolved.
Show resolved Hide resolved
<-q.closed
return nil
}

// Enqueue puts a cid in the queue
func (q *Queue) Enqueue(cid cid.Cid) {
select {
Expand Down Expand Up @@ -103,6 +115,10 @@ func (q *Queue) work() {
var k datastore.Key = datastore.Key{}
var c cid.Cid = cid.Undef

defer func() {
close(q.closed)
}()

for {
if c == cid.Undef {
k, c = q.nextEntry()
Expand Down