Skip to content

Commit

Permalink
Merge pull request #2 from Clarilab/Task/11810
Browse files Browse the repository at this point in the history
feat: add multiple loggers at once
  • Loading branch information
nicoandrewss authored Sep 4, 2023
2 parents 8d90cde + eafbdac commit f1f5bdb
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 137 deletions.
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 2
updates:
- package-ecosystem: gomod
directory: "/"
schedule:
interval: weekly
- package-ecosystem: github-actions
directory: "/"
schedule:
interval: weekly
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ lint:
vuln:
govulncheck ./...

test_all: test test_integration

test:
go test -skip "(Test_Integration|Test_Reconnection)" -vet=off -failfast -race -coverprofile=coverage.out

Expand Down
12 changes: 6 additions & 6 deletions binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ type (
}
)

func (b *Binding) queueNameOr(defaultName string) string {
func (b *Binding) defaultQueueNameOr(name string) string {
if b.QueueName != "" {
return b.QueueName
}

return defaultName
return name
}

func (b *Binding) exchangeNameOr(defaultName string) string {
func (b *Binding) defaultExchangeNameOr(name string) string {
if b.ExchangeName != "" {
return b.ExchangeName
}

return defaultName
return name
}

func defaultBindingOptions() *BindingOptions {
Expand All @@ -59,9 +59,9 @@ func declareBindings(channel *amqp.Channel, queueName, exchangeName string, bind
}

err := channel.QueueBind(
binding.queueNameOr(queueName),
binding.defaultQueueNameOr(queueName),
binding.RoutingKey,
binding.exchangeNameOr(exchangeName),
binding.defaultExchangeNameOr(exchangeName),
binding.NoWait,
amqp.Table(binding.Args),
)
Expand Down
2 changes: 1 addition & 1 deletion clarimq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ func Test_Integration_ManualRemoveExchangeQueueAndBindings(t *testing.T) {
clarimq.WithExchangeOptionDeclare(true),
clarimq.WithExchangeOptionKind(amqp.ExchangeTopic),
clarimq.WithExchangeOptionName(params.exchangeName),
clarimq.WithConsumerOptionBinding(clarimq.Binding{
clarimq.WithBindingOptionCustomBinding(clarimq.Binding{
RoutingKey: params.routingKey,
BindingOptions: &clarimq.BindingOptions{
Args: clarimq.Table{},
Expand Down
142 changes: 69 additions & 73 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package clarimq

import (
"fmt"
"log/slog"
"net"
"net/url"
"strconv"
Expand All @@ -25,14 +24,12 @@ type Connection struct {

connectionCloseWG *sync.WaitGroup

startRecoveryChan chan struct{}
recoveryFailedChan chan error

recoveryFailedChan chan error
consumerRecoveryChan chan error

runningConsumers int

logger *log
logger *logger

returnHandler ReturnHandler
}
Expand All @@ -51,10 +48,9 @@ func NewConnection(uri string, options ...ConnectionOption) (*Connection, error)

conn := &Connection{
connectionCloseWG: &sync.WaitGroup{},
startRecoveryChan: make(chan struct{}),
recoveryFailedChan: make(chan error, reconnectFailChanSize),
consumerRecoveryChan: make(chan error),
logger: newLogger(opt.logger),
logger: newLogger(opt.loggers),
returnHandler: opt.ReturnHandler,
options: opt,
}
Expand Down Expand Up @@ -96,7 +92,6 @@ func (c *Connection) Close() error {

c.connectionCloseWG.Wait()

close(c.startRecoveryChan)
close(c.recoveryFailedChan)
close(c.consumerRecoveryChan)

Expand All @@ -116,8 +111,7 @@ func (c *Connection) NotifyAutoRecoveryFail() <-chan error {
func (c *Connection) Reconnect() error {
const errMessage = "failed to reconnect to rabbitmq: %w"

err := c.startRecovery()
if err != nil {
if err := c.recoverConnection(); err != nil {
return fmt.Errorf(errMessage, err)
}

Expand All @@ -135,12 +129,12 @@ func (c *Connection) Reconnect() error {
func (c *Connection) RemoveQueue(name string, ifUnused bool, ifEmpty bool, noWait bool) (int, error) {
const errMessage = "failed to remove queue: %w"

removedMessages, err := c.amqpChannel.QueueDelete(name, ifUnused, ifEmpty, noWait)
purgedMessages, err := c.amqpChannel.QueueDelete(name, ifUnused, ifEmpty, noWait)
if err != nil {
return 0, fmt.Errorf(errMessage, err)
}

return removedMessages, nil
return purgedMessages, nil
}

// RemoveBinding removes a binding between an exchange and queue matching the key and arguments.
Expand All @@ -149,8 +143,7 @@ func (c *Connection) RemoveQueue(name string, ifUnused bool, ifEmpty bool, noWai
func (c *Connection) RemoveBinding(queueName string, routingKey string, exchangeName string, args Table) error {
const errMessage = "failed to remove binding: %w"

err := c.amqpChannel.QueueUnbind(queueName, routingKey, exchangeName, amqp.Table(args))
if err != nil {
if err := c.amqpChannel.QueueUnbind(queueName, routingKey, exchangeName, amqp.Table(args)); err != nil {
return fmt.Errorf(errMessage, err)
}

Expand All @@ -169,8 +162,7 @@ func (c *Connection) RemoveBinding(queueName string, routingKey string, exchange
func (c *Connection) RemoveExchange(name string, ifUnused bool, noWait bool) error {
const errMessage = "failed to remove exchange: %w"

err := c.amqpChannel.ExchangeDelete(name, ifUnused, noWait)
if err != nil {
if err := c.amqpChannel.ExchangeDelete(name, ifUnused, noWait); err != nil {
return fmt.Errorf(errMessage, err)
}

Expand All @@ -192,17 +184,13 @@ func (c *Connection) connect() error {
const errMessage = "failed to connect to rabbitmq: %w"

if c.amqpConnection == nil {
err := c.createConnection()
if err != nil {
if err := c.createConnection(); err != nil {
return fmt.Errorf(errMessage, err)
}

err = c.createChannel()
if err != nil {
if err := c.createChannel(); err != nil {
return fmt.Errorf(errMessage, err)
}

c.watchRecoveryChan()
}

return nil
Expand Down Expand Up @@ -253,17 +241,7 @@ func (c *Connection) watchConnectionNotifications() {
for {
select {
case err := <-closeChan:
if err == nil {
slog.Debug("closed connection")

c.connectionCloseWG.Done()

return
}

c.logger.logDebug("connection unexpectedly closed", "cause", err)

c.startRecoveryChan <- struct{}{}
c.handleClosedConnection(err)

return

Expand All @@ -283,15 +261,7 @@ func (c *Connection) watchChannelNotifications() {
for {
select {
case err := <-closeChan:
if err == nil {
slog.Debug("closed channel")

c.connectionCloseWG.Done()

return
}

c.logger.logDebug("channel unexpectedly closed", "cause", err)
c.handleClosedChannel(err)

return

Expand Down Expand Up @@ -319,52 +289,80 @@ func (c *Connection) watchChannelNotifications() {
}()
}

func (c *Connection) watchRecoveryChan() {
go func() {
for range c.startRecoveryChan {
err := c.startRecovery()
if err != nil {
c.recoveryFailedChan <- err
func (c *Connection) handleClosedConnection(err *amqp.Error) {
if err == nil {
c.logger.logDebug("connection gracefully closed")

return
}
c.connectionCloseWG.Done()

return
}

if err := c.recoverConnection(); err != nil {
c.recoveryFailedChan <- err
}
}

func (c *Connection) handleClosedChannel(err *amqp.Error) {
if err == nil {
c.logger.logDebug("channel gracefully closed")

c.connectionCloseWG.Done()

return
}

if err.Code == amqp.NotFound {
c.logger.logDebug("channel unexpectedly closed", "cause", err)

if err := c.recoverChannel(); err != nil {
c.recoveryFailedChan <- err
}
}()
}
}

func (c *Connection) startRecovery() error {
const errMessage = "recovery failed: %w"
func (c *Connection) recoverConnection() error {
const errMessage = "failed to recover connection: %w"

c.amqpConnection = nil
c.amqpChannel = nil

err := c.backOff(
if err := c.backOff(
func() error {
err := c.createConnection()
if err != nil {
return err
}
return c.createConnection()
},
); err != nil {
return fmt.Errorf(errMessage, err)
}

err = c.createChannel()
if err != nil {
return err
}
if err := c.recoverChannel(); err != nil {
return fmt.Errorf(errMessage, err)
}

return nil
c.logger.logInfo("successfully recovered connection")

return nil
}

func (c *Connection) recoverChannel() error {
const errMessage = "failed to recover channel: %w"

c.amqpChannel = nil

if err := c.backOff(
func() error {
return c.createChannel()
},
)
if err != nil {
); err != nil {
return fmt.Errorf(errMessage, err)
}

if c.runningConsumers > 0 {
err = c.recoverConsumer()
if err != nil {
if err := c.recoverConsumer(); err != nil {
return fmt.Errorf(errMessage, err)
}
}

c.logger.logInfo("successfully recovered connection")
c.logger.logInfo("successfully recovered channel")

return nil
}
Expand All @@ -391,20 +389,18 @@ func (c *Connection) backOff(action func() error) error {

for retry <= c.options.MaxReconnectRetries {
if action() == nil {
c.logger.logDebug("successfully reestablished amqp-connection", "retries", retry)

break
}

if retry == c.options.MaxReconnectRetries {
c.logger.logDebug("reconnection failed: maximum retries exceeded", "retries", retry)
c.logger.logDebug("recovery failed: maximum retries exceeded", "retries", retry)

return fmt.Errorf(errMessage, ErrMaxRetriesExceeded)
}

delay := time.Duration(c.options.BackOffFactor*retry) * c.options.ReconnectInterval

c.logger.logDebug("failed to reconnect: backing off...", "back-off-time", delay.String())
c.logger.logDebug("failed to recover: backing off...", "back-off-time", delay.String())

time.Sleep(delay)

Expand Down
13 changes: 10 additions & 3 deletions connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
// ConnectionOptions are used to describe how a new connection will be created.
ConnectionOptions struct {
ReturnHandler
logger []*slog.Logger
loggers []*slog.Logger
Config *Config
codec *codec
uri string
Expand Down Expand Up @@ -97,7 +97,7 @@ func WithConnectionOptionConnectionName(name string) ConnectionOption {
// WithConnectionOptionTextLogging enables structured text logging to the given writer.
func WithConnectionOptionTextLogging(w io.Writer, logLevel slog.Level) ConnectionOption {
return func(o *ConnectionOptions) {
o.logger = append(o.logger,
o.loggers = append(o.loggers,
slog.New(slog.NewTextHandler(
w,
&slog.HandlerOptions{
Expand All @@ -111,7 +111,7 @@ func WithConnectionOptionTextLogging(w io.Writer, logLevel slog.Level) Connectio
// WithConnectionOptionJSONLogging enables structured json logging to the given writer.
func WithConnectionOptionJSONLogging(w io.Writer, logLevel slog.Level) ConnectionOption {
return func(o *ConnectionOptions) {
o.logger = append(o.logger,
o.loggers = append(o.loggers,
slog.New(slog.NewJSONHandler(
w,
&slog.HandlerOptions{
Expand All @@ -122,6 +122,13 @@ func WithConnectionOptionJSONLogging(w io.Writer, logLevel slog.Level) Connectio
}
}

// WithConnectionOptionMultipleLoggers adds multiple loggers.
func WithConnectionOptionMultipleLoggers(loggers []*slog.Logger) ConnectionOption {
return func(o *ConnectionOptions) {
o.loggers = append(o.loggers, loggers...)
}
}

// WithConnectionOptionAMQPConfig sets the amqp.Config that will be used to create the connection.
//
// Warning: this will override any values set in the connection config.
Expand Down
Loading

0 comments on commit f1f5bdb

Please sign in to comment.