Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to renew an existing connection #12

Merged
merged 2 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,52 @@ func (c *Connection) Recover() error {
return nil
}

// Renew can be used to establish a new connection.
// If new URI is provided, it will be used to renew the connection instead of the current URI.
func (c *Connection) Renew(uri ...string) error {
const errMessage = "failed to renew: %w"

if len(uri) == 1 {
c.options.uriMU.Lock()
c.options.uri = uri[0]
c.options.uriMU.Unlock()
}

if err := c.closeForRenewal(); err != nil {
return fmt.Errorf(errMessage, err)
}

if err := c.recoverConnection(); err != nil {
return fmt.Errorf(errMessage, err)
}

if err := c.recoverChannel(); err != nil {
return fmt.Errorf(errMessage, err)
}

return nil
}

func (c *Connection) closeForRenewal() error {
const errMessage = "failed to close the connection to the broker gracefully on renewal: %w"

if c.amqpConnection != nil {
c.logger.logDebug("closing connection")

c.connectionCloseWG.Add(closeWGDelta)

if err := c.amqpConnection.Close(); err != nil {
return fmt.Errorf(errMessage, err)
}

c.connectionCloseWG.Wait()

c.logger.logDebug("gracefully closed connection to the broker")
}

return nil
}

// RemoveQueue removes the queue from the broker including all bindings then purges the messages based on
// broker configuration, returning the number of messages purged.
//
Expand Down Expand Up @@ -224,7 +270,9 @@ func (c *Connection) createConnection() error {
var err error

c.amqpConnMU.Lock()
c.options.uriMU.RLock()
c.amqpConnection, err = amqp.DialConfig(c.options.uri, amqp.Config(*c.options.Config))
c.options.uriMU.RUnlock()
c.amqpConnMU.Unlock()

if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions connection_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package clarimq

import (
"encoding/json"
"fmt"
"net"
"net/url"
"strconv"
"sync"
"time"

amqp "github.com/rabbitmq/amqp091-go"
Expand Down Expand Up @@ -32,6 +37,7 @@ type (
Config *Config
codec *codec
uri string
uriMU *sync.RWMutex
PrefetchCount int
RecoveryInterval time.Duration
MaxRecoveryRetries int
Expand All @@ -53,9 +59,23 @@ type (
ReturnHandler func(Return)
)

// ToURI returns the URI representation of the ConnectionSettings.
// Includes url escaping for safe usage.
func (c *ConnectionSettings) ToURI() string {
return fmt.Sprintf("amqp://%s:%s@%s/",
url.QueryEscape(c.UserName),
url.QueryEscape(c.Password),
net.JoinHostPort(
url.QueryEscape(c.Host),
strconv.Itoa(c.Port),
),
)
}

func defaultConnectionOptions(uri string) *ConnectionOptions {
return &ConnectionOptions{
uri: uri,
uriMU: &sync.RWMutex{},
RecoveryInterval: defaultRecoveryInterval,
MaxRecoveryRetries: defaultMaxRecoveryRetries,
BackOffFactor: defaultBackOffFactor,
Expand Down
6 changes: 5 additions & 1 deletion retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ func (c *Consumer) setupRetryPublisher() error {
var err error

if c.options.RetryOptions.RetryConn == nil {
c.conn.options.uriMU.RLock()
uri := c.conn.options.uri
c.conn.options.uriMU.RUnlock()

if c.options.RetryOptions.RetryConn, err = NewConnection(
c.conn.options.uri,
uri,
WithConnectionOptionConnectionName(fmt.Sprintf("%s_%s", c.options.ConsumerOptions.Name, keyRetry)),
); err != nil {
return fmt.Errorf(errMessage, err)
Expand Down
Loading