Skip to content

Commit

Permalink
Cleanup AMQP subscriber for use in Godin
Browse files Browse the repository at this point in the history
  • Loading branch information
lukasjarosch committed Jun 27, 2019
1 parent 00bc11a commit 4dd1ccb
Showing 1 changed file with 68 additions and 18 deletions.
86 changes: 68 additions & 18 deletions pkg/amqp/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,30 @@ package amqp

import (
"github.com/streadway/amqp"
"strings"
"fmt"
"github.com/google/uuid"
)

type SubscriberHandler func(delivery amqp.Delivery)

// Subscription defines all data required to setup an AMQP subscription
// All values, except the ctag are provided by the configuration or inferred by Godin.
type Subscription struct {
Topic string `json:"topic"`
Exchange string `json:"exchange"`
ExchangeType string `json:"exchange_type"`
Queue string `json:"queue"`
Topic string `json:"topic"`
Exchange string `json:"exchange"`
Queue SubscriptionQueue `json:"queue"`
AutoAck bool `json:"auto_ack"`
ctag string `json:"-"` // generated
}

// SubscriptionQueue configures the queue on which the subscription runs.
type SubscriptionQueue struct {
Name string
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
}

type handler struct {
Expand All @@ -20,45 +35,76 @@ type handler struct {
routingKey string
}

// Subscriber handles AMQP subscriptions.
type Subscriber struct {
channel *amqp.Channel
exchange string
handlers []handler
channel *amqp.Channel
subscription *Subscription
handlers []handler
}

// NewSubscriber returns an AMQP publisher
func NewSubscriber(channel *amqp.Channel, exchange string) Subscriber {
func NewSubscriber(channel *amqp.Channel, subscription *Subscription) Subscriber {

// generate a unique ctag for the subscriber
sub := strings.Replace(subscription.Topic, ".", "_", -1)
ctag := fmt.Sprintf("%s_%s", sub, uuid.New().String())
subscription.ctag = ctag

return Subscriber{
channel: channel,
exchange: exchange,
channel: channel,
subscription: subscription,
}
}

func (c *Subscriber) Subscribe(queueName, routingKey string, ctag string, handler SubscriberHandler) error {
queue, err := c.channel.QueueDeclare(queueName, true, false, false, false, nil)
// Subscribe will declare the queue defined in the Subscription, bind it to the exchange and start consuming
// by calling the handler in a goroutine.
func (c *Subscriber) Subscribe(handler SubscriberHandler) error {
queue, err := c.channel.QueueDeclare(
c.subscription.Queue.Name,
c.subscription.Queue.Durable,
c.subscription.Queue.AutoDelete,
c.subscription.Queue.Exclusive,
c.subscription.Queue.NoWait,
nil,
)
if err != nil {
return err
}

if err = c.channel.QueueBind(queue.Name, routingKey, c.exchange, false, nil); err != nil {
if err = c.channel.QueueBind(
queue.Name,
c.subscription.Topic,
c.subscription.Exchange,
c.subscription.Queue.NoWait,
nil,
); err != nil {
return err
}

deliveries, err := c.channel.Consume(queue.Name, ctag, false, false, false, false, nil)
deliveries, err := c.channel.Consume(
queue.Name,
"",
c.subscription.AutoAck,
c.subscription.Queue.Exclusive,
false,
c.subscription.Queue.NoWait,
nil,
)
if err != nil {
return err
}

h := c.addHandler(routingKey, ctag, handler)
h := c.setHandler(handler)
go c.Handler(deliveries, h)

return nil
}

func (c *Subscriber) addHandler(routingKey string, ctag string, handlerImpl SubscriberHandler) handler {
// setHandler installs a SubscriberHandler to use for this subscription.
func (c *Subscriber) setHandler(handlerImpl SubscriberHandler) handler {
h := handler{
routingKey: routingKey,
ctag: ctag,
routingKey: c.subscription.Topic,
ctag: c.subscription.ctag,
implementation: handlerImpl,
done: make(chan error),
}
Expand All @@ -67,13 +113,17 @@ func (c *Subscriber) addHandler(routingKey string, ctag string, handlerImpl Subs
return h
}

// Handler is started by Subscribe() as Goroutine. For each received AMQP delivery,
// it will call the implementation(delivery) to allow business logic for each delivery to run.
func (c *Subscriber) Handler(deliveries <-chan amqp.Delivery, h handler) {
for d := range deliveries {
h.implementation(d)
}
h.done <- nil
}

// Shutdown will cancel the subscriber by it's ctag. It needs to be registered
// to a shutdown handler.
func (c *Subscriber) Shutdown() error {
for _, h := range c.handlers {
if err := c.channel.Cancel(h.ctag, true); err != nil {
Expand Down

0 comments on commit 4dd1ccb

Please sign in to comment.