From cf9aff954ef916aaf1bddea362bcbb91466e60a1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 25 Oct 2018 13:37:49 -0600 Subject: [PATCH] Reduce channels in AbstractSimpleTransportTestCase (#34863) This is related to #30876. The AbstractSimpleTransportTestCase initiates many tcp connections. There are normally over 1,000 connections in TIME_WAIT at the end of the test. This is because every test opens at least two different transports that connect to each other with 13 channel connection profiles. This commit modifies the default connection profile used by this test to 6. One connection for each type, except for REG which gets 2 connections. --- .../netty4/SimpleNetty4TransportTests.java | 2 +- .../nio/SimpleNioTransportTests.java | 2 +- .../transport/ConnectionManager.java | 21 +-------------- .../transport/ConnectionProfile.java | 27 +++++++++++++++++++ .../transport/ConnectionManagerTests.java | 10 +++---- .../AbstractSimpleTransportTestCase.java | 24 +++++++++++++---- .../transport/MockTcpTransportTests.java | 2 +- .../nio/SimpleMockNioTransportTests.java | 2 +- ...stractSimpleSecurityTransportTestCase.java | 3 +-- ...pleSecurityNetty4ServerTransportTests.java | 2 +- .../nio/SimpleSecurityNioTransportTests.java | 2 +- 11 files changed, 59 insertions(+), 38 deletions(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 4e63727024fb0..e7faac8ae01db 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -73,7 +73,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 8f6d78b481ddf..33d40b9f735fa 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -77,7 +77,7 @@ protected Version getCurrentVersion() { } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index 4e4d369330c80..5f2635fac88d9 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -67,7 +67,7 @@ public class ConnectionManager implements Closeable { private final DelegatingNodeConnectionListener connectionListener = new DelegatingNodeConnectionListener(); public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool) { - this(settings, transport, threadPool, buildDefaultConnectionProfile(settings)); + this(settings, transport, threadPool, ConnectionProfile.buildDefaultConnectionProfile(settings)); } public ConnectionManager(Settings settings, Transport transport, ThreadPool threadPool, ConnectionProfile defaultProfile) { @@ -323,23 +323,4 @@ public void onConnectionClosed(Transport.Connection connection) { } } } - - public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { - int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); - int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); - int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); - int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings); - int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings); - ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); - builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); - builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); - // if we are not master eligible we don't need a dedicated channel to publish the state - builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); - // if we are not a data-node we don't need any dedicated channels for recovery - builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); - builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); - return builder.build(); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index b9ed42ca00a56..d6183655fa2cb 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -18,7 +18,9 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import java.util.ArrayList; @@ -91,6 +93,31 @@ public static ConnectionProfile resolveConnectionProfile(@Nullable ConnectionPro } } + /** + * Builds a default connection profile based on the provided settings. + * + * @param settings to build the connection profile from + * @return the connection profile + */ + public static ConnectionProfile buildDefaultConnectionProfile(Settings settings) { + int connectionsPerNodeRecovery = TransportService.CONNECTIONS_PER_NODE_RECOVERY.get(settings); + int connectionsPerNodeBulk = TransportService.CONNECTIONS_PER_NODE_BULK.get(settings); + int connectionsPerNodeReg = TransportService.CONNECTIONS_PER_NODE_REG.get(settings); + int connectionsPerNodeState = TransportService.CONNECTIONS_PER_NODE_STATE.get(settings); + int connectionsPerNodePing = TransportService.CONNECTIONS_PER_NODE_PING.get(settings); + Builder builder = new Builder(); + builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); + builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); + // if we are not master eligible we don't need a dedicated channel to publish the state + builder.addConnections(DiscoveryNode.isMasterNode(settings) ? connectionsPerNodeState : 0, TransportRequestOptions.Type.STATE); + // if we are not a data-node we don't need any dedicated channels for recovery + builder.addConnections(DiscoveryNode.isDataNode(settings) ? connectionsPerNodeRecovery : 0, TransportRequestOptions.Type.RECOVERY); + builder.addConnections(connectionsPerNodeReg, TransportRequestOptions.Type.REG); + return builder.build(); + } + /** * A builder to build a new {@link ConnectionProfile} */ diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index bff5a2b122d2f..3dc9e0aece71a 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -64,7 +64,7 @@ public void stopThreadPool() { } public void testConnectionProfileResolve() { - final ConnectionProfile defaultProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile)); final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); @@ -96,7 +96,7 @@ public void testConnectionProfileResolve() { } public void testDefaultConnectionProfile() { - ConnectionProfile profile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); assertEquals(13, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -104,7 +104,7 @@ public void testDefaultConnectionProfile() { assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); assertEquals(12, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -112,7 +112,7 @@ public void testDefaultConnectionProfile() { assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); assertEquals(11, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); @@ -120,7 +120,7 @@ public void testDefaultConnectionProfile() { assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - profile = ConnectionManager.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) .put("node.master", false).build()); assertEquals(10, profile.getNumConnections()); assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 3b64f00084ec8..85a654c4cac36 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -116,7 +116,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected abstract MockTransportService build(Settings settings, Version version, ClusterSettings clusterSettings, boolean doHandshake); protected int channelsPerNodeConnection() { - return 13; + // This is a customized profile for this test case. + return 6; } @Override @@ -125,9 +126,17 @@ public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - serviceA = buildService("TS_A", version0, clusterSettings); // this one supports dynamic tracer updates + Settings connectionSettings = Settings.builder() + .put(TransportService.CONNECTIONS_PER_NODE_RECOVERY.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_BULK.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_REG.getKey(), 2) + .put(TransportService.CONNECTIONS_PER_NODE_STATE.getKey(), 1) + .put(TransportService.CONNECTIONS_PER_NODE_PING.getKey(), 1) + .build(); + + serviceA = buildService("TS_A", version0, clusterSettings, connectionSettings); // this one supports dynamic tracer updates nodeA = serviceA.getLocalNode(); - serviceB = buildService("TS_B", version1, null); // this one doesn't support dynamic tracer updates + serviceB = buildService("TS_B", version1, null, connectionSettings); // this one doesn't support dynamic tracer updates nodeB = serviceB.getLocalNode(); // wait till all nodes are properly connected and the event has been sent, so tests in this class // will not get this callback called on the connections done in this setup @@ -174,7 +183,12 @@ private MockTransportService buildService(final String name, final Version versi } protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings) { - return buildService(name, version, clusterSettings, Settings.EMPTY, true, true); + return buildService(name, version, clusterSettings, Settings.EMPTY); + } + + protected MockTransportService buildService(final String name, final Version version, ClusterSettings clusterSettings, + Settings settings) { + return buildService(name, version, clusterSettings, settings, true, true); } @Override @@ -1999,7 +2013,7 @@ protected String handleRequest(TcpChannel mockChannel, String profileName, Strea assertEquals("handshake failed", exception.getCause().getMessage()); } - ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 4084d08b2e806..e8b5f38b88df1 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -50,7 +50,7 @@ public Version executeHandshake(DiscoveryNode node, TcpChannel mockChannel, Time } }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index bebe50752f4e1..10f089e855a5d 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -78,7 +78,7 @@ protected Version getCurrentVersion() { }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, Collections.emptySet()); + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java index 2e1a423d5fdf8..077edf22c91ca 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java @@ -21,7 +21,6 @@ import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; -import org.elasticsearch.transport.ConnectionManager; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TransportRequestOptions; @@ -111,7 +110,7 @@ public void testTcpHandshake() throws IOException, InterruptedException { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); - ConnectionProfile connectionProfile = ConnectionManager.buildDefaultConnectionProfile(Settings.EMPTY); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); try (TransportService service = buildService("TS_TPC", Version.CURRENT, null); TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index 88895034df9de..291b39f4b05ba 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -93,7 +93,7 @@ protected Version getCurrentVersion() { }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService; diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 5208d58d74390..7fd4d8b5e0319 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -55,7 +55,7 @@ protected Version getCurrentVersion() { }; MockTransportService mockTransportService = - MockTransportService.createNewService(Settings.EMPTY, transport, version, threadPool, clusterSettings, + MockTransportService.createNewService(settings, transport, version, threadPool, clusterSettings, Collections.emptySet()); mockTransportService.start(); return mockTransportService;