-
Notifications
You must be signed in to change notification settings - Fork 19
/
async_producer.go
101 lines (85 loc) · 2.39 KB
/
async_producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package mq
import (
"sync"
"github.com/NeowayLabs/wabbit"
)
// AsyncProducer describes the methods available on an asynchronous producer.
// Errors that occur while producing are not returned to the caller;
// they will be accessible with MQ.Error().
type AsyncProducer interface {
	// Produce sends a message to the broker without waiting for the result.
	Produce(data []byte)
}
// asyncProducer queues messages on publishChannel and publishes them from a
// single worker goroutine. Its Produce method matches AsyncProducer.
type asyncProducer struct {
	sync.Mutex // Guards channel while it is used for publishing, closed or replaced.
	workerStatus

	// Where and how messages are published.
	channel    wabbit.Channel
	exchange   string
	routingKey string
	options    wabbit.Option

	// Plumbing between callers and the worker goroutine.
	publishChannel  chan []byte        // Messages waiting to be published.
	errorChannel    chan<- error       // Receives publish and close errors.
	shutdownChannel chan chan struct{} // Carries the channel the worker closes once it has stopped.
}
// newAsyncProducer builds an asyncProducer that publishes through channel and
// reports errors on errorChannel. The exchange, routing key, publish options
// and the size of the outgoing message buffer are taken from config.
// The worker is not started here; see init.
func newAsyncProducer(channel wabbit.Channel, errorChannel chan<- error, config ProducerConfig) *asyncProducer {
	producer := &asyncProducer{
		channel:      channel,
		errorChannel: errorChannel,
		exchange:     config.Exchange,
		routingKey:   config.RoutingKey,
		options:      wabbit.Option(config.Options),
	}

	// Outgoing messages are buffered; shutdown requests are handed over synchronously.
	producer.publishChannel = make(chan []byte, config.BufferSize)
	producer.shutdownChannel = make(chan chan struct{})

	return producer
}
// init starts the worker loop in its own goroutine and returns without
// waiting for it.
func (producer *asyncProducer) init() {
	go producer.worker()
}
// worker is the producer's processing loop. It marks the producer as running
// and then serves two event sources until a shutdown is requested:
//   - a message taken from publishChannel is published, and a publish error
//     is forwarded to errorChannel;
//   - a request taken from shutdownChannel closes the RMQ channel, closes the
//     received channel to acknowledge the stop, and ends the loop.
//
// Messages still waiting in publishChannel when the shutdown request is
// handled are not published by this loop.
func (producer *asyncProducer) worker() {
	producer.markAsRunning()

	for {
		select {
		case payload := <-producer.publishChannel:
			if err := producer.produce(payload); err != nil {
				// TODO Resend message.
				producer.errorChannel <- err
			}
		case ack := <-producer.shutdownChannel:
			// TODO It is necessary to guarantee the message delivery order.
			producer.closeChannel()
			close(ack)

			return
		}
	}
}
// setChannel replaces the producer's RMQ channel with the given one.
// The assignment is made while holding the producer's mutex, the same mutex
// that produce and closeChannel take before touching the channel.
func (producer *asyncProducer) setChannel(channel wabbit.Channel) {
	producer.Lock()
	defer producer.Unlock()

	producer.channel = channel
}
// closeChannel closes the producer's current RMQ channel and, if closing
// fails, reports the error on errorChannel.
//
// The mutex is held only while the channel is being closed. The error is sent
// after the mutex has been released, so a slow or blocked reader of
// errorChannel cannot keep setChannel or produce waiting on the lock.
func (producer *asyncProducer) closeChannel() {
	producer.Lock()
	err := producer.channel.Close()
	producer.Unlock()

	if err != nil {
		producer.errorChannel <- err
	}
}
// Produce hands message to the worker by sending it on publishChannel.
// It returns as soon as the channel has accepted the message and does not
// wait for the message to be published; the send blocks while the channel
// has no free buffer space.
func (producer *asyncProducer) Produce(message []byte) {
	producer.publishChannel <- message
}
// produce publishes message on the current RMQ channel using the producer's
// exchange, routing key and options, and returns the result of Publish.
// The mutex is held for the whole call so the channel cannot be replaced or
// closed in the middle of publishing.
func (producer *asyncProducer) produce(message []byte) error {
	producer.Lock()
	defer producer.Unlock()

	exchange, key := producer.exchange, producer.routingKey

	return producer.channel.Publish(exchange, key, message, producer.options)
}
// Stop asks the worker to shut down and waits until it has acknowledged.
// It does nothing when markAsStoppedIfCan reports false.
func (producer *asyncProducer) Stop() {
	if !producer.markAsStoppedIfCan() {
		return
	}

	stopped := make(chan struct{})
	producer.shutdownChannel <- stopped // Hand the worker a channel to close once it is done.
	<-stopped
}