-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriber.go
99 lines (82 loc) · 2.56 KB
/
subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package vnats
import (
"errors"
"fmt"
"log/slog"
"github.com/nats-io/nats.go"
)
// NewSubscriber creates a new Subscriber that subscribes to a NATS stream.
func (c *Connection) NewSubscriber(args SubscriberArgs) (*Subscriber, error) {
subscription, err := c.nats.Subscribe(args.Subject, args.ConsumerName, args.Mode)
if err != nil {
return nil, fmt.Errorf("subscriber could not be created: %w", err)
}
sub := &Subscriber{
conn: c,
subscription: subscription,
logger: c.logger,
consumerName: args.ConsumerName,
quitSignal: make(chan bool),
}
c.subscribers = append(c.subscribers, sub)
return sub, nil
}
// MsgHandler is the type of function the Subscriber has to implement to process an incoming message.
type MsgHandler func(msg Msg) error
// Subscriber subscribes to a NATS consumer and pulls messages to handle by MsgHandler.
type Subscriber struct {
conn *Connection
subscription *nats.Subscription
logger *slog.Logger
consumerName string
handler MsgHandler
quitSignal chan bool
}
// Start subscribes to the NATS consumer and starts a go-routine that handles pulled messages.
func (s *Subscriber) Start(handler MsgHandler) (err error) {
if s.handler != nil {
return fmt.Errorf("handler is already set, don't call Start() multiple times")
}
s.handler = handler
go func() {
for {
select {
case <-s.quitSignal:
s.logger.Info("Received signal to quit subscription go-routine.")
return
default:
s.processMessages()
}
}
}()
return nil
}
// Stop unsubscribes the consumer from the NATS stream.
func (s *Subscriber) Stop() error {
if err := s.subscription.Unsubscribe(); err != nil {
return err
}
s.handler = nil
s.logger.Info("Unsubscribed consumer", slog.String("name", s.consumerName))
return nil
}
func (s *Subscriber) processMessages() {
natsMsgs, err := s.subscription.Fetch(1) // Fetch only one msg at once to keep the order
if errors.Is(err, nats.ErrTimeout) { // ErrTimeout is expected/ no new messages, so we don't log it
return
} else if err != nil {
s.logger.Error("Failed to receive msg", slog.String("error", err.Error()))
return
}
msg := makeMsg(natsMsgs[0])
if err = s.handler(msg); err != nil {
s.logger.Error("Message handle error, will be NAKed", slog.String("error", err.Error()))
if err := natsMsgs[0].NakWithDelay(defaultNakDelay); err != nil {
s.logger.Error("natsMsg.Nak() failed", slog.String("error", err.Error()))
}
return
}
if err = natsMsgs[0].Ack(); err != nil {
s.logger.Error("natsMsg.Ack() failed:", slog.String("error", err.Error()))
}
}