-
Notifications
You must be signed in to change notification settings - Fork 2
/
sequentialSendQueue.go
89 lines (73 loc) · 1.73 KB
/
sequentialSendQueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package sentry
import (
"sync"
"github.com/pkg/errors"
)
// NewSequentialSendQueue builds a SendQueue backed by a single worker
// goroutine and an event channel with room for buffer pending events.
// It can be used in place of the default send queue.
//
// The worker goroutine is started before this function returns and keeps
// running until the queue is shut down.
func NewSequentialSendQueue(buffer int) SendQueue {
	events := make(chan QueuedEventInternal, buffer)

	queue := &sequentialSendQueue{
		shutdownCh: make(chan struct{}),
		buffer:     events,
	}

	// Register the worker with the wait group before launching it so that
	// a waiter can never observe a zero counter while the worker is alive.
	queue.wait.Add(1)
	go queue.worker(events)

	return queue
}
// sequentialSendQueue is a SendQueue that hands events to a single worker
// goroutine through a bounded channel, so accepted events are sent one at a
// time in the order they were enqueued.
type sequentialSendQueue struct {
	// buffer is the send-only end of the event channel; the worker
	// goroutine reads from the other end.
	buffer chan<- QueuedEventInternal

	// mu guards shutdown. Enqueue holds it for reading while it offers an
	// event, Shutdown holds it for writing while it flips the flag, so no
	// event can be accepted once shutdown has been signalled.
	mu       sync.RWMutex
	shutdown bool

	// shutdownCh is closed (never sent on) to tell the worker to stop.
	shutdownCh chan struct{}

	// wait tracks the worker goroutine so Shutdown can block until it exits.
	wait sync.WaitGroup
}

// Enqueue wraps packet in a new queued event and offers it to the worker
// without blocking. The returned event is always non-nil.
//
// If the queue has been shut down, or the buffer has no free slot, the event
// is completed immediately with an error annotated with the text of
// ErrSendQueueShutdown or ErrSendQueueFull respectively.
func (q *sequentialSendQueue) Enqueue(cfg Config, packet Packet) QueuedEvent {
	e := NewQueuedEvent(cfg, packet)
	ei := e.(QueuedEventInternal)

	var rejected error

	// The read lock is held across the (non-blocking) channel send so that a
	// concurrent Shutdown cannot slip in between the check and the send.
	q.mu.RLock()
	if q.shutdown {
		err := errors.New("sequential send queue: shutdown")
		rejected = errors.Wrap(err, ErrSendQueueShutdown.Error())
	} else {
		select {
		case q.buffer <- ei:
		default:
			err := errors.New("sequential send queue: buffer full")
			rejected = errors.Wrap(err, ErrSendQueueFull.Error())
		}
	}
	q.mu.RUnlock()

	// Complete outside the lock: it is an opaque call and must not be able
	// to stall Shutdown.
	if rejected != nil {
		ei.Complete(rejected)
	}

	return e
}

// Shutdown signals the worker goroutine to stop and rejects all subsequent
// Enqueue calls. When wait is true it blocks until the worker has exited.
//
// Shutdown is safe to call multiple times and from multiple goroutines; only
// the first call delivers the signal, and any call with wait set still waits
// for the worker. The signal is delivered by closing shutdownCh, so a call
// with wait set to false never blocks on a busy worker.
//
// The worker exits as soon as it observes the signal; events still sitting
// in the buffer at that point are not sent by it.
func (q *sequentialSendQueue) Shutdown(wait bool) {
	q.mu.Lock()
	if !q.shutdown {
		q.shutdown = true
		close(q.shutdownCh)
	}
	q.mu.Unlock()

	if wait {
		q.wait.Wait()
	}
}
// worker is the queue's single consumer goroutine. It takes events from
// buffer one at a time, sends each through the transport of the event's own
// config, and completes the event with the result of that send.
//
// The loop ends when a shutdown signal arrives on q.shutdownCh or when
// buffer is closed. Events still in buffer at that moment are left
// untouched. The wait group is released on exit.
func (q *sequentialSendQueue) worker(buffer <-chan QueuedEventInternal) {
	defer q.wait.Done()

	for {
		select {
		case event, open := <-buffer:
			if !open {
				return
			}

			config := event.Config()
			if transport := config.Transport(); transport != nil {
				// Deliver synchronously so events leave in queue order.
				event.Complete(transport.Send(config.DSN(), event.Packet()))
			} else {
				event.Complete(errors.New("no transport configured"))
			}

		case <-q.shutdownCh:
			return
		}
	}
}