diff --git a/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java b/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java index 1ad8724dce2bf..d55c99f4a3b2b 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java @@ -58,6 +58,11 @@ public final boolean decRef() { return false; } + @Override + public final boolean hasReferences() { + return refCount.get() > 0; + } + /** * Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging * purposes. @@ -74,7 +79,7 @@ protected void alreadyClosed() { /** * Returns the current reference count. */ - public int refCount() { + public final int refCount() { return this.refCount.get(); } diff --git a/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java b/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java index fbe2ce9b7c97b..0f7dec4968ba7 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java @@ -54,4 +54,12 @@ public interface RefCounted { * @return returns {@code true} if the ref count dropped to 0 as a result of calling this method */ boolean decRef(); + + /** + * Returns {@code true} only if there was at least one active reference when the method was called; if it returns {@code false} then the + * object is closed; future attempts to acquire references will fail. + * + * @return whether there are currently any active references to this object. + */ + boolean hasReferences(); } diff --git a/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java index 8579ebddee87c..56d10f39f6c61 100644 --- a/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java +++ b/libs/core/src/test/java/org/elasticsearch/common/util/concurrent/RefCountedTests.java @@ -59,13 +59,7 @@ public void testRefCount() { assertThat( expectThrows(IllegalStateException.class, counted::incRef).getMessage(), equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE)); - - try { - counted.ensureOpen(); - fail(" expected exception"); - } catch (IllegalStateException ex) { - assertThat(ex.getMessage(), equalTo("closed")); - } + assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed")); } public void testMultiThreaded() throws InterruptedException { @@ -79,6 +73,7 @@ public void testMultiThreaded() throws InterruptedException { latch.await(); for (int j = 0; j < 10000; j++) { counted.incRef(); + assertTrue(counted.hasReferences()); try { counted.ensureOpen(); } finally { @@ -96,13 +91,11 @@ public void testMultiThreaded() throws InterruptedException { thread.join(); } counted.decRef(); - try { - counted.ensureOpen(); - fail("expected to be closed"); - } catch (IllegalStateException ex) { - assertThat(ex.getMessage(), equalTo("closed")); - } + assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed")); + assertThat(expectThrows(IllegalStateException.class, counted::incRef).getMessage(), + equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE)); assertThat(counted.refCount(), is(0)); + assertFalse(counted.hasReferences()); assertThat(exceptions, Matchers.emptyIterable()); } @@ -117,7 +110,8 @@ protected void closeInternal() { public void ensureOpen() { if (closed.get()) { - assert this.refCount() == 0; + assertEquals(0, this.refCount()); + assertFalse(hasReferences()); throw new IllegalStateException("closed"); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java index 24debca9e0af4..cc4fadccc1b3e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksIT.java @@ -331,7 +331,9 @@ public void testRemoveBanParentsOnDisconnect() throws Exception { if (bannedParent.getNodeId().equals(node.getId()) && randomBoolean()) { Collection childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), "", () -> {}); for (Transport.Connection connection : randomSubsetOf(childConns)) { - connection.close(); + if (connection.getNode().equals(node) == false) { + connection.close(); + } } } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java index 5bed879368500..b9fe36a371888 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/ClusterDisruptionIT.java @@ -16,19 +16,22 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; +import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.coordination.LagDetector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; @@ -40,6 +43,8 @@ import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions; import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.junit.annotations.TestIssueLogging; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.Collections; @@ -58,6 +63,11 @@ import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING; +import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING; +import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.everyItem; @@ -494,4 +504,63 @@ public void testRestartNodeWhileIndexing() throws Exception { } } + public void testRejoinWhileBeingRemoved() { + final String masterNode = internalCluster().startMasterOnlyNode(Settings.builder() + .put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), "1") + .build()); + final String dataNode = internalCluster().startDataOnlyNode(Settings.builder() + .put(DISCOVERY_FIND_PEERS_INTERVAL_SETTING.getKey(), "100ms") + .put(LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms") + .put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "1") + .build()); + + final ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNode); + final PlainActionFuture removedNode = new PlainActionFuture<>(); + masterClusterService.addListener(clusterChangedEvent -> { + if (removedNode.isDone() == false && clusterChangedEvent.state().nodes().getDataNodes().isEmpty()) { + removedNode.onResponse(null); + } + }); + + final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode); + final PlainActionFuture failedLeader = new PlainActionFuture() { + @Override + protected boolean blockingAllowed() { + // we're deliberately blocking the cluster applier on the master until the data node starts to rejoin + return true; + } + }; + final AtomicBoolean dataNodeHasMaster = new AtomicBoolean(true); + dataClusterService.addListener(clusterChangedEvent -> { + dataNodeHasMaster.set(clusterChangedEvent.state().nodes().getMasterNode() != null); + if (failedLeader.isDone() == false && dataNodeHasMaster.get() == false) { + failedLeader.onResponse(null); + } + }); + + masterClusterService.addHighPriorityApplier(event -> { + failedLeader.actionGet(); + if (dataNodeHasMaster.get() == false) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new AssertionError("unexpected", e); + } + } + }); + + final MockTransportService dataTransportService + = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode); + dataTransportService.addRequestHandlingBehavior(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME, (handler, request, channel, task) -> { + if (removedNode.isDone() == false) { + channel.sendResponse(new ElasticsearchException("simulated check failure")); + } else { + handler.messageReceived(request, channel, task); + } + }); + + removedNode.actionGet(10, TimeUnit.SECONDS); + ensureStableCluster(2); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java index a2502b4aa0e29..b8e525d0b9dac 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java @@ -11,21 +11,20 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.GroupedActionListener; -import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.cluster.coordination.FollowersChecker; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterApplier; -import org.elasticsearch.core.Nullable; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -37,6 +36,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.settings.Setting.Property; import static org.elasticsearch.common.settings.Setting.positiveTimeSetting; @@ -70,8 +70,7 @@ public class NodeConnectionsService extends AbstractLifecycleComponent { // Crucially there are no blocking calls under this mutex: it is not held while connecting or disconnecting. private final Object mutex = new Object(); - // contains an entry for every node in the latest cluster state, as well as for nodes from which we are in the process of - // disconnecting + // contains an entry for every node in the latest cluster state private final Map targetsByNode = new HashMap<>(); private final TimeValue reconnectInterval; @@ -86,7 +85,7 @@ public NodeConnectionsService(Settings settings, ThreadPool threadPool, Transpor /** * Connect to all the given nodes, but do not disconnect from any extra nodes. Calls the completion handler on completion of all - * connection attempts to _new_ nodes, but not on attempts to re-establish connections to nodes that are already known. + * connection attempts to _new_ nodes, without waiting for any attempts to re-establish connections to nodes that were already known. */ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) { @@ -102,22 +101,20 @@ public void connectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletion) synchronized (mutex) { for (final DiscoveryNode discoveryNode : discoveryNodes) { ConnectionTarget connectionTarget = targetsByNode.get(discoveryNode); - final boolean isNewNode; - if (connectionTarget == null) { - // new node, set up target and listener + final boolean isNewNode = connectionTarget == null; + if (isNewNode) { connectionTarget = new ConnectionTarget(discoveryNode); targetsByNode.put(discoveryNode, connectionTarget); - isNewNode = true; - } else { - // existing node, but maybe we're disconnecting from it, in which case it was recently removed from the cluster - // state and has now been re-added so we should wait for the re-connection - isNewNode = connectionTarget.isPendingDisconnection(); } if (isNewNode) { - runnables.add(connectionTarget.connect(listener)); + logger.debug("connecting to {}", discoveryNode); + runnables.add(connectionTarget.connect(ActionListener.runAfter( + listener, + () -> logger.debug("connected to {}", discoveryNode)))); } else { // known node, try and ensure it's connected but do not wait + logger.trace("checking connection to existing node [{}]", discoveryNode); runnables.add(connectionTarget.connect(null)); runnables.add(() -> listener.onResponse(null)); } @@ -139,33 +136,7 @@ public void disconnectFromNodesExcept(DiscoveryNodes discoveryNodes) { } for (final DiscoveryNode discoveryNode : nodesToDisconnect) { - runnables.add(targetsByNode.get(discoveryNode).disconnect()); - } - } - runnables.forEach(Runnable::run); - } - - void ensureConnections(Runnable onCompletion) { - // Called by tests after some disruption has concluded. It is possible that one or more targets are currently CONNECTING and have - // been since the disruption was active, and that the connection attempt was thwarted by a concurrent disruption to the connection. - // If so, we cannot simply add our listener to the queue because it will be notified when this CONNECTING activity completes even - // though it was disrupted. We must therefore wait for all the current activity to finish and then go through and reconnect to - // any missing nodes. - awaitPendingActivity(() -> connectDisconnectedTargets(onCompletion)); - } - - private void awaitPendingActivity(Runnable onCompletion) { - final List runnables = new ArrayList<>(); - synchronized (mutex) { - final Collection connectionTargets = targetsByNode.values(); - if (connectionTargets.isEmpty()) { - runnables.add(onCompletion); - } else { - final GroupedActionListener listener = new GroupedActionListener<>( - ActionListener.wrap(onCompletion), connectionTargets.size()); - for (final ConnectionTarget connectionTarget : connectionTargets) { - runnables.add(connectionTarget.awaitCurrentActivity(listener)); - } + runnables.add(targetsByNode.remove(discoveryNode)::disconnect); } } runnables.forEach(Runnable::run); @@ -173,21 +144,20 @@ private void awaitPendingActivity(Runnable onCompletion) { /** * Makes a single attempt to reconnect to any nodes which are disconnected but should be connected. Does not attempt to reconnect any - * nodes which are in the process of disconnecting. The onCompletion handler is called after all ongoing connection/disconnection - * attempts have completed. + * nodes which are in the process of disconnecting. The onCompletion handler is called after all connection attempts have completed. */ - private void connectDisconnectedTargets(Runnable onCompletion) { + void ensureConnections(Runnable onCompletion) { final List runnables = new ArrayList<>(); synchronized (mutex) { final Collection connectionTargets = targetsByNode.values(); if (connectionTargets.isEmpty()) { runnables.add(onCompletion); } else { - logger.trace("connectDisconnectedTargets: {}", targetsByNode); + logger.trace("ensureConnections: {}", targetsByNode); final GroupedActionListener listener = new GroupedActionListener<>( ActionListener.wrap(onCompletion), connectionTargets.size()); for (final ConnectionTarget connectionTarget : connectionTargets) { - runnables.add(connectionTarget.ensureConnected(listener)); + runnables.add(connectionTarget.connect(listener)); } } } @@ -197,7 +167,7 @@ private void connectDisconnectedTargets(Runnable onCompletion) { class ConnectionChecker extends AbstractRunnable { protected void doRun() { if (connectionChecker == this) { - connectDisconnectedTargets(this::scheduleNextCheck); + ensureConnections(this::scheduleNextCheck); } } @@ -243,231 +213,75 @@ public void reconnectToNodes(DiscoveryNodes discoveryNodes, Runnable onCompletio }); } - private enum ActivityType { - IDLE, - CONNECTING, - DISCONNECTING - } - - /** - * {@link ConnectionTarget} ensures that we are never concurrently connecting to and disconnecting from a node, and that we eventually - * either connect to or disconnect from it according to whether {@link ConnectionTarget#connect(ActionListener)} or - * {@link ConnectionTarget#disconnect()} was called last. - *

- * Each {@link ConnectionTarget} is in one of these states: - *

- * - idle ({@link ConnectionTarget#future} has no listeners) - * - awaiting connection ({@link ConnectionTarget#future} may contain listeners awaiting a connection) - * - awaiting disconnection ({@link ConnectionTarget#future} may contain listeners awaiting a disconnection) - *

- * It will be awaiting connection (respectively disconnection) after calling {@code connect()} (respectively {@code disconnect()}). It - * will eventually become idle if these methods are not called infinitely often. - *

- * These methods return a {@link Runnable} which starts the connection/disconnection process iff it was idle before the method was - * called, and which notifies any failed listeners if the {@code ConnectionTarget} went from {@code CONNECTING} to {@code DISCONNECTING} - * or vice versa. The connection/disconnection process continues until all listeners have been removed, at which point it becomes idle - * again. - *

- * Additionally if the last step of the process was a disconnection then this target is removed from the current set of targets. Thus - * if this {@link ConnectionTarget} is idle and in the current set of targets then it should be connected. - *

- * All of the {@code listeners} are awaiting the completion of the same activity, which is either a connection or a disconnection. If - * we are currently connecting and then {@link ConnectionTarget#disconnect()} is called then all connection listeners are - * removed from the list so they can be notified of failure; once the connecting process has finished a disconnection will be started. - * Similarly if we are currently disconnecting and then {@link ConnectionTarget#connect(ActionListener)} is called then all - * disconnection listeners are immediately removed for failure notification and a connection is started once the disconnection is - * complete. - */ private class ConnectionTarget { private final DiscoveryNode discoveryNode; - private ListenableActionFuture future = new ListenableActionFuture<>(); - private ActivityType activityType = ActivityType.IDLE; // indicates what any listeners are awaiting - private final AtomicInteger consecutiveFailureCount = new AtomicInteger(); - - private final Runnable connectActivity = new AbstractRunnable() { - - final AbstractRunnable abstractRunnable = this; - - @Override - protected void doRun() { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - if (transportService.nodeConnected(discoveryNode)) { - // transportService.connectToNode is a no-op if already connected, but we don't want any DEBUG logging in this case - // since we run this for every node on every cluster state update. - logger.trace("still connected to {}", discoveryNode); - onConnected(); - } else { - logger.debug("connecting to {}", discoveryNode); - transportService.connectToNode(discoveryNode, new ActionListener() { - @Override - public void onResponse(Void aVoid) { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - logger.debug("connected to {}", discoveryNode); - onConnected(); - } - - @Override - public void onFailure(Exception e) { - abstractRunnable.onFailure(e); - } - }); - } - } - - private void onConnected() { - consecutiveFailureCount.set(0); - onCompletion(ActivityType.CONNECTING, null, disconnectActivity); - } - - @Override - public void onFailure(Exception e) { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - final int currentFailureCount = consecutiveFailureCount.incrementAndGet(); - // only warn every 6th failure - final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG; - logger.log(level, new ParameterizedMessage("failed to connect to {} (tried [{}] times)", - discoveryNode, currentFailureCount), e); - onCompletion(ActivityType.CONNECTING, e, disconnectActivity); - } - - @Override - public String toString() { - return "connect to " + discoveryNode; - } - }; - - private final Runnable disconnectActivity = new AbstractRunnable() { - @Override - protected void doRun() { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - transportService.disconnectFromNode(discoveryNode); - consecutiveFailureCount.set(0); - logger.debug("disconnected from {}", discoveryNode); - onCompletion(ActivityType.DISCONNECTING, null, connectActivity); - } - - @Override - public void onFailure(Exception e) { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - consecutiveFailureCount.incrementAndGet(); - // we may not have disconnected, but will not retry, so this connection might have leaked - logger.warn(new ParameterizedMessage("failed to disconnect from {}, possible connection leak", discoveryNode), e); - assert false : "failed to disconnect from " + discoveryNode + ", possible connection leak\n" + e; - onCompletion(ActivityType.DISCONNECTING, e, connectActivity); - } - }; + private final AtomicReference connectionRef = new AtomicReference<>(); ConnectionTarget(DiscoveryNode discoveryNode) { this.discoveryNode = discoveryNode; } - Runnable connect(@Nullable ActionListener listener) { - return addListenerAndStartActivity(listener, ActivityType.CONNECTING, connectActivity, - "disconnection cancelled by reconnection"); + private void setConnectionRef(Releasable connectionReleasable) { + Releasables.close(connectionRef.getAndSet(connectionReleasable)); } - Runnable disconnect() { - return addListenerAndStartActivity(null, ActivityType.DISCONNECTING, disconnectActivity, - "connection cancelled by disconnection"); - } + Runnable connect(ActionListener listener) { + return () -> { + final boolean alreadyConnected = transportService.nodeConnected(discoveryNode); - Runnable ensureConnected(@Nullable ActionListener listener) { - assert Thread.holdsLock(mutex) : "mutex not held"; - - if (activityType == ActivityType.IDLE) { - if (transportService.nodeConnected(discoveryNode)) { - return () -> listener.onResponse(null); + if (alreadyConnected) { + logger.trace("refreshing connection to {}", discoveryNode); } else { - // target is disconnected, and we are currently idle, so start a connection process. - activityType = ActivityType.CONNECTING; - addListener(listener); - return connectActivity; + logger.debug("connecting to {}", discoveryNode); } - } else { - addListener(listener); - return () -> { - }; - } - } - - Runnable awaitCurrentActivity(ActionListener listener) { - assert Thread.holdsLock(mutex) : "mutex not held"; - - if (activityType == ActivityType.IDLE) { - return () -> listener.onResponse(null); - } else { - addListener(listener); - return () -> { - }; - } - } - - private void addListener(@Nullable ActionListener listener) { - assert Thread.holdsLock(mutex) : "mutex not held"; - assert activityType != ActivityType.IDLE; - if (listener != null) { - future.addListener(listener); - } - } - private ListenableActionFuture getAndClearFuture() { - assert Thread.holdsLock(mutex) : "mutex not held"; - final ListenableActionFuture drainedFuture = future; - future = new ListenableActionFuture<>(); - return drainedFuture; - } - - private Runnable addListenerAndStartActivity(@Nullable ActionListener listener, ActivityType newActivityType, - Runnable activity, String cancellationMessage) { - assert Thread.holdsLock(mutex) : "mutex not held"; - assert newActivityType.equals(ActivityType.IDLE) == false; - - if (activityType == ActivityType.IDLE) { - activityType = newActivityType; - addListener(listener); - return activity; - } - - if (activityType == newActivityType) { - addListener(listener); - return () -> { - }; - } - - activityType = newActivityType; - final ListenableActionFuture oldFuture = getAndClearFuture(); - addListener(listener); - return () -> oldFuture.onFailure(new ElasticsearchException(cancellationMessage)); - } - - private void onCompletion(ActivityType completedActivityType, @Nullable Exception e, Runnable oppositeActivity) { - assert Thread.holdsLock(mutex) == false : "mutex unexpectedly held"; - - final Runnable cleanup; - synchronized (mutex) { - assert activityType != ActivityType.IDLE; - if (activityType == completedActivityType) { - final ListenableActionFuture oldFuture = getAndClearFuture(); - activityType = ActivityType.IDLE; + // It's possible that connectionRef is a reference to an older connection that closed out from under us, but that something + // else has opened a fresh connection to the node. Therefore we always call connectToNode() and update connectionRef. + transportService.connectToNode(discoveryNode, new ActionListener() { + @Override + public void onResponse(Releasable connectionReleasable) { + if (alreadyConnected) { + logger.trace("refreshed connection to {}", discoveryNode); + } else { + logger.debug("connected to {}", discoveryNode); + } + consecutiveFailureCount.set(0); + setConnectionRef(connectionReleasable); - cleanup = e == null ? () -> oldFuture.onResponse(null) : () -> oldFuture.onFailure(e); + final boolean isActive; + synchronized (mutex) { + isActive = targetsByNode.get(discoveryNode) == ConnectionTarget.this; + } + if (isActive == false) { + logger.debug("connected to stale {} - releasing stale connection", discoveryNode); + setConnectionRef(null); + } + if (listener != null) { + listener.onResponse(null); + } + } - if (completedActivityType.equals(ActivityType.DISCONNECTING)) { - final ConnectionTarget removedTarget = targetsByNode.remove(discoveryNode); - assert removedTarget == this : removedTarget + " vs " + this; + @Override + public void onFailure(Exception e) { + final int currentFailureCount = consecutiveFailureCount.incrementAndGet(); + // only warn every 6th failure + final Level level = currentFailureCount % 6 == 1 ? Level.WARN : Level.DEBUG; + logger.log(level, new ParameterizedMessage("failed to connect to {} (tried [{}] times)", + discoveryNode, currentFailureCount), e); + setConnectionRef(null); + if (listener != null) { + listener.onFailure(e); + } } - } else { - cleanup = oppositeActivity; - } - } - cleanup.run(); + }); + }; } - boolean isPendingDisconnection() { - assert Thread.holdsLock(mutex) : "mutex not held"; - return activityType == ActivityType.DISCONNECTING; + void disconnect() { + setConnectionRef(null); + logger.debug("disconnected from {}", discoveryNode); } @Override @@ -475,7 +289,6 @@ public String toString() { synchronized (mutex) { return "ConnectionTarget{" + "discoveryNode=" + discoveryNode + - ", activityType=" + activityType + '}'; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index 3769a85a5160a..5f65fccc2484e 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; import org.elasticsearch.cluster.service.ClusterApplier.ClusterApplyListener; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -51,6 +52,7 @@ import org.elasticsearch.core.Booleans; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.ConfiguredHostsResolver; import org.elasticsearch.discovery.Discovery; @@ -298,6 +300,7 @@ public void onFailure(Exception e) { @Override public void onSuccess() { + onClusterStateApplied(); applyListener.onResponse(null); } }); @@ -305,6 +308,15 @@ public void onSuccess() { } } + private void onClusterStateApplied() { + assert Thread.currentThread().getName().contains('[' + ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME + ']') + || Thread.currentThread().getName().startsWith("TEST-") + : Thread.currentThread().getName(); + if (getMode() != Mode.CANDIDATE) { + joinHelper.onClusterStateApplied(); + } + } + PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) { assert publishRequest.getAcceptedState().nodes().getLocalNode().equals(getLocalNode()) : publishRequest.getAcceptedState().nodes().getLocalNode() + " != " + getLocalNode(); @@ -490,21 +502,44 @@ private void handleJoinRequest(JoinRequest joinRequest, JoinHelper.JoinCallback return; } - transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(ignore -> { - final ClusterState stateForJoinValidation = getStateForMasterService(); - - if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { - onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); - if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { - // we do this in a couple of places including the cluster update thread. This one here is really just best effort - // to ensure we fail as fast as possible. - JoinTaskExecutor.ensureVersionBarrier( - joinRequest.getSourceNode().getVersion(), - stateForJoinValidation.getNodes().getMinNodeVersion()); + transportService.connectToNode(joinRequest.getSourceNode(), ActionListener.wrap(connectionReference -> { + boolean retainConnection = false; + try { + final JoinHelper.JoinCallback wrappedJoinCallback = new JoinHelper.JoinCallback() { + @Override + public void onSuccess() { + Releasables.close(connectionReference); + joinCallback.onSuccess(); + } + + @Override + public void onFailure(Exception e) { + Releasables.close(connectionReference); + joinCallback.onFailure(e); + } + }; + + final ClusterState stateForJoinValidation = getStateForMasterService(); + + if (stateForJoinValidation.nodes().isLocalNodeElectedMaster()) { + onJoinValidators.forEach(a -> a.accept(joinRequest.getSourceNode(), stateForJoinValidation)); + if (stateForJoinValidation.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) { + // we do this in a couple of places including the cluster update thread. This one here is really just best effort + // to ensure we fail as fast as possible. + JoinTaskExecutor.ensureVersionBarrier( + joinRequest.getSourceNode().getVersion(), + stateForJoinValidation.getNodes().getMinNodeVersion()); + } + sendValidateJoinRequest(stateForJoinValidation, joinRequest, wrappedJoinCallback); + } else { + processJoinRequest(joinRequest, wrappedJoinCallback); + } + + retainConnection = true; + } finally { + if (retainConnection == false) { + Releasables.close(connectionReference); } - sendValidateJoinRequest(stateForJoinValidation, joinRequest, joinCallback); - } else { - processJoinRequest(joinRequest, joinCallback); } }, joinCallback::onFailure)); } @@ -1454,6 +1489,7 @@ public void onFailure(Exception e) { @Override public void onSuccess() { + onClusterStateApplied(); clusterStatePublicationEvent.setMasterApplyElapsedMillis( transportService.getThreadPool().rawRelativeTimeInMillis() - completionTimeMillis); synchronized (mutex) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 97a74c6411412..279c65833f20f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -27,15 +27,19 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.zen.MembershipAction; import org.elasticsearch.discovery.zen.ZenDiscovery; +import org.elasticsearch.core.Tuple; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; @@ -46,6 +50,7 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -86,6 +91,7 @@ public class JoinHelper { private final Set> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>()); private final AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); + private final Map joinConnections = new HashMap<>(); // synchronized on itself private final Supplier joinTaskExecutorGenerator; @@ -230,6 +236,37 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional }); } + public void onClusterStateApplied() { + // we applied a cluster state as LEADER or FOLLOWER which means the NodeConnectionsService has taken ownership of any connections to + // nodes in the cluster and therefore we can release the connection(s) that we were using for joining + final List releasables; + synchronized (joinConnections) { + if (joinConnections.isEmpty()) { + return; + } + releasables = new ArrayList<>(joinConnections.values()); + joinConnections.clear(); + } + + logger.debug("releasing [{}] connections on successful cluster state application", releasables.size()); + releasables.forEach(Releasables::close); + } + + private void registerConnection(DiscoveryNode destination, Releasable connectionReference) { + final Releasable previousConnection; + synchronized (joinConnections) { + previousConnection = joinConnections.put(destination, connectionReference); + } + Releasables.close(previousConnection); + } + + private void unregisterAndReleaseConnection(DiscoveryNode destination, Releasable connectionReference) { + synchronized (joinConnections) { + joinConnections.remove(destination, connectionReference); + } + Releasables.close(connectionReference); + } + // package-private for testing static class FailedJoinAttempt { private final DiscoveryNode destination; @@ -289,38 +326,66 @@ public void sendJoinRequest(DiscoveryNode destination, long term, Optional final Tuple dedupKey = Tuple.tuple(destination, joinRequest); if (pendingOutgoingJoins.add(dedupKey)) { logger.debug("attempting to join {} with {}", destination, joinRequest); - final String actionName; - final TransportRequest transportRequest; - final TransportRequestOptions transportRequestOptions; - if (Coordinator.isZen1Node(destination)) { - actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME; - transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode()); - transportRequestOptions = TransportRequestOptions.timeout(joinTimeout); - } else { - actionName = JOIN_ACTION_NAME; - transportRequest = joinRequest; - transportRequestOptions = TransportRequestOptions.EMPTY; - } - transportService.sendRequest(destination, actionName, transportRequest, transportRequestOptions, - new TransportResponseHandler.Empty() { - @Override - public void handleResponse(TransportResponse.Empty response) { - pendingOutgoingJoins.remove(dedupKey); - logger.debug("successfully joined {} with {}", destination, joinRequest); - lastFailedJoinAttempt.set(null); - onCompletion.run(); - } - @Override - public void handleException(TransportException exp) { - pendingOutgoingJoins.remove(dedupKey); - logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); - FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); - attempt.logNow(); - lastFailedJoinAttempt.set(attempt); - onCompletion.run(); + // Typically we're already connected to the destination at this point, the PeerFinder holds a reference to this connection to + // keep it open, but we need to acquire our own reference to keep the connection alive through the joining process. + transportService.connectToNode(destination, new ActionListener() { + @Override + public void onResponse(Releasable connectionReference) { + logger.trace("acquired connection for joining join {} with {}", destination, joinRequest); + + // Register the connection in joinConnections so it can be released once we successfully apply the cluster state, at + // which point the NodeConnectionsService will have taken ownership of it. + registerConnection(destination, connectionReference); + + final String actionName; + final TransportRequest transportRequest; + final TransportRequestOptions transportRequestOptions; + if (Coordinator.isZen1Node(destination)) { + actionName = MembershipAction.DISCOVERY_JOIN_ACTION_NAME; + transportRequest = new MembershipAction.JoinRequest(transportService.getLocalNode()); + transportRequestOptions = TransportRequestOptions.timeout(joinTimeout); + } else { + actionName = JOIN_ACTION_NAME; + transportRequest = joinRequest; + transportRequestOptions = TransportRequestOptions.EMPTY; } - }); + transportService.sendRequest(destination, actionName, transportRequest, transportRequestOptions, + new TransportResponseHandler.Empty() { + @Override + public void handleResponse(TransportResponse.Empty response) { + pendingOutgoingJoins.remove(dedupKey); + logger.debug("successfully joined {} with {}", destination, joinRequest); + lastFailedJoinAttempt.set(null); + onCompletion.run(); + } + + @Override + public void handleException(TransportException exp) { + pendingOutgoingJoins.remove(dedupKey); + logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp); + FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp); + attempt.logNow(); + lastFailedJoinAttempt.set(attempt); + unregisterAndReleaseConnection(destination, connectionReference); + onCompletion.run(); + } + }); + } + + @Override + public void onFailure(Exception e) { + pendingOutgoingJoins.remove(dedupKey); + FailedJoinAttempt attempt = new FailedJoinAttempt( + destination, + joinRequest, + new ConnectTransportException(destination, "failed to acquire connection", e)); + attempt.logNow(); + lastFailedJoinAttempt.set(attempt); + onCompletion.run(); + } + }); + } else { logger.debug("already attempting to join {} with request {}, not sending request", destination, joinRequest); } diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 905e9bd5d7c1f..ba90bdd617f10 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -517,6 +517,12 @@ public void appendDescriptionWithoutAttributes(StringBuilder stringBuilder) { } } + public String descriptionWithoutAttributes() { + final StringBuilder stringBuilder = new StringBuilder(); + appendDescriptionWithoutAttributes(stringBuilder); + return stringBuilder.toString(); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(getId()); diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java index d9c8e64fe2698..9b69213691263 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java @@ -498,7 +498,7 @@ private void applyChanges(ClusterState previousClusterState, ClusterState newClu protected void connectToNodesAndWait(ClusterState newClusterState) { // can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch. final CountDownLatch countDownLatch = new CountDownLatch(1); - nodeConnectionsService.connectToNodes(newClusterState.nodes(), countDownLatch::countDown); + connectToNodesAsync(newClusterState, countDownLatch::countDown); try { countDownLatch.await(); } catch (InterruptedException e) { @@ -507,6 +507,10 @@ protected void connectToNodesAndWait(ClusterState newClusterState) { } } + protected final void connectToNodesAsync(ClusterState newClusterState, Runnable onCompletion) { + nodeConnectionsService.connectToNodes(newClusterState.nodes(), onCompletion); + } + private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) { callClusterStateAppliers(clusterChangedEvent, stopWatch, highPriorityStateAppliers); callClusterStateAppliers(clusterChangedEvent, stopWatch, normalPriorityStateAppliers); diff --git a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java index b638fd2978efd..6dacaf9da94e5 100644 --- a/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java +++ b/server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java @@ -72,6 +72,11 @@ public boolean decRef() { return refCounted.decRef(); } + @Override + public boolean hasReferences() { + return refCounted.hasReferences(); + } + public ReleasableBytesReference retain() { refCounted.incRef(); return this; diff --git a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java index 52134140aba3b..bb50b0cd7e60e 100644 --- a/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/HandshakingTransportAddressConnector.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Randomness; @@ -20,6 +21,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.internal.io.IOUtils; @@ -55,8 +57,8 @@ public HandshakingTransportAddressConnector(Settings settings, TransportService } @Override - public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { - transportService.getThreadPool().generic().execute(new AbstractRunnable() { + public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { + transportService.getThreadPool().generic().execute(new ActionRunnable(listener) { private final AbstractRunnable thisConnectionAttempt = this; @Override @@ -91,12 +93,11 @@ protected void innerOnResponse(DiscoveryNode remoteNode) { } else if (remoteNode.isMasterNode() == false) { listener.onFailure(new ConnectTransportException(remoteNode, "non-master-eligible node found")); } else { - transportService.connectToNode(remoteNode, new ActionListener() { + transportService.connectToNode(remoteNode, new ActionListener() { @Override - public void onResponse(Void ignored) { - logger.trace("[{}] completed full connection with [{}]", - thisConnectionAttempt, remoteNode); - listener.onResponse(remoteNode); + public void onResponse(Releasable connectionReleasable) { + logger.trace("[{}] completed full connection with [{}]", thisConnectionAttempt, remoteNode); + listener.onResponse(new ProbeConnectionResult(remoteNode, connectionReleasable)); } @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java index 096f45bb9cfba..1fb5afd65d467 100644 --- a/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java +++ b/server/src/main/java/org/elasticsearch/discovery/PeerFinder.java @@ -24,6 +24,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.discovery.zen.UnicastZenPing; @@ -132,9 +134,11 @@ public void activate(final DiscoveryNodes lastAcceptedNodes) { public void deactivate(DiscoveryNode leader) { final boolean peersRemoved; + final List connectionReferences; synchronized (mutex) { logger.trace("deactivating and setting leader to {}", leader); active = false; + connectionReferences = peersByAddress.values().stream().map(Peer::getConnectionReference).collect(Collectors.toList()); peersRemoved = handleWakeUp(); this.leader = Optional.of(leader); assert assertInactiveWithNoKnownPeers(); @@ -142,6 +146,24 @@ public void deactivate(DiscoveryNode leader) { if (peersRemoved) { onFoundPeersUpdated(); } + + // Discovery is over, we're joining a cluster, so we can release all the connections that were being used for discovery. We haven't + // finished joining/forming the cluster yet, but if we're joining an existing master then the join will hold open the connection + // it's using and if we're becoming the master then join validation will hold open the connections to the joining peers; this set of + // peers is a quorum so that's good enough. + // + // Note however that this might still close connections to other master-eligible nodes that we discovered but which aren't currently + // involved in joining: either they're not the master we're joining or else we're becoming the master but they didn't try and join + // us yet. It's a pretty safe bet that we'll want to have connections to these nodes in the near future: either they're already in + // the cluster or else they will discover we're the master and join us straight away. In theory we could keep these discovery + // connections open for a while rather than closing them here and then reopening them again, but in practice it's so much simpler to + // forget about them for now. + // + // Note also that the NodeConnectionsService is still maintaining connections to the nodes in the last-applied cluster state, so + // this will only close connections to nodes that we didn't know about beforehand. In most cases that's because we only just started + // and haven't applied any cluster states at all yet. This won't cause any connection disruption during a typical master failover. + assert peersRemoved || connectionReferences.isEmpty() : connectionReferences; + Releasables.close(connectionReferences); } // exposed to subclasses for testing @@ -228,12 +250,6 @@ private List getFoundPeersUnderLock() { .map(Peer::getDiscoveryNode).filter(Objects::nonNull).distinct().collect(Collectors.toList()); } - private Peer createConnectingPeer(TransportAddress transportAddress) { - Peer peer = new Peer(transportAddress); - peer.establishConnection(); - return peer; - } - /** * @return whether any peers were removed due to disconnection */ @@ -303,12 +319,16 @@ protected void startProbe(TransportAddress transportAddress) { return; } - peersByAddress.computeIfAbsent(transportAddress, this::createConnectingPeer); + if (peersByAddress.containsKey(transportAddress) == false) { + final Peer peer = new Peer(transportAddress); + peersByAddress.put(transportAddress, peer); + peer.establishConnection(); + } } private class Peer { private final TransportAddress transportAddress; - private final SetOnce discoveryNode = new SetOnce<>(); + private final SetOnce probeConnectionResult = new SetOnce<>(); private volatile boolean peersRequestInFlight; Peer(TransportAddress transportAddress) { @@ -317,13 +337,17 @@ private class Peer { @Nullable DiscoveryNode getDiscoveryNode() { - return discoveryNode.get(); + return Optional.ofNullable(probeConnectionResult.get()).map(ProbeConnectionResult::getDiscoveryNode).orElse(null); + } + + private boolean isActive() { + return active && peersByAddress.get(transportAddress) == this; } boolean handleWakeUp() { assert holdsLock() : "PeerFinder mutex not held"; - if (active == false) { + if (isActive() == false) { return true; } @@ -347,29 +371,41 @@ boolean handleWakeUp() { void establishConnection() { assert holdsLock() : "PeerFinder mutex not held"; assert getDiscoveryNode() == null : "unexpectedly connected to " + getDiscoveryNode(); - assert active; + assert isActive(); final boolean verboseFailureLogging = transportService.getThreadPool().relativeTimeInMillis() - activatedAtMillis > verbosityIncreaseTimeout.millis(); logger.trace("{} attempting connection", this); - transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener() { + transportAddressConnector.connectToRemoteMasterNode(transportAddress, new ActionListener() { @Override - public void onResponse(DiscoveryNode remoteNode) { + public void onResponse(ProbeConnectionResult connectResult) { + assert holdsLock() == false : "PeerFinder mutex is held in error"; + final DiscoveryNode remoteNode = connectResult.getDiscoveryNode(); assert remoteNode.isMasterNode() : remoteNode + " is not master-eligible"; assert remoteNode.equals(getLocalNode()) == false : remoteNode + " is the local node"; - synchronized (mutex) { - if (active == false) { - return; + boolean retainConnection = false; + try { + synchronized (mutex) { + if (isActive() == false) { + return; + } + + assert probeConnectionResult.get() == null : + "connection result unexpectedly already set to " + probeConnectionResult.get(); + probeConnectionResult.set(connectResult); + + requestPeers(); } - assert discoveryNode.get() == null : "discoveryNode unexpectedly already set to " + discoveryNode.get(); - discoveryNode.set(remoteNode); - requestPeers(); - } + onFoundPeersUpdated(); - assert holdsLock() == false : "PeerFinder mutex is held in error"; - onFoundPeersUpdated(); + retainConnection = true; + } finally { + if (retainConnection == false) { + Releasables.close(connectResult); + } + } } @Override @@ -394,6 +430,8 @@ public void onFailure(Exception e) { logger.debug(new ParameterizedMessage("{} connection failed", Peer.this), e); } synchronized (mutex) { + assert probeConnectionResult.get() == null + : "discoveryNode unexpectedly already set to " + probeConnectionResult.get(); peersByAddress.remove(transportAddress); } } @@ -403,7 +441,7 @@ public void onFailure(Exception e) { private void requestPeers() { assert holdsLock() : "PeerFinder mutex not held"; assert peersRequestInFlight == false : "PeersRequest already in flight"; - assert active; + assert isActive(); final DiscoveryNode discoveryNode = getDiscoveryNode(); assert discoveryNode != null : "cannot request peers without first connecting"; @@ -429,7 +467,7 @@ public PeersResponse read(StreamInput in) throws IOException { public void handleResponse(PeersResponse response) { logger.trace("{} received {}", Peer.this, response); synchronized (mutex) { - if (active == false) { + if (isActive() == false) { return; } @@ -489,9 +527,15 @@ public String executor() { transportResponseHandler); } + @Nullable + Releasable getConnectionReference() { + assert holdsLock() : "PeerFinder mutex not held"; + return probeConnectionResult.get(); + } + @Override public String toString() { - return "address [" + transportAddress + "], node [" + discoveryNode.get() + "], requesting [" + peersRequestInFlight + "]"; + return "address [" + transportAddress + "], node [" + getDiscoveryNode() + "], requesting [" + peersRequestInFlight + "]"; } } diff --git a/server/src/main/java/org/elasticsearch/discovery/ProbeConnectionResult.java b/server/src/main/java/org/elasticsearch/discovery/ProbeConnectionResult.java new file mode 100644 index 0000000000000..cd86300f95c5e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/discovery/ProbeConnectionResult.java @@ -0,0 +1,41 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.discovery; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Releasable; + +/** + * The result of a "probe" connection to a transport address, if it successfully discovered a valid node and established a full connection + * with it. + */ +public class ProbeConnectionResult implements Releasable { + + private final DiscoveryNode discoveryNode; + private final Releasable releasable; + + public ProbeConnectionResult(DiscoveryNode discoveryNode, Releasable releasable) { + this.discoveryNode = discoveryNode; + this.releasable = releasable; + } + + public DiscoveryNode getDiscoveryNode() { + return discoveryNode; + } + + @Override + public void close() { + releasable.close(); + } + + @Override + public String toString() { + return "ProbeConnectionResult{" + discoveryNode + "}"; + } +} diff --git a/server/src/main/java/org/elasticsearch/discovery/TransportAddressConnector.java b/server/src/main/java/org/elasticsearch/discovery/TransportAddressConnector.java index b830ad2c1facf..407567d304b4d 100644 --- a/server/src/main/java/org/elasticsearch/discovery/TransportAddressConnector.java +++ b/server/src/main/java/org/elasticsearch/discovery/TransportAddressConnector.java @@ -9,12 +9,11 @@ package org.elasticsearch.discovery; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.TransportAddress; public interface TransportAddressConnector { /** * Identify the node at the given address and, if it is a master node and not the local node then establish a full connection to it. */ - void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener); + void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index decaebc294371..a837b08997451 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -396,6 +396,11 @@ public final boolean decRef() { return refCounter.decRef(); } + @Override + public final boolean hasReferences() { + return refCounter.hasReferences(); + } + @Override public void close() { if (isClosed.compareAndSet(false, true)) { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java index 538f4a44bd330..c70f15acd3b2e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryFileChunkRequest.java @@ -138,4 +138,9 @@ public boolean tryIncRef() { public boolean decRef() { return content.decRef(); } + + @Override + public boolean hasReferences() { + return content.hasReferences(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java index d960ef1dbd22d..1f05adbac0336 100644 --- a/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/BytesTransportRequest.java @@ -74,4 +74,9 @@ public boolean tryIncRef() { public boolean decRef() { return bytes.decRef(); } + + @Override + public boolean hasReferences() { + return bytes.hasReferences(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java index bf147441fa9e3..53092b3bf74a3 100644 --- a/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/CloseableConnection.java @@ -9,21 +9,28 @@ package org.elasticsearch.transport; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CompletableContext; /** * Abstract Transport.Connection that provides common close logic. */ -public abstract class CloseableConnection implements Transport.Connection { +public abstract class CloseableConnection extends AbstractRefCounted implements Transport.Connection { private final CompletableContext closeContext = new CompletableContext<>(); + private final CompletableContext removeContext = new CompletableContext<>(); @Override public void addCloseListener(ActionListener listener) { closeContext.addListener(ActionListener.toBiConsumer(listener)); } + @Override + public void addRemovedListener(ActionListener listener) { + removeContext.addListener(ActionListener.toBiConsumer(listener)); + } + @Override public boolean isClosed() { return closeContext.isDone(); @@ -35,4 +42,14 @@ public void close() { // protection and only be completed once. The attached listeners will only be notified once. closeContext.complete(null); } + + @Override + public void onRemoved() { + removeContext.complete(null); + } + + @Override + protected void closeInternal() { + close(); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index cb3823adf08fb..51fd29f68e15c 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -12,10 +12,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.AbstractRefCounted; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.internal.io.IOUtils; import java.util.Collections; @@ -36,7 +39,8 @@ public class ClusterConnectionManager implements ConnectionManager { private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class); private final ConcurrentMap connectedNodes = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap> pendingConnections = ConcurrentCollections.newConcurrentMap(); + private final ConcurrentMap> pendingConnections + = ConcurrentCollections.newConcurrentMap(); private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete); private final Transport transport; @@ -71,14 +75,38 @@ public void openConnection(DiscoveryNode node, ConnectionProfile connectionProfi } /** - * Connects to a node with the given connection profile. If the node is already connected this method has no effect. - * Once a successful is established, it can be validated before being exposed. - * The ActionListener will be called on the calling thread or the generic thread pool. + * Connects to the given node, or acquires another reference to an existing connection to the given node if a connection already exists. + * + * @param connectionProfile the profile to use if opening a new connection. Only used in tests, this is {@code null} in production. + * @param connectionValidator a callback to validate the connection before it is exposed (e.g. to {@link #nodeConnected}). + * @param listener completed on the calling thread or by the {@link ConnectionValidator}; in production the + * {@link ConnectionValidator} will complete the listener on the generic thread pool (see + * {@link TransportService#connectionValidator}). If successful, completed with a {@link Releasable} which + * will release this connection (and close it if no other references to it are held). */ @Override - public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, - ConnectionValidator connectionValidator, - ActionListener listener) throws ConnectTransportException { + public void connectToNode( + DiscoveryNode node, + @Nullable ConnectionProfile connectionProfile, + ConnectionValidator connectionValidator, + ActionListener listener + ) throws ConnectTransportException { + connectToNodeOrRetry(node, connectionProfile, connectionValidator, 0, listener); + } + + /** + * Connects to the given node, or acquires another reference to an existing connection to the given node if a connection already exists. + * If a connection already exists but has been completely released (so it's in the process of closing) then this method will wait for + * the close to complete and then try again (up to 10 times). + */ + private void connectToNodeOrRetry( + DiscoveryNode node, + @Nullable ConnectionProfile connectionProfile, + ConnectionValidator connectionValidator, + int previousFailureCount, + ActionListener listener + ) throws ConnectTransportException { + ConnectionProfile resolvedProfile = ConnectionProfile.resolveConnectionProfile(connectionProfile, defaultProfile); if (node == null) { listener.onFailure(new ConnectTransportException(null, "can't connect to a null node")); @@ -90,75 +118,116 @@ public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfil return; } - if (connectedNodes.containsKey(node)) { + final ActionListener acquiringListener = listener.delegateFailure((delegate, connection) -> { + if (connection.tryIncRef()) { + delegate.onResponse(Releasables.releaseOnce(connection::decRef)); + return; + } + + // We found a connection that's registered but already fully released, so it'll be removed soon by its close listener. Bad luck, + // let's wait for it to be removed and then try again. + final int failureCount = previousFailureCount + 1; + if (failureCount < 10) { + logger.trace("concurrent connect/disconnect for [{}] ([{}] failures), will try again", node, failureCount); + connection.addRemovedListener(listener.delegateFailure((retryDelegate, ignored) -> connectToNodeOrRetry( + node, + connectionProfile, + connectionValidator, + failureCount, + retryDelegate))); + } else { + // A run of bad luck this long is probably not bad luck after all: something's broken, just give up. + logger.warn("failed to connect to [{}] after [{}] attempts, giving up", node.descriptionWithoutAttributes(), failureCount); + listener.onFailure(new ConnectTransportException( + node, + "concurrently connecting and disconnecting even after [" + failureCount + "] attempts")); + } + }); + + final Transport.Connection existingConnection = connectedNodes.get(node); + if (existingConnection != null) { connectingRefCounter.decRef(); - listener.onResponse(null); + acquiringListener.onResponse(existingConnection); return; } - final ListenableFuture currentListener = new ListenableFuture<>(); - final ListenableFuture existingListener = pendingConnections.putIfAbsent(node, currentListener); + final ListenableFuture currentListener = new ListenableFuture<>(); + final ListenableFuture existingListener = pendingConnections.putIfAbsent(node, currentListener); if (existingListener != null) { try { // wait on previous entry to complete connection attempt - existingListener.addListener(listener); + existingListener.addListener(acquiringListener); } finally { connectingRefCounter.decRef(); } return; } - currentListener.addListener(listener); + currentListener.addListener(acquiringListener); // It's possible that a connection completed, and the pendingConnections entry was removed, between the calls to // connectedNodes.containsKey and pendingConnections.putIfAbsent above, so we check again to make sure we don't open a redundant // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending. - if (connectedNodes.containsKey(node)) { - ListenableFuture future = pendingConnections.remove(node); + final Transport.Connection existingConnectionRecheck = connectedNodes.get(node); + if (existingConnectionRecheck != null) { + ListenableFuture future = pendingConnections.remove(node); assert future == currentListener : "Listener in pending map is different than the expected listener"; connectingRefCounter.decRef(); - future.onResponse(null); + future.onResponse(existingConnectionRecheck); return; } final RunOnce releaseOnce = new RunOnce(connectingRefCounter::decRef); - internalOpenConnection(node, resolvedProfile, ActionListener.wrap(conn -> { - connectionValidator.validate(conn, resolvedProfile, ActionListener.wrap( + internalOpenConnection(node, resolvedProfile, ActionListener.wrap( + conn -> connectionValidator.validate(conn, resolvedProfile, ActionListener.runAfter(ActionListener.wrap( ignored -> { assert Transports.assertNotTransportThread("connection validator success"); try { if (connectedNodes.putIfAbsent(node, conn) != null) { - assert false : "redundant conection to " + node; - logger.debug("existing connection to node [{}], closing new redundant connection", node); + assert false : "redundant connection to " + node; + logger.warn("existing connection to node [{}], closing new redundant connection", node); IOUtils.closeWhileHandlingException(conn); } else { logger.debug("connected to node [{}]", node); try { connectionListener.onNodeConnected(node, conn); } finally { - final Transport.Connection finalConnection = conn; conn.addCloseListener(ActionListener.wrap(() -> { - logger.trace("unregistering {} after connection close and marking as disconnected", node); - connectedNodes.remove(node, finalConnection); + connectedNodes.remove(node, conn); connectionListener.onNodeDisconnected(node, conn); + conn.onRemoved(); + })); + + conn.addCloseListener(ActionListener.wrap(() -> { + if (connectingRefCounter.hasReferences() == false) { + logger.trace("connection manager shut down, closing transport connection to [{}]", node); + } else if (conn.hasReferences()) { + logger.info("transport connection to [{}] closed by remote", node.descriptionWithoutAttributes()); + // In production code we only close connections via ref-counting, so this message confirms that a + // 'node-left ... reason: disconnected' event was caused by external factors. Put differently, if a + // node leaves the cluster with "reason: disconnected" but without this message being logged then + // that's a bug. + } else { + logger.debug("closing unused transport connection to [{}]", node); + } })); } } } finally { - ListenableFuture future = pendingConnections.remove(node); + ListenableFuture future = pendingConnections.remove(node); assert future == currentListener : "Listener in pending map is different than the expected listener"; releaseOnce.run(); - future.onResponse(null); + future.onResponse(conn); } }, e -> { assert Transports.assertNotTransportThread("connection validator failure"); IOUtils.closeWhileHandlingException(conn); - failConnectionListeners(node, releaseOnce, e, currentListener); - })); - }, e -> { - assert Transports.assertNotTransportThread("internalOpenConnection failure"); - failConnectionListeners(node, releaseOnce, e, currentListener); - })); + failConnectionListener(node, releaseOnce, e, currentListener); + }), conn::decRef)), + e -> { + assert Transports.assertNotTransportThread("internalOpenConnection failure"); + failConnectionListener(node, releaseOnce, e, currentListener); + })); } /** @@ -265,8 +334,13 @@ private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connec })); } - private void failConnectionListeners(DiscoveryNode node, RunOnce releaseOnce, Exception e, ListenableFuture expectedListener) { - ListenableFuture future = pendingConnections.remove(node); + private void failConnectionListener( + DiscoveryNode node, + RunOnce releaseOnce, + Exception e, + ListenableFuture expectedListener + ) { + ListenableFuture future = pendingConnections.remove(node); releaseOnce.run(); if (future != null) { assert future == expectedListener : "Listener in pending map is different than the expected listener"; diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java index a17f2e17e5681..a1235d93f4e98 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.core.Releasable; import java.io.Closeable; import java.util.Set; @@ -25,7 +26,7 @@ public interface ConnectionManager extends Closeable { void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, ConnectionValidator connectionValidator, - ActionListener listener) throws ConnectTransportException; + ActionListener listener) throws ConnectTransportException; Transport.Connection getConnection(DiscoveryNode node); diff --git a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java index 76f1381904416..b6cdd8778269c 100644 --- a/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/ProxyConnectionStrategy.java @@ -209,11 +209,17 @@ public void onFailure(Exception e) { DiscoveryNode node = new DiscoveryNode(id, resolved, attributes, DiscoveryNodeRole.BUILT_IN_ROLES, Version.CURRENT.minimumCompatibilityVersion()); - connectionManager.connectToNode(node, null, clusterNameValidator, compositeListener.delegateResponse((l, e) -> { - logger.debug(new ParameterizedMessage("failed to open remote connection [remote cluster: {}, address: {}]", - clusterAlias, resolved), e); - l.onFailure(e); - })); + connectionManager.connectToRemoteClusterNode( + node, + clusterNameValidator, + compositeListener.delegateResponse((l, e) -> { + logger.debug(new ParameterizedMessage( + "failed to open remote connection [remote cluster: {}, address: {}]", + clusterAlias, + resolved), + e); + l.onFailure(e); + })); } } else { int openConnections = connectionManager.size(); diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java index 49b8a27a16a55..67907e70a11ad 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteConnectionManager.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.Releasable; import java.io.IOException; import java.util.ArrayList; @@ -42,11 +43,33 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti }); } + /** + * Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode} + * instead of this method. + */ @Override - public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, - ConnectionManager.ConnectionValidator connectionValidator, - ActionListener listener) throws ConnectTransportException { - delegate.connectToNode(node, connectionProfile, connectionValidator, listener); + public final void connectToNode( + DiscoveryNode node, + ConnectionProfile connectionProfile, + ConnectionValidator connectionValidator, + ActionListener listener + ) throws ConnectTransportException { + // it's a mistake to call this expecting a useful Releasable back, we never release remote cluster connections today. + assert false : "use connectToRemoteClusterNode instead"; + listener.onFailure(new UnsupportedOperationException("use connectToRemoteClusterNode instead")); + } + + public void connectToRemoteClusterNode( + DiscoveryNode node, + ConnectionValidator connectionValidator, + ActionListener listener + ) throws ConnectTransportException { + delegate.connectToNode(node, null, connectionValidator, listener.map(connectionReleasable -> { + // We drop the connectionReleasable here but it's not really a leak: we never close individual connections to a remote cluster + // ourselves - instead we close the whole connection manager if the remote cluster is removed, which bypasses any refcounting + // and just closes the underlying channels. + return null; + })); } @Override @@ -170,7 +193,7 @@ public void sendRequest(long requestId, String action, TransportRequest request, @Override public void close() { - assert false: "proxy connections must not be closed"; + assert false : "proxy connections must not be closed"; } @Override @@ -178,6 +201,11 @@ public void addCloseListener(ActionListener listener) { connection.addCloseListener(listener); } + @Override + public void addRemovedListener(ActionListener listener) { + connection.addRemovedListener(listener); + } + @Override public boolean isClosed() { return connection.isClosed(); @@ -196,5 +224,29 @@ public Object getCacheKey() { Transport.Connection getConnection() { return connection; } + + @Override + public void incRef() { + } + + @Override + public boolean tryIncRef() { + return true; + } + + @Override + public boolean decRef() { + assert false : "proxy connections must not be released"; + return false; + } + + @Override + public boolean hasReferences() { + return true; + } + + @Override + public void onRemoved() { + } } } diff --git a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java index dc8872b96fe0e..3d8b371530327 100644 --- a/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java +++ b/server/src/main/java/org/elasticsearch/transport/SniffConnectionStrategy.java @@ -314,8 +314,10 @@ private void collectRemoteNodes(Iterator> seedNodesSuppl logger.trace("[{}] opening managed connection to seed node: [{}] proxy address: [{}]", clusterAlias, handshakeNode, proxyAddress); final DiscoveryNode handshakeNodeWithProxy = maybeAddProxyAddress(proxyAddress, handshakeNode); - connectionManager.connectToNode(handshakeNodeWithProxy, null, - transportService.connectionValidator(handshakeNodeWithProxy), fullConnectionStep); + connectionManager.connectToRemoteClusterNode( + handshakeNodeWithProxy, + transportService.connectionValidator(handshakeNodeWithProxy), + fullConnectionStep); } else { fullConnectionStep.onResponse(null); } @@ -396,7 +398,7 @@ private void handleNodes(Iterator nodesIter) { if (nodePredicate.test(node) && shouldOpenMoreConnections()) { logger.trace("[{}] opening managed connection to node: [{}] proxy address: [{}]", clusterAlias, node, proxyAddress); final DiscoveryNode nodeWithProxy = maybeAddProxyAddress(proxyAddress, node); - connectionManager.connectToNode(nodeWithProxy, null, + connectionManager.connectToRemoteClusterNode(nodeWithProxy, transportService.connectionValidator(node), new ActionListener() { @Override public void onResponse(Void aVoid) { diff --git a/server/src/main/java/org/elasticsearch/transport/Transport.java b/server/src/main/java/org/elasticsearch/transport/Transport.java index 7297512e2acf4..b9bb5cb1ee404 100644 --- a/server/src/main/java/org/elasticsearch/transport/Transport.java +++ b/server/src/main/java/org/elasticsearch/transport/Transport.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -82,7 +83,7 @@ default boolean isSecure() { /** * A unidirectional connection to a {@link DiscoveryNode} */ - interface Connection extends Closeable { + interface Connection extends Closeable, RefCounted { /** * The node this connection is associated with */ @@ -127,6 +128,17 @@ default Object getCacheKey() { @Override void close(); + + /** + * Called after this connection is removed from the transport service. + */ + void onRemoved(); + + /** + * Similar to {@link #addCloseListener} except that these listeners are notified once the connection is removed from the transport + * service. + */ + void addRemovedListener(ActionListener listener); } /** diff --git a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java index a3313e3a3d46d..ad3e37f88447f 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -43,6 +43,7 @@ public void incRef() { @Override public boolean tryIncRef() { + // noop, override to manage the life-cycle of resources held by a transport message return true; } @@ -51,4 +52,10 @@ public boolean decRef() { // noop, override to manage the life-cycle of resources held by a transport message return false; } + + @Override + public boolean hasReferences() { + // noop, override to manage the life-cycle of resources held by a transport message + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 0fb1e49c1408d..566aa74955b11 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -142,6 +142,10 @@ public void sendRequest(long requestId, String action, TransportRequest request, public void addCloseListener(ActionListener listener) { } + @Override + public void addRemovedListener(ActionListener listener) { + } + @Override public boolean isClosed() { return false; @@ -149,6 +153,31 @@ public boolean isClosed() { @Override public void close() { + assert false : "should not close the local node connection"; + } + + @Override + public void incRef() { + } + + @Override + public boolean tryIncRef() { + return true; + } + + @Override + public boolean decRef() { + return false; + } + + @Override + public boolean hasReferences() { + return true; + } + + @Override + public void onRemoved() { + assert false : "should not remove the local node connection"; } @Override @@ -367,7 +396,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection * @param node the node to connect to * @param listener the action listener to notify */ - public void connectToNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException { + public void connectToNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException { connectToNode(node, null, listener); } @@ -379,7 +408,7 @@ public void connectToNode(DiscoveryNode node, ActionListener listener) thr * @param connectionProfile the connection profile to use when connecting to this node * @param listener the action listener to notify */ - public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { + public void connectToNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener listener) { if (isLocalNode(node)) { listener.onResponse(null); return; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 6e59a5f67892b..23318b262baf8 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -710,7 +710,10 @@ public void sendRequest(long requestId, String action, TransportRequest request, @Override public void addCloseListener(ActionListener listener) { + } + @Override + public void addRemovedListener(ActionListener listener) { } @Override @@ -722,5 +725,30 @@ public boolean isClosed() { public void close() { throw new UnsupportedOperationException(); } + + @Override + public void onRemoved() { + throw new UnsupportedOperationException(); + } + + @Override + public void incRef() { + } + + @Override + public boolean tryIncRef() { + return true; + } + + @Override + public boolean decRef() { + assert false : "shouldn't release a mock connection"; + return false; + } + + @Override + public boolean hasReferences() { + return true; + } } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 1ac4bb114bb3d..bfb0e0865c906 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -316,6 +316,10 @@ public void sendRequest(long requestId, String action, TransportRequest request, public void addCloseListener(ActionListener listener) { } + @Override + public void addRemovedListener(ActionListener listener) { + } + @Override public boolean isClosed() { return false; @@ -324,6 +328,31 @@ public boolean isClosed() { @Override public void close() { } + + @Override + public void incRef() { + } + + @Override + public boolean tryIncRef() { + return true; + } + + @Override + public boolean decRef() { + assert false : "shouldn't release a mock connection"; + return false; + } + + @Override + public boolean hasReferences() { + return true; + } + + @Override + public void onRemoved() { + assert false : "shouldn't remove a mock connection"; + } }; { diff --git a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java index ea478abe584fa..14cd7f0f2be15 100644 --- a/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java @@ -11,9 +11,9 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.elasticsearch.Build; -import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ListenableActionFuture; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -26,7 +26,9 @@ import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.MockLogAppender; @@ -35,7 +37,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; +import org.elasticsearch.transport.NodeNotConnectedException; import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportMessageListener; import org.elasticsearch.transport.TransportRequest; @@ -54,12 +58,12 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import static java.util.Collections.emptySet; import static org.elasticsearch.cluster.NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.hamcrest.Matchers.equalTo; public class NodeConnectionsServiceTests extends ESTestCase { @@ -86,12 +90,12 @@ private DiscoveryNodes discoveryNodesFromList(List discoveryNodes return builder.build(); } - public void testConnectAndDisconnect() throws Exception { + public void testEventuallyConnectsOnlyToAppliedNodes() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); - final AtomicBoolean stopReconnecting = new AtomicBoolean(); + final AtomicBoolean keepGoing = new AtomicBoolean(true); final Thread reconnectionThread = new Thread(() -> { - while (stopReconnecting.get() == false) { + while (keepGoing.get()) { final PlainActionFuture future = new PlainActionFuture<>(); service.ensureConnections(() -> future.onResponse(null)); future.actionGet(); @@ -99,53 +103,52 @@ public void testConnectAndDisconnect() throws Exception { }, "reconnection thread"); reconnectionThread.start(); - try { - - final List allNodes = generateNodes(); - for (int iteration = 0; iteration < 3; iteration++) { + final List allNodes = generateNodes(); - final boolean isDisrupting = randomBoolean(); - if (isDisrupting == false) { - // if the previous iteration was a disrupting one then there could still be some pending disconnections which would - // prevent us from asserting that all nodes are connected in this iteration without this call. - ensureConnections(service); + final boolean isDisrupting = randomBoolean(); + final Thread disruptionThread = new Thread(() -> { + while (isDisrupting && keepGoing.get()) { + final Transport.Connection connection; + try { + connection = transportService.getConnection(randomFrom(allNodes)); + } catch (NodeNotConnectedException e) { + continue; } - final AtomicBoolean stopDisrupting = new AtomicBoolean(); - final Thread disruptionThread = new Thread(() -> { - while (isDisrupting && stopDisrupting.get() == false) { - transportService.disconnectFromNode(randomFrom(allNodes)); - } - }, "disruption thread " + iteration); - disruptionThread.start(); - - final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture future = new PlainActionFuture<>(); - service.connectToNodes(nodes, () -> future.onResponse(null)); - future.actionGet(); - if (isDisrupting == false) { - assertConnected(transportService, nodes); - } - service.disconnectFromNodesExcept(nodes); + connection.addRemovedListener(future); + connection.close(); + future.actionGet(10, TimeUnit.SECONDS); + } + }, "disruption thread"); + disruptionThread.start(); - assertTrue(stopDisrupting.compareAndSet(false, true)); - disruptionThread.join(); + for (int i = 0; i < 10; i++) { + final DiscoveryNodes connectNodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture future = new PlainActionFuture<>(); + service.connectToNodes(connectNodes, () -> future.onResponse(null)); + future.actionGet(10, TimeUnit.SECONDS); + final DiscoveryNodes disconnectExceptNodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + service.disconnectFromNodesExcept(disconnectExceptNodes); + } - if (randomBoolean()) { - // sometimes do not wait for the disconnections to complete before starting the next connections - if (usually()) { - ensureConnections(service); - assertConnectedExactlyToNodes(nodes); - } else { - assertBusy(() -> assertConnectedExactlyToNodes(nodes)); - } - } - } - } finally { - assertTrue(stopReconnecting.compareAndSet(false, true)); - reconnectionThread.join(); + final DiscoveryNodes nodes = discoveryNodesFromList(randomSubsetOf(allNodes)); + final PlainActionFuture connectFuture = new PlainActionFuture<>(); + service.connectToNodes(nodes, () -> connectFuture.onResponse(null)); + connectFuture.actionGet(10, TimeUnit.SECONDS); + service.disconnectFromNodesExcept(nodes); + + assertTrue(keepGoing.compareAndSet(true, false)); + reconnectionThread.join(); + disruptionThread.join(); + + if (isDisrupting) { + final PlainActionFuture ensureFuture = new PlainActionFuture<>(); + service.ensureConnections(() -> ensureFuture.onResponse(null)); + ensureFuture.actionGet(10, TimeUnit.SECONDS); } - ensureConnections(service); + assertBusy(() -> assertConnectedExactlyToNodes(nodes)); } public void testPeriodicReconnection() { @@ -213,12 +216,23 @@ public String toString() { public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final NodeConnectionsService service = new NodeConnectionsService(Settings.EMPTY, threadPool, transportService); + final AtomicReference> disconnectListenerRef = new AtomicReference<>(); + transportService.addConnectionListener(new TransportConnectionListener() { + @Override + public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connection) { + final ActionListener disconnectListener = disconnectListenerRef.getAndSet(null); + if (disconnectListener != null) { + disconnectListener.onResponse(node); + } + } + }); + // connect to one node final DiscoveryNode node0 = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNodes nodes0 = DiscoveryNodes.builder().add(node0).build(); final PlainActionFuture future0 = new PlainActionFuture<>(); service.connectToNodes(nodes0, () -> future0.onResponse(null)); - future0.actionGet(); + future0.actionGet(10, TimeUnit.SECONDS); assertConnectedExactlyToNodes(nodes0); // connection attempts to node0 block indefinitely @@ -233,59 +247,55 @@ public void testOnlyBlocksOnConnectionsToNewNodes() throws Exception { final DiscoveryNodes nodes01 = DiscoveryNodes.builder(nodes0).add(node1).build(); final PlainActionFuture future1 = new PlainActionFuture<>(); service.connectToNodes(nodes01, () -> future1.onResponse(null)); - future1.actionGet(); + future1.actionGet(10, TimeUnit.SECONDS); assertConnectedExactlyToNodes(nodes1); // can also disconnect from node0 without blocking final PlainActionFuture future2 = new PlainActionFuture<>(); service.connectToNodes(nodes1, () -> future2.onResponse(null)); - future2.actionGet(); + future2.actionGet(10, TimeUnit.SECONDS); service.disconnectFromNodesExcept(nodes1); assertConnectedExactlyToNodes(nodes1); // however, now node0 is considered to be a new node so we will block on a subsequent attempt to connect to it final PlainActionFuture future3 = new PlainActionFuture<>(); service.connectToNodes(nodes01, () -> future3.onResponse(null)); - expectThrows(ElasticsearchTimeoutException.class, () -> future3.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); + assertFalse(future3.isDone()); // once the connection is unblocked we successfully connect to it. connectionBarrier.await(10, TimeUnit.SECONDS); - nodeConnectionBlocks.clear(); - future3.actionGet(); + future3.actionGet(10, TimeUnit.SECONDS); assertConnectedExactlyToNodes(nodes01); - // if we disconnect from a node while blocked trying to connect to it then we do eventually disconnect from it - nodeConnectionBlocks.put(node0, connectionBarrier::await); + // the reconnection is also blocked but the connection future doesn't wait, it completes straight away transportService.disconnectFromNode(node0); final PlainActionFuture future4 = new PlainActionFuture<>(); service.connectToNodes(nodes01, () -> future4.onResponse(null)); - future4.actionGet(); + future4.actionGet(10, TimeUnit.SECONDS); assertConnectedExactlyToNodes(nodes1); + // a blocked reconnection attempt doesn't also block the node from being deregistered service.disconnectFromNodesExcept(nodes1); + final PlainActionFuture disconnectFuture1 = new PlainActionFuture<>(); + assertTrue(disconnectListenerRef.compareAndSet(null, disconnectFuture1)); connectionBarrier.await(); - if (randomBoolean()) { - // assertBusy because the connection completes before disconnecting, so we might briefly observe a connection to node0 - assertBusy(() -> assertConnectedExactlyToNodes(nodes1)); - } - - // use ensureConnections() to wait until the service is idle - ensureConnections(service); + assertThat(disconnectFuture1.actionGet(10, TimeUnit.SECONDS), equalTo(node0)); // node0 connects briefly, must wait here assertConnectedExactlyToNodes(nodes1); - // if we disconnect from a node while blocked trying to connect to it then the listener is notified - final PlainActionFuture future6 = new PlainActionFuture<>(); - service.connectToNodes(nodes01, () -> future6.onResponse(null)); - expectThrows(ElasticsearchTimeoutException.class, () -> future6.actionGet(timeValueMillis(scaledRandomIntBetween(1, 1000)))); + // a blocked connection attempt to a new node also doesn't prevent an immediate deregistration + final PlainActionFuture future5 = new PlainActionFuture<>(); + service.connectToNodes(nodes01, () -> future5.onResponse(null)); + assertFalse(future5.isDone()); service.disconnectFromNodesExcept(nodes1); - future6.actionGet(); // completed even though the connection attempt is still blocked assertConnectedExactlyToNodes(nodes1); + final PlainActionFuture disconnectFuture2 = new PlainActionFuture<>(); + assertTrue(disconnectListenerRef.compareAndSet(null, disconnectFuture2)); connectionBarrier.await(10, TimeUnit.SECONDS); - nodeConnectionBlocks.clear(); - ensureConnections(service); + assertThat(disconnectFuture2.actionGet(10, TimeUnit.SECONDS), equalTo(node0)); // node0 connects briefly, must wait here assertConnectedExactlyToNodes(nodes1); + assertTrue(future5.isDone()); } finally { nodeConnectionBlocks.clear(); connectionBarrier.reset(); @@ -412,12 +422,6 @@ private void runTasksUntil(DeterministicTaskQueue deterministicTaskQueue, long e deterministicTaskQueue.runAllRunnableTasks(); } - private void ensureConnections(NodeConnectionsService service) { - final PlainActionFuture future = new PlainActionFuture<>(); - service.ensureConnections(() -> future.onResponse(null)); - future.actionGet(); - } - private void assertConnectedExactlyToNodes(DiscoveryNodes discoveryNodes) { assertConnectedExactlyToNodes(transportService, discoveryNodes); } @@ -454,7 +458,7 @@ public void tearDown() throws Exception { super.tearDown(); } - private final class TestTransportService extends TransportService { + private static final class TestTransportService extends TransportService { private TestTransportService(Transport transport, ThreadPool threadPool) { super(Settings.EMPTY, transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, @@ -473,25 +477,9 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException { throw new AssertionError("no blocking connect"); } - @Override - public void connectToNode(DiscoveryNode node, ActionListener listener) throws ConnectTransportException { - final CheckedRunnable connectionBlock = nodeConnectionBlocks.get(node); - if (connectionBlock != null) { - getThreadPool().generic().execute(() -> { - try { - connectionBlock.run(); - super.connectToNode(node, listener); - } catch (Exception e) { - throw new AssertionError(e); - } - }); - } else { - super.connectToNode(node, listener); - } - } } - private static final class MockTransport implements Transport { + private final class MockTransport implements Transport { private final ResponseHandlers responseHandlers = new ResponseHandlers(); private final RequestHandlers requestHandlers = new RequestHandlers(); private volatile boolean randomConnectionExceptions = false; @@ -520,35 +508,90 @@ public TransportAddress[] addressesFromString(String address) { return new TransportAddress[0]; } + private void runConnectionBlock(CheckedRunnable connectionBlock) { + if (connectionBlock == null) { + return; + } + try { + connectionBlock.run(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + @Override public void openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener listener) { + final CheckedRunnable connectionBlock = nodeConnectionBlocks.get(node); if (profile == null && randomConnectionExceptions && randomBoolean()) { - threadPool.generic().execute(() -> listener.onFailure(new ConnectTransportException(node, "simulated"))); + threadPool.generic().execute(() -> { + runConnectionBlock(connectionBlock); + listener.onFailure(new ConnectTransportException(node, "simulated")); + }); } else { - threadPool.generic().execute(() -> listener.onResponse(new Connection() { - @Override - public DiscoveryNode getNode() { - return node; - } - - @Override - public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws TransportException { - } - - @Override - public void addCloseListener(ActionListener listener) { - } - - @Override - public void close() { - } - - @Override - public boolean isClosed() { - return false; - } - })); + threadPool.generic().execute(() -> { + runConnectionBlock(connectionBlock); + listener.onResponse(new Connection() { + private final ListenableActionFuture closeListener = new ListenableActionFuture<>(); + private final ListenableActionFuture removedListener = new ListenableActionFuture<>(); + + private final RefCounted refCounted = AbstractRefCounted.of(() -> closeListener.onResponse(null)); + + @Override + public DiscoveryNode getNode() { + return node; + } + + @Override + public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws TransportException { + } + + @Override + public void addCloseListener(ActionListener listener1) { + closeListener.addListener(listener1); + } + + @Override + public void close() { + closeListener.onResponse(null); + } + + @Override + public boolean isClosed() { + return closeListener.isDone(); + } + + @Override + public void addRemovedListener(ActionListener listener) { + removedListener.addListener(listener); + } + + @Override + public void onRemoved() { + removedListener.onResponse(null); + } + + @Override + public void incRef() { + refCounted.incRef(); + } + + @Override + public boolean tryIncRef() { + return refCounted.tryIncRef(); + } + + @Override + public boolean decRef() { + return refCounted.decRef(); + } + + @Override + public boolean hasReferences() { + return refCounted.hasReferences(); + } + }); + }); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index 39abaefee0858..cac67d74bd787 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.Level; +import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; @@ -25,8 +26,10 @@ import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest; import org.elasticsearch.test.transport.MockTransport; +import org.elasticsearch.transport.ClusterConnectionManager; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportException; +import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; @@ -38,6 +41,7 @@ import static org.elasticsearch.monitor.StatusInfo.Status.HEALTHY; import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY; +import static org.elasticsearch.transport.TransportService.HANDSHAKE_ACTION_NAME; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -47,11 +51,18 @@ public class JoinHelperTests extends ESTestCase { public void testJoinDeduplication() { DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(); - CapturingTransport capturingTransport = new CapturingTransport(); + CapturingTransport capturingTransport = new HandshakingCapturingTransport(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); - TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, - deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, - x -> localNode, null, Collections.emptySet()); + TransportService transportService = new TransportService( + Settings.EMPTY, + capturingTransport, + deterministicTaskQueue.getThreadPool(), + TransportService.NOOP_TRANSPORT_INTERCEPTOR, + x -> localNode, + null, + Collections.emptySet(), + new ClusterConnectionManager(Settings.EMPTY, capturingTransport) + ); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, Collections.emptyList(), (s, p, r) -> {}, @@ -60,6 +71,7 @@ public void testJoinDeduplication() { DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node2", buildNewFakeTransportAddress(), Version.CURRENT); + final boolean mightSucceed = randomBoolean(); assertFalse(joinHelper.isJoinPending()); @@ -88,11 +100,7 @@ public void testJoinDeduplication() { assertThat(capturingTransport.getCapturedRequestsAndClear().length, equalTo(0)); // complete the previous join to node1 - if (randomBoolean()) { - capturingTransport.handleResponse(capturedRequest1.requestId, TransportResponse.Empty.INSTANCE); - } else { - capturingTransport.handleRemoteError(capturedRequest1.requestId, new CoordinationStateRejectedException("dummy")); - } + completeJoinRequest(capturingTransport, capturedRequest1, mightSucceed); // check that sending another join to node1 now works again joinHelper.sendJoinRequest(node1, 0L, optionalJoin1); @@ -112,10 +120,28 @@ public void testJoinDeduplication() { // complete all the joins and check that isJoinPending is updated assertTrue(joinHelper.isJoinPending()); - capturingTransport.handleRemoteError(capturedRequest2.requestId, new CoordinationStateRejectedException("dummy")); - capturingTransport.handleRemoteError(capturedRequest1a.requestId, new CoordinationStateRejectedException("dummy")); - capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy")); + assertTrue(transportService.nodeConnected(node1)); + assertTrue(transportService.nodeConnected(node2)); + + completeJoinRequest(capturingTransport, capturedRequest2, mightSucceed); + completeJoinRequest(capturingTransport, capturedRequest1a, mightSucceed); + completeJoinRequest(capturingTransport, capturedRequest2a, mightSucceed); assertFalse(joinHelper.isJoinPending()); + + if (mightSucceed) { + // successful requests hold the connections open until the cluster state is applied + joinHelper.onClusterStateApplied(); + } + assertFalse(transportService.nodeConnected(node1)); + assertFalse(transportService.nodeConnected(node2)); + } + + private void completeJoinRequest(CapturingTransport capturingTransport, CapturedRequest request, boolean mightSucceed) { + if (mightSucceed && randomBoolean()) { + capturingTransport.handleResponse(request.requestId, TransportResponse.Empty.INSTANCE); + } else { + capturingTransport.handleRemoteError(request.requestId, new CoordinationStateRejectedException("dummy")); + } } public void testFailedJoinAttemptLogLevel() { @@ -209,7 +235,7 @@ public void testGetClusterUuidMismatchExplanation() { public void testJoinFailureOnUnhealthyNodes() { DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(); - CapturingTransport capturingTransport = new CapturingTransport(); + CapturingTransport capturingTransport = new HandshakingCapturingTransport(); DiscoveryNode localNode = new DiscoveryNode("node0", buildNewFakeTransportAddress(), Version.CURRENT); TransportService transportService = capturingTransport.createTransportService(Settings.EMPTY, deterministicTaskQueue.getThreadPool(), TransportService.NOOP_TRANSPORT_INTERCEPTOR, @@ -255,4 +281,21 @@ public void testJoinFailureOnUnhealthyNodes() { CapturedRequest capturedRequest1a = capturedRequests1a[0]; assertEquals(node1, capturedRequest1a.node); } + + private static class HandshakingCapturingTransport extends CapturingTransport { + + @Override + protected void onSendRequest(long requestId, String action, TransportRequest request, DiscoveryNode node) { + if (action.equals(HANDSHAKE_ACTION_NAME)) { + handleResponse(requestId, new TransportService.HandshakeResponse( + node.getVersion(), + Build.CURRENT.hash(), + node, + ClusterName.DEFAULT + )); + } else { + super.onSendRequest(requestId, action, request, node); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java b/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java index d8ec56cff75ab..4e91cb447e333 100644 --- a/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/node/DiscoveryNodeTests.java @@ -185,6 +185,7 @@ public void testDiscoveryNodeDescriptionWithoutAttributes() { final String descriptionWithoutAttributes = stringBuilder.toString(); assertThat(node.toString(), allOf(startsWith(descriptionWithoutAttributes), containsString("test-attr=val"))); assertThat(descriptionWithoutAttributes, not(containsString("test-attr"))); + assertEquals(descriptionWithoutAttributes, node.descriptionWithoutAttributes()); } public void testDiscoveryNodeToXContent() { diff --git a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java index dc29fa75a7d83..1e6e7eedfc79a 100644 --- a/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/HandshakingTransportAddressConnectorTests.java @@ -124,10 +124,10 @@ public void testConnectsToMasterNode() throws InterruptedException { remoteClusterName = "local-cluster"; discoveryAddress = getDiscoveryAddress(); - handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, new ActionListener() { + handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, new ActionListener() { @Override - public void onResponse(DiscoveryNode discoveryNode) { - receivedNode.set(discoveryNode); + public void onResponse(ProbeConnectionResult connectResult) { + receivedNode.set(connectResult.getDiscoveryNode()); completionLatch.countDown(); } @@ -223,12 +223,12 @@ private TransportAddress getDiscoveryAddress() { return randomBoolean() ? remoteNode.getAddress() : buildNewFakeTransportAddress(); } - private static class FailureListener implements ActionListener { + private static class FailureListener implements ActionListener { final CountDownLatch completionLatch = new CountDownLatch(1); @Override - public void onResponse(DiscoveryNode discoveryNode) { - fail(discoveryNode.toString()); + public void onResponse(ProbeConnectionResult connectResult) { + fail(connectResult.getDiscoveryNode().toString()); } @Override diff --git a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java index 2691e9758ba21..d38fdb982daf5 100644 --- a/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/PeerFinderTests.java @@ -99,7 +99,7 @@ void addReachableNode(DiscoveryNode node) { } @Override - public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { + public void connectToRemoteMasterNode(TransportAddress transportAddress, ActionListener listener) { assert localNode.getAddress().equals(transportAddress) == false : "should not probe local node"; final boolean isNotInFlight = inFlightConnectionAttempts.add(transportAddress); @@ -124,7 +124,11 @@ public void run() { disconnectedNodes.remove(discoveryNode); connectedNodes.add(discoveryNode); assertTrue(inFlightConnectionAttempts.remove(transportAddress)); - listener.onResponse(discoveryNode); + listener.onResponse(new ProbeConnectionResult(discoveryNode, () -> { + if (connectedNodes.remove(discoveryNode)) { + disconnectedNodes.add(discoveryNode); + } + })); return; } else { listener.onFailure(new ElasticsearchException("non-master node " + discoveryNode)); @@ -232,6 +236,7 @@ public void setup() { public void deactivateAndRunRemainingTasks() { peerFinder.deactivate(localNode); deterministicTaskQueue.runAllRunnableTasks(); + assertThat(connectedNodes, empty()); } public void testAddsReachableNodesFromUnicastHostsList() { @@ -350,6 +355,7 @@ public void testDeactivationClearsPastKnowledge() { assertFoundPeers(otherNode); peerFinder.deactivate(localNode); + assertThat(connectedNodes, empty()); providedAddresses.clear(); peerFinder.activate(lastAcceptedNodes); @@ -467,6 +473,7 @@ public void testDelegatesRequestHandlingWhenInactive() { final long term = randomNonNegativeLong(); peerFinder.setCurrentTerm(term); peerFinder.deactivate(masterNode); + assertThat(connectedNodes, empty()); final PeersResponse expectedResponse = new PeersResponse(Optional.of(masterNode), Collections.emptyList(), term); final PeersResponse peersResponse = peerFinder.handlePeersRequest(new PeersRequest(sourceNode, Collections.emptyList())); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 5d527319e04d7..0db258defb33d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1646,7 +1646,10 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() { @Override protected void connectToNodesAndWait(ClusterState newClusterState) { - // don't do anything, and don't block + connectToNodesAsync(newClusterState, () -> { + // no need to block waiting for handshakes etc. to complete, it's enough to let the NodeConnectionsService + // take charge of these connections + }); } } ); diff --git a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java index f44217e276ada..78a70617eca7b 100644 --- a/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ClusterConnectionManagerTests.java @@ -8,14 +8,23 @@ package org.elasticsearch.transport; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.Releasables; import org.elasticsearch.core.TimeValue; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLogAppender; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import org.junit.Before; @@ -29,11 +38,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import static org.elasticsearch.test.ActionListenerUtils.anyActionListener; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -116,6 +129,69 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti assertEquals(1, nodeDisconnectedCount.get()); } + @TestLogging(reason="testing log messages emitted on disconnect", value="org.elasticsearch.transport.ClusterConnectionManager:TRACE") + public void testDisconnectLogging() throws IllegalAccessException { + final Supplier nodeFactory = () -> new DiscoveryNode( + randomAlphaOfLength(10), + new TransportAddress(InetAddress.getLoopbackAddress(), 0), + Collections.singletonMap("attr", "val"), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT); + final DiscoveryNode remoteClose = nodeFactory.get(); + final DiscoveryNode localClose = nodeFactory.get(); + final DiscoveryNode shutdownClose = nodeFactory.get(); + + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + final ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + final DiscoveryNode discoveryNode = (DiscoveryNode) invocationOnMock.getArguments()[0]; + listener.onResponse(new TestConnect(discoveryNode)); + return null; + }).when(transport).openConnection(any(), eq(connectionProfile), anyActionListener()); + + final ConnectionManager.ConnectionValidator validator = (c, p, l) -> l.onResponse(null); + final AtomicReference toClose = new AtomicReference<>(); + + PlainActionFuture.get(f -> connectionManager.connectToNode(remoteClose, connectionProfile, validator, f.map(x -> null))); + PlainActionFuture.get(f -> connectionManager.connectToNode(shutdownClose, connectionProfile, validator, f.map(x -> null))); + PlainActionFuture.get(f -> connectionManager.connectToNode(localClose, connectionProfile, validator, f.map(toClose::getAndSet))); + + final Releasable localConnectionRef = toClose.getAndSet(null); + assertThat(localConnectionRef, notNullValue()); + + final String loggerName = "org.elasticsearch.transport.ClusterConnectionManager"; + final Logger logger = LogManager.getLogger(loggerName); + final MockLogAppender appender = new MockLogAppender(); + try { + appender.start(); + Loggers.addAppender(logger, appender); + appender.addExpectation(new MockLogAppender.SeenEventExpectation( + "locally-triggered close message", + loggerName, + Level.DEBUG, + "closing unused transport connection to [" + localClose + "]")); + appender.addExpectation(new MockLogAppender.SeenEventExpectation( + "remotely-triggered close message", + loggerName, + Level.INFO, + "transport connection to [" + remoteClose.descriptionWithoutAttributes() + "] closed by remote")); + appender.addExpectation(new MockLogAppender.SeenEventExpectation( + "shutdown-triggered close message", + loggerName, + Level.TRACE, + "connection manager shut down, closing transport connection to [" + shutdownClose + "]")); + + Releasables.close(localConnectionRef); + connectionManager.disconnectFromNode(remoteClose); + connectionManager.close(); + + appender.assertAllExpectationsMatched(); + } finally { + Loggers.removeAppender(logger, appender); + appender.stop(); + } + } + public void testConcurrentConnects() throws Exception { Set connections = Collections.newSetFromMap(new ConcurrentHashMap<>()); @@ -156,10 +232,16 @@ public void testConcurrentConnects() throws Exception { List threads = new ArrayList<>(); AtomicInteger nodeConnectedCount = new AtomicInteger(); + AtomicInteger nodeClosedCount = new AtomicInteger(); AtomicInteger nodeFailureCount = new AtomicInteger(); - CyclicBarrier barrier = new CyclicBarrier(11); - for (int i = 0; i < 10; i++) { + int threadCount = between(1, 10); + Releasable[] releasables = new Releasable[threadCount]; + + CyclicBarrier barrier = new CyclicBarrier(threadCount + 1); + Semaphore pendingCloses = new Semaphore(threadCount); + for (int i = 0; i < threadCount; i++) { + final int threadIndex = i; Thread thread = new Thread(() -> { try { barrier.await(); @@ -169,10 +251,18 @@ public void testConcurrentConnects() throws Exception { CountDownLatch latch = new CountDownLatch(1); connectionManager.connectToNode(node, connectionProfile, validator, ActionListener.wrap(c -> { - nodeConnectedCount.incrementAndGet(); - if (connectionManager.nodeConnected(node) == false) { - throw new AssertionError("Expected node to be connected"); + assert connectionManager.nodeConnected(node); + + if (randomBoolean()) { + releasables[threadIndex] = c; + nodeConnectedCount.incrementAndGet(); + } else { + assertTrue(pendingCloses.tryAcquire()); + connectionManager.getConnection(node).addRemovedListener(ActionListener.wrap(pendingCloses::release)); + Releasables.close(c); + nodeClosedCount.incrementAndGet(); } + assert latch.getCount() == 1; latch.countDown(); }, e -> { @@ -199,19 +289,23 @@ public void testConcurrentConnects() throws Exception { } }); - assertEquals(10, nodeConnectedCount.get() + nodeFailureCount.get()); - - int managedConnections = connectionManager.size(); - if (managedConnections != 0) { - assertEquals(1, managedConnections); + assertEquals(threadCount, nodeConnectedCount.get() + nodeClosedCount.get() + nodeFailureCount.get()); - // Only a single connection attempt should be open. - assertEquals(1, connections.stream().filter(c -> c.isClosed() == false).count()); + if (nodeConnectedCount.get() == 0) { + // Any successful connections were closed + assertTrue(pendingCloses.tryAcquire(threadCount, 10, TimeUnit.SECONDS)); + assertTrue(connections.stream().allMatch(Transport.Connection::isClosed)); + assertEquals(0, connectionManager.size()); } else { - // No connections succeeded - assertEquals(0, connections.stream().filter(c -> c.isClosed() == false).count()); + assertEquals(1, connectionManager.size()); + assertEquals(1L, connections.stream().filter(c -> c.isClosed() == false).count()); } + if (randomBoolean()) { + Releasables.close(releasables); + assertEquals(0, connectionManager.size()); + assertTrue(connections.stream().allMatch(Transport.Connection::isClosed)); + } connectionManager.close(); // The connection manager will close all open connections @@ -220,6 +314,68 @@ public void testConcurrentConnects() throws Exception { } } + public void testConcurrentConnectsAndDisconnects() throws Exception { + final DiscoveryNode node = new DiscoveryNode("", new TransportAddress(InetAddress.getLoopbackAddress(), 0), Version.CURRENT); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; + listener.onResponse(new TestConnect(node)); + return null; + }).when(transport).openConnection(eq(node), any(), anyActionListener()); + + final ConnectionManager.ConnectionValidator validator = (c, p, l) -> { + if (randomBoolean()) { + l.onResponse(null); + } else { + threadPool.generic().execute(() -> l.onResponse(null)); + } + }; + + final Semaphore pendingConnections = new Semaphore(1000); + final int threadCount = between(1, 10); + final CountDownLatch countDownLatch = new CountDownLatch(threadCount); + + final Runnable action = new Runnable() { + @Override + public void run() { + if (pendingConnections.tryAcquire()) { + connectionManager.connectToNode(node, null, validator, new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + if (connectionManager.nodeConnected(node) == false) { + final String description = releasable.toString(); + fail(description); + } + Releasables.close(releasable); + threadPool.generic().execute(() -> run()); + } + + @Override + public void onFailure(Exception e) { + if (e instanceof ConnectTransportException + && e.getMessage().contains("concurrently connecting and disconnecting")) { + pendingConnections.release(); + threadPool.generic().execute(() -> run()); + } else { + throw new AssertionError("unexpected", e); + } + } + }); + } else { + countDownLatch.countDown(); + } + } + }; + + for (int i = 0; i < threadCount; i++) { + threadPool.generic().execute(action); + } + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); + assertFalse(connectionManager.nodeConnected(node)); + connectionManager.close(); + } + public void testConnectFailsDuringValidation() { AtomicInteger nodeConnectedCount = new AtomicInteger(); AtomicInteger nodeDisconnectedCount = new AtomicInteger(); @@ -249,9 +405,9 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti ConnectionManager.ConnectionValidator validator = (c, p, l) -> l.onFailure(new ConnectTransportException(node, "")); - PlainActionFuture fut = new PlainActionFuture<>(); + PlainActionFuture fut = new PlainActionFuture<>(); connectionManager.connectToNode(node, connectionProfile, validator, fut); - expectThrows(ConnectTransportException.class, () -> fut.actionGet()); + expectThrows(ConnectTransportException.class, fut::actionGet); assertTrue(connection.isClosed()); assertFalse(connectionManager.nodeConnected(node)); @@ -289,9 +445,9 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti ConnectionManager.ConnectionValidator validator = (c, p, l) -> l.onResponse(null); - PlainActionFuture fut = new PlainActionFuture<>(); + PlainActionFuture fut = new PlainActionFuture<>(); connectionManager.connectToNode(node, connectionProfile, validator, fut); - expectThrows(ConnectTransportException.class, () -> fut.actionGet()); + expectThrows(ConnectTransportException.class, fut::actionGet); assertFalse(connectionManager.nodeConnected(node)); expectThrows(NodeNotConnectedException.class, () -> connectionManager.getConnection(node)); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java index a1e8ba6788631..0d7307c34dbfc 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteConnectionManagerTests.java @@ -50,15 +50,15 @@ public void testGetConnection() { DiscoveryNode node1 = new DiscoveryNode("node-1", address, Version.CURRENT); PlainActionFuture future1 = PlainActionFuture.newFuture(); - remoteConnectionManager.connectToNode(node1, null, validator, future1); + remoteConnectionManager.connectToRemoteClusterNode(node1, validator, future1); assertTrue(future1.isDone()); // Add duplicate connect attempt to ensure that we do not get duplicate connections in the round robin - remoteConnectionManager.connectToNode(node1, null, validator, PlainActionFuture.newFuture()); + remoteConnectionManager.connectToRemoteClusterNode(node1, validator, PlainActionFuture.newFuture()); DiscoveryNode node2 = new DiscoveryNode("node-2", address, Version.CURRENT.minimumCompatibilityVersion()); PlainActionFuture future2 = PlainActionFuture.newFuture(); - remoteConnectionManager.connectToNode(node2, null, validator, future2); + remoteConnectionManager.connectToRemoteClusterNode(node2, validator, future2); assertTrue(future2.isDone()); assertEquals(node1, remoteConnectionManager.getConnection(node1).getNode()); diff --git a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java index afc091de6b4dd..7f9c987f38d83 100644 --- a/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TransportServiceDeserializationFailureTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.Releasable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskAwareRequest; import org.elasticsearch.tasks.TaskId; @@ -63,7 +64,7 @@ protected void onSendRequest(long requestId, String action, TransportRequest req transportService.start(); transportService.acceptIncomingRequests(); - final PlainActionFuture connectionFuture = new PlainActionFuture<>(); + final PlainActionFuture connectionFuture = new PlainActionFuture<>(); transportService.connectToNode(otherNode, connectionFuture); assertTrue(connectionFuture.isDone()); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 2d036d39de34b..e881c84ccb199 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -564,6 +564,13 @@ void stabilise(long stabilisationDurationMillis) { clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false)); assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK", clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false)); + + for (final ClusterNode otherNode : clusterNodes) { + if (isConnectedPair(leader, otherNode) && isConnectedPair(otherNode, clusterNode)) { + assertTrue(otherNode.getId() + " is connected to healthy node " + clusterNode.getId(), + otherNode.transportService.nodeConnected(clusterNode.localNode)); + } + } } else { assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE)); assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue()); @@ -571,6 +578,13 @@ void stabilise(long stabilisationDurationMillis) { clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(true)); assertFalse(nodeId + " is not in the applied state on " + leaderId, leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId)); + + for (final ClusterNode otherNode : clusterNodes) { + if (isConnectedPair(leader, otherNode)) { + assertFalse(otherNode.getId() + " is not connected to removed node " + clusterNode.getId(), + otherNode.transportService.nodeConnected(clusterNode.localNode)); + } + } } } @@ -1412,7 +1426,10 @@ public void onNewClusterState(String source, Supplier clusterState @Override protected void connectToNodesAndWait(ClusterState newClusterState) { - // don't do anything, and don't block + connectToNodesAsync(newClusterState, () -> { + // no need to block waiting for handshakes etc. to complete, it's enough to let the NodeConnectionsService take charge of + // these connections + }); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java index 3f31a87e42c26..0c55611b3b831 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java @@ -104,44 +104,61 @@ protected void onSendRequest(long requestId, String action, TransportRequest req assert destinationTransport.getLocalNode().equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself"; + request.incRef(); + destinationTransport.execute(new RebootSensitiveRunnable() { @Override public void run() { - final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode()); - switch (connectionStatus) { - case BLACK_HOLE: - case BLACK_HOLE_REQUESTS_ONLY: - onBlackholedDuringSend(requestId, action, destinationTransport); - break; - - case DISCONNECTED: - onDisconnectedDuringSend(requestId, action, destinationTransport); - break; - - case CONNECTED: - onConnectedDuringSend(requestId, action, request, destinationTransport); - break; - - default: - throw new AssertionError("unexpected status: " + connectionStatus); + try { + final ConnectionStatus connectionStatus = getConnectionStatus(destinationTransport.getLocalNode()); + switch (connectionStatus) { + case BLACK_HOLE: + case BLACK_HOLE_REQUESTS_ONLY: + onBlackholedDuringSend(requestId, action, destinationTransport); + break; + + case DISCONNECTED: + onDisconnectedDuringSend(requestId, action, destinationTransport); + break; + + case CONNECTED: + onConnectedDuringSend(requestId, action, request, destinationTransport); + break; + + default: + throw new AssertionError("unexpected status: " + connectionStatus); + } + } finally { + request.decRef(); } } @Override public void ifRebooted() { - deterministicTaskQueue.scheduleNow(() -> execute(new Runnable() { + request.decRef(); + deterministicTaskQueue.scheduleNow(new Runnable() { @Override public void run() { - handleRemoteError( - requestId, - new NodeNotConnectedException(destinationTransport.getLocalNode(), "node rebooted")); + execute(new Runnable() { + @Override + public void run() { + handleRemoteError( + requestId, + new NodeNotConnectedException(destinationTransport.getLocalNode(), "node rebooted")); + } + + @Override + public String toString() { + return "error response (reboot) to " + internalToString(); + } + }); } @Override public String toString() { - return "error response (reboot) to " + internalToString(); + return "scheduling of error response (reboot) to " + internalToString(); } - })); + }); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java index 29673b2d14846..abc1a2c92f2d4 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableConnectionManager.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.core.Releasable; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.ConnectionManager; @@ -86,7 +87,7 @@ public void removeListener(TransportConnectionListener listener) { @Override public void connectToNode(DiscoveryNode node, ConnectionProfile connectionProfile, - ConnectionValidator connectionValidator, ActionListener listener) + ConnectionValidator connectionValidator, ActionListener listener) throws ConnectTransportException { delegate.connectToNode(node, connectionProfile, connectionValidator, listener); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java index f078ad6d4bf75..c40eaa95a26bc 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/StubbableTransport.java @@ -229,6 +229,10 @@ public void addCloseListener(ActionListener listener) { connection.addCloseListener(listener); } + @Override + public void addRemovedListener(ActionListener listener) { + connection.addRemovedListener(listener); + } @Override public boolean isClosed() { @@ -250,6 +254,11 @@ public void close() { connection.close(); } + @Override + public void onRemoved() { + connection.onRemoved(); + } + public Transport.Connection getConnection() { return connection; } @@ -258,6 +267,26 @@ public Transport.Connection getConnection() { public String toString() { return "WrappedConnection[" + connection + "]"; } + + @Override + public void incRef() { + connection.incRef(); + } + + @Override + public boolean tryIncRef() { + return connection.tryIncRef(); + } + + @Override + public boolean decRef() { + return connection.decRef(); + } + + @Override + public boolean hasReferences() { + return connection.hasReferences(); + } } @FunctionalInterface diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 9d45c190a99da..e82fb60783527 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -21,8 +21,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.core.Nullable; -import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -35,9 +33,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.node.Node; @@ -2831,9 +2832,9 @@ public void sendRequest( serviceC.connectToNode( serviceA.getLocalDiscoNode(), ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY), - new ActionListener() { + new ActionListener() { @Override - public void onResponse(final Void v) { + public void onResponse(final Releasable ignored) { latch.countDown(); } diff --git a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java index 4dd8e93c26693..b55dacf015bb5 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Tuple; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; @@ -136,9 +137,9 @@ protected void execute(Runnable runnable) { service1.start(); service2.start(); - final PlainActionFuture fut1 = new PlainActionFuture<>(); + final PlainActionFuture fut1 = new PlainActionFuture<>(); service1.connectToNode(node2, fut1); - final PlainActionFuture fut2 = new PlainActionFuture<>(); + final PlainActionFuture fut2 = new PlainActionFuture<>(); service2.connectToNode(node1, fut2); deterministicTaskQueue.runAllTasksInTimeOrder(); assertTrue(fut1.isDone()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java index b99bd4608ce2a..7e857600272f1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java @@ -116,5 +116,10 @@ public boolean tryIncRef() { public boolean decRef() { return chunk.decRef(); } + + @Override + public boolean hasReferences() { + return chunk.hasReferences(); + } } }