generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 95
/
queue.go
165 lines (144 loc) · 3.95 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package queue
import (
"context"
"fmt"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
namespace "github.com/ipfs/go-datastore/namespace"
query "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
)
var log = logging.Logger("provider.queue")
// Queue provides a best-effort durability, FIFO interface to the datastore for storing cids
//
// Best-effort durability just means that cids in the process of being provided when a
// crash or shutdown occurs may be in the queue when the node is brought back online
// depending on whether the underlying datastore has synchronous or asynchronous writes.
type Queue struct {
	// ctx is created in NewQueue and cancelled through close. Enqueue and the
	// worker goroutine watch it to detect shutdown, and it is passed to every
	// datastore call.
	ctx context.Context

	// ds is the datastore wrapped under the "/queue" namespace by NewQueue.
	ds datastore.Datastore // Must be threadsafe

	// dequeue is the channel on which the worker goroutine hands out the cid
	// at the head of the queue; Dequeue exposes it to consumers.
	dequeue chan cid.Cid

	// enqueue carries cids from Enqueue to the worker goroutine, which writes
	// them to ds.
	enqueue chan cid.Cid

	// close cancels ctx.
	close context.CancelFunc

	// closed is closed by the worker goroutine when it exits; Close waits on it.
	closed chan struct{}

	// counter is incremented for every cid received on enqueue and forms the
	// zero-padded prefix of that entry's datastore key. NewQueue leaves it at
	// its zero value.
	counter uint64
}
// NewQueue builds a Queue that stores its cids in ds under the "/queue"
// namespace and starts the background worker before returning.
func NewQueue(ds datastore.Datastore) *Queue {
	ctx, cancel := context.WithCancel(context.Background())

	q := new(Queue)
	q.ctx = ctx
	q.close = cancel
	q.ds = namespace.Wrap(ds, datastore.NewKey("/queue"))
	q.enqueue = make(chan cid.Cid)
	q.dequeue = make(chan cid.Cid)
	q.closed = make(chan struct{}, 1)

	q.work()
	return q
}
// Close cancels the queue's context and then blocks until the worker
// goroutine has signalled its exit on the closed channel. It always returns nil.
func (q *Queue) Close() error {
	cancel, done := q.close, q.closed
	cancel()
	<-done
	return nil
}
// Enqueue hands a cid to the worker goroutine for storage in the queue.
// It returns an error if the queue is already shut down, or shuts down
// before the worker accepts the cid.
func (q *Queue) Enqueue(cid cid.Cid) error {
	// Fail fast when the queue has already been stopped, so a closed queue
	// never races the send below against the cancellation case.
	if q.ctx.Err() != nil {
		return fmt.Errorf("failed to enqueue CID: shutting down")
	}

	select {
	case <-q.ctx.Done():
		return fmt.Errorf("failed to enqueue CID: shutting down")
	case q.enqueue <- cid:
		return nil
	}
}
// Dequeue returns the receive-only channel on which the worker goroutine
// delivers queued cids. An entry is removed from the datastore once a
// receiver has taken it from this channel.
func (q *Queue) Dequeue() <-chan cid.Cid {
	var out <-chan cid.Cid = q.dequeue
	return out
}
// work starts the queue's background goroutine, which performs all reads and
// writes of queue entries in the datastore.
//
// The goroutine keeps at most one cid "in hand" (c, stored under key k). When
// nothing is in hand it loads the head of the queue from the datastore. It
// then waits for one of three events:
//   - a cid arrives on q.enqueue and is written to the datastore,
//   - a consumer receives the in-hand cid from q.dequeue, after which the
//     entry is deleted from the datastore,
//   - the queue's context is cancelled.
//
// The goroutine exits on context cancellation or on a datastore error while
// reading the head or removing an unparsable entry. On exit it cancels the
// context and closes q.closed.
func (q *Queue) work() {
	go func() {
		var (
			k datastore.Key // datastore key of the entry held in c
			c = cid.Undef   // cid waiting to be handed out, or cid.Undef if none
		)

		defer func() {
			// also cancels any in-progress enqueue tasks.
			q.close()
			// unblocks the close call
			close(q.closed)
		}()

		for {
			if c == cid.Undef {
				head, err := q.getQueueHead()
				if err != nil {
					log.Errorf("error querying for head of queue: %s, stopping provider", err)
					return
				}
				if head != nil {
					k = datastore.NewKey(head.Key)
					c, err = cid.Parse(head.Value)
					if err != nil {
						// A corrupt entry would block the head of the queue
						// forever, so drop it and look at the next one.
						log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
						err = q.ds.Delete(q.ctx, k)
						if err != nil {
							log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
							return
						}
						continue
					}
				}
			}

			// Only offer a cid when one is in hand: sending on a nil channel
			// blocks forever, which disables the dequeue case below.
			var dequeue chan cid.Cid
			if c != cid.Undef {
				dequeue = q.dequeue
			}

			select {
			case toQueue := <-q.enqueue:
				// The zero-padded counter keeps keys in insertion order under
				// key ordering; the suffix labels the entry with the cid that
				// is actually being stored.
				keyPath := fmt.Sprintf("%020d/%s", q.counter, toQueue.String())
				q.counter++
				nextKey := datastore.NewKey(keyPath)

				if c == cid.Undef {
					// fast path, skip rereading the datastore if we don't have anything in hand yet
					c = toQueue
					k = nextKey
				}

				if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
					log.Errorf("Failed to enqueue cid: %s", err)
					continue
				}
			case dequeue <- c:
				err := q.ds.Delete(q.ctx, k)
				if err != nil {
					// c stays in hand, so it is offered again on the next pass.
					log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
					continue
				}
				c = cid.Undef
			case <-q.ctx.Done():
				return
			}
		}
	}()
}
// getQueueHead queries the datastore for a single entry, ordered by key.
//
// It returns (nil, nil) when the query yields no result. If the query itself
// fails, or the returned result carries an error, it returns a nil entry
// together with that error.
func (q *Queue) getQueueHead() (*query.Entry, error) {
	qry := query.Query{
		Orders: []query.Order{query.OrderByKey{}},
		Limit:  1,
	}

	results, err := q.ds.Query(q.ctx, qry)
	if err != nil {
		return nil, err
	}
	defer results.Close()

	r, ok := results.NextSync()
	if !ok {
		return nil, nil
	}
	if r.Error != nil {
		// Never hand back an entry alongside an error.
		return nil, r.Error
	}
	return &r.Entry, nil
}