Skip to content

Commit

Permalink
GH-1443: Pull CCF.resetConnection() to CF
Browse files Browse the repository at this point in the history
Resolves #1443

**Cherry-pick to `2.4.x`**
  • Loading branch information
garyrussell authored Mar 29, 2022
1 parent 38da8c7 commit ea01566
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.amqp.AmqpException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand All @@ -38,7 +39,7 @@
* @since 1.3
*/
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory,
InitializingBean {
InitializingBean, DisposableBean {

private final Map<Object, ConnectionFactory> targetConnectionFactories =
new ConcurrentHashMap<Object, ConnectionFactory>();
Expand Down Expand Up @@ -260,4 +261,15 @@ protected ConnectionFactory removeTargetConnectionFactory(Object key) {
@Nullable
protected abstract Object determineCurrentLookupKey();

@Override
public void destroy() {
resetConnection();
}

@Override
public void resetConnection() {
this.targetConnectionFactories.values().forEach(factory -> factory.resetConnection());
this.defaultTargetConnectionFactory.resetConnection();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -882,6 +882,7 @@ public final void destroy() {
* used to force a reconnect to the primary broker after failing over to a secondary
* broker.
*/
@Override
public void resetConnection() {
synchronized (this.connectionMonitor) {
if (this.connection.target != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,4 +83,12 @@ default boolean isPublisherReturns() {
return false;
}

/**
* Close any connection(s) that might be cached by this factory. This does not prevent
* new connections from being opened.
* @since 2.4.4
*/
default void resetConnection() {
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -350,7 +350,7 @@ protected ConnectionFactory createConnectionFactory(String address, String node)
}

@Override
public void destroy() {
public void resetConnection() {
Exception lastException = null;
for (ConnectionFactory connectionFactory : this.nodeFactories.values()) {
if (connectionFactory instanceof DisposableBean) {
Expand All @@ -367,4 +367,9 @@ public void destroy() {
}
}

@Override
public void destroy() {
resetConnection();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public synchronized Connection createConnection() throws AmqpException {
* used to force a reconnect to the primary broker after failing over to a secondary
* broker.
*/
@Override
public void resetConnection() {
destroy();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -140,6 +140,7 @@ public void closeThreadChannel() {
* used to force a reconnect to the primary broker after failing over to a secondary
* broker.
*/
@Override
public void resetConnection() {
destroy();
}
Expand Down

0 comments on commit ea01566

Please sign in to comment.