diff --git a/cmd/ingress/main.go b/cmd/ingress/main.go index 6ae82650f5..aa01deb439 100644 --- a/cmd/ingress/main.go +++ b/cmd/ingress/main.go @@ -111,7 +111,7 @@ func main() { if err != nil { logger.Errorf("error creating RabbitMQ connections: %s, waiting for a retry", err) } - defer rmqHelper.CleanupRabbitMQ(env.connection, env.channel, logger) + defer rmqHelper.CleanupRabbitMQ(env.connection, logger) env.reporter = ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) connectionArgs := kncloudevents.ConnectionArgs{ @@ -233,11 +233,10 @@ func (env *envConfig) CreateRabbitMQConnections( err = channel.Confirm(false) } if err != nil { - rmqHelper.CloseRabbitMQConnections(conn, channel, logger) + rmqHelper.CloseRabbitMQConnections(conn, logger) logger.Warn("Retrying RabbitMQ connections setup") go rmqHelper.SignalRetry(true) return nil, nil, err } - return conn, channel, nil } diff --git a/pkg/rabbit/setup.go b/pkg/rabbit/setup.go index 731712154a..4028c24962 100644 --- a/pkg/rabbit/setup.go +++ b/pkg/rabbit/setup.go @@ -77,7 +77,7 @@ func (r *RabbitMQHelper) WatchRabbitMQConnections( } if !r.cleaningUp { logger.Warn("Lost connection to RabbitMQ, reconnecting. Error: %v", zap.Error(err)) - r.CloseRabbitMQConnections(connection, channel, logger) + r.CloseRabbitMQConnections(connection, logger) r.SignalRetry(true) } } @@ -90,7 +90,7 @@ func (r *RabbitMQHelper) WaitForRetrySignal() bool { return <-r.retryChannel } -func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, channel *amqp.Channel, logger *zap.SugaredLogger) { +func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, logger *zap.SugaredLogger) { r.cleaningUp = true if connection != nil && !connection.IsClosed() { if err := connection.Close(); err != nil { @@ -100,8 +100,8 @@ func (r *RabbitMQHelper) CloseRabbitMQConnections(connection *amqp.Connection, c r.cleaningUp = false } -func (r *RabbitMQHelper) CleanupRabbitMQ(connection *amqp.Connection, channel *amqp.Channel, logger *zap.SugaredLogger) { +func (r *RabbitMQHelper) CleanupRabbitMQ(connection *amqp.Connection, logger *zap.SugaredLogger) { r.SignalRetry(false) - r.CloseRabbitMQConnections(connection, channel, logger) + r.CloseRabbitMQConnections(connection, logger) close(r.retryChannel) } diff --git a/pkg/rabbit/setup_test.go b/pkg/rabbit/setup_test.go index f4e23300af..078f756a08 100644 --- a/pkg/rabbit/setup_test.go +++ b/pkg/rabbit/setup_test.go @@ -40,7 +40,7 @@ func Test_SetupRabbitMQReconnections(t *testing.T) { }() <-testChannel // Testing a failing setup - conn, channel, err := rabbitMQHelper.SetupRabbitMQ("amqp://localhost:5672/%2f", logger) + conn, _, err := rabbitMQHelper.SetupRabbitMQ("amqp://localhost:5672/%2f", logger) <-testChannel if err == nil { t.Error("SetupRabbitMQ should fail with the default DialFunc in testing environments") @@ -51,5 +51,5 @@ func Test_SetupRabbitMQReconnections(t *testing.T) { // Test SignalRetry func rabbitMQHelper.SignalRetry(true) <-testChannel - rabbitMQHelper.CleanupRabbitMQ(conn, channel, logger) + rabbitMQHelper.CleanupRabbitMQ(conn, logger) }