Improve control of outgoing connection lifecycles (#77672)
Today we open connections to other nodes in various places and largely
assume that they remain open as needed, only closing them when applying
a cluster state that removes the remote node from the cluster. This
isn't ideal: we might keep unnecessary connections open to remote nodes
that never manage to join the cluster, and we might also disconnect from
a node that left the cluster while it is in the process of re-joining
(see #67873).

With this commit we move to a model in which each user of a connection
to a remote node acquires a reference to the connection that must be
released once it's no longer needed. Connections remain open while there
are any live references, but are now actively closed when all references
are released.

Fixes #67873
Backport of #77295
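
For illustration only (none of this code appears in the diff below): the model described above is the standard reference-counting contract, in which each user of a connection takes a reference before using it and releases that reference afterwards, and the connection is actively closed exactly once, when the last reference is released. A minimal, self-contained Java sketch of that contract, using hypothetical names rather than the real transport classes touched by this commit:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted connection to a remote node.
final class NodeConnection {
    private final AtomicInteger refCount = new AtomicInteger(1); // the opener holds the initial reference

    // Acquire a reference; fails if the connection has already been closed.
    void incRef() {
        if (tryIncRef() == false) {
            throw new IllegalStateException("already closed, can't increment ref count");
        }
    }

    // Acquire a reference only if the connection is still open.
    boolean tryIncRef() {
        int current;
        do {
            current = refCount.get();
            if (current <= 0) {
                return false; // closed; acquiring new references must fail
            }
        } while (refCount.compareAndSet(current, current + 1) == false);
        return true;
    }

    // Release a reference; the connection is closed when the count reaches zero.
    boolean decRef() {
        final int updated = refCount.decrementAndGet();
        assert updated >= 0 : "ref count went negative";
        if (updated == 0) {
            closeInternal(); // last reference released: actively close the connection
            return true;
        }
        return false;
    }

    boolean hasReferences() {
        return refCount.get() > 0;
    }

    private void closeInternal() {
        // tear down the underlying channels here
    }

    void sendRequest(String action) {
        // send a request to the remote node over this connection
    }
}

class ConnectionUser {
    // Each user brackets its use of the connection with incRef()/decRef(), so the
    // connection stays open exactly as long as at least one user still needs it.
    static void useConnection(NodeConnection connection) {
        connection.incRef();
        try {
            connection.sendRequest("internal:example/action");
        } finally {
            connection.decRef();
        }
    }
}

The actual change applies this pattern to the transport layer's connection objects; the classes above are only meant to make the lifecycle description concrete.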
DaveCTurner authored Sep 14, 2021
1 parent 431aca0 commit 4d4f2a9
Showing 45 changed files with 1,285 additions and 596 deletions.
@@ -58,6 +58,11 @@ public final boolean decRef() {
return false;
}

@Override
public final boolean hasReferences() {
return refCount.get() > 0;
}

/**
* Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging
* purposes.
@@ -74,7 +79,7 @@ protected void alreadyClosed() {
/**
* Returns the current reference count.
*/
public int refCount() {
public final int refCount() {
return this.refCount.get();
}

@@ -54,4 +54,12 @@ public interface RefCounted {
* @return returns {@code true} if the ref count dropped to 0 as a result of calling this method
*/
boolean decRef();

/**
* Returns {@code true} only if there was at least one active reference when the method was called; if it returns {@code false} then the
* object is closed; future attempts to acquire references will fail.
*
* @return whether there are currently any active references to this object.
*/
boolean hasReferences();
}
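
The new hasReferences() method added above is only a point-in-time observation: its result is stable only for a caller that already holds its own reference. A hedged sketch of how a caller might use it together with the rest of the contract, assuming a tryIncRef() method like the one Elasticsearch's RefCounted also declares (elided from this hunk); the interface copy and the helper name here are purely illustrative:

// Minimal local copy of the contract, so the sketch is self-contained.
interface SketchRefCounted {
    void incRef();
    boolean tryIncRef();
    boolean decRef();
    boolean hasReferences();
}

final class RefCountedSketch {
    // Prefer try-acquire-then-use over check-then-acquire: hasReferences() may
    // flip to false between a check and a subsequent incRef() call, whereas
    // tryIncRef() atomically either acquires a reference or reports closure.
    static boolean withReference(SketchRefCounted resource, Runnable action) {
        if (resource.tryIncRef() == false) {
            assert resource.hasReferences() == false; // closed: no live references remain
            return false;
        }
        try {
            action.run();
            return true;
        } finally {
            resource.decRef();
        }
    }
}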
@@ -59,13 +59,7 @@ public void testRefCount() {
assertThat(
expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));

try {
counted.ensureOpen();
fail(" expected exception");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
}

public void testMultiThreaded() throws InterruptedException {
@@ -79,6 +73,7 @@ public void testMultiThreaded() throws InterruptedException {
latch.await();
for (int j = 0; j < 10000; j++) {
counted.incRef();
assertTrue(counted.hasReferences());
try {
counted.ensureOpen();
} finally {
@@ -96,13 +91,11 @@
thread.join();
}
counted.decRef();
try {
counted.ensureOpen();
fail("expected to be closed");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("closed"));
}
assertThat(expectThrows(IllegalStateException.class, counted::ensureOpen).getMessage(), equalTo("closed"));
assertThat(expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));
assertThat(counted.refCount(), is(0));
assertFalse(counted.hasReferences());
assertThat(exceptions, Matchers.emptyIterable());
}

@@ -117,7 +110,8 @@ protected void closeInternal() {

public void ensureOpen() {
if (closed.get()) {
assert this.refCount() == 0;
assertEquals(0, this.refCount());
assertFalse(hasReferences());
throw new IllegalStateException("closed");
}
}
@@ -331,7 +331,9 @@ public void testRemoveBanParentsOnDisconnect() throws Exception {
if (bannedParent.getNodeId().equals(node.getId()) && randomBoolean()) {
Collection<Transport.Connection> childConns = taskManager.startBanOnChildTasks(bannedParent.getId(), "", () -> {});
for (Transport.Connection connection : randomSubsetOf(childConns)) {
connection.close();
if (connection.getNode().equals(node) == false) {
connection.close();
}
}
}
}
@@ -16,19 +16,22 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -40,6 +43,8 @@
import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collections;
@@ -58,6 +63,11 @@

import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_INTERVAL_SETTING;
import static org.elasticsearch.cluster.coordination.LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING;
import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
@@ -494,4 +504,63 @@ public void testRestartNodeWhileIndexing() throws Exception {
}
}

public void testRejoinWhileBeingRemoved() {
final String masterNode = internalCluster().startMasterOnlyNode(Settings.builder()
.put(FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
.build());
final String dataNode = internalCluster().startDataOnlyNode(Settings.builder()
.put(DISCOVERY_FIND_PEERS_INTERVAL_SETTING.getKey(), "100ms")
.put(LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
.put(LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), "1")
.build());

final ClusterService masterClusterService = internalCluster().getInstance(ClusterService.class, masterNode);
final PlainActionFuture<Void> removedNode = new PlainActionFuture<>();
masterClusterService.addListener(clusterChangedEvent -> {
if (removedNode.isDone() == false && clusterChangedEvent.state().nodes().getDataNodes().isEmpty()) {
removedNode.onResponse(null);
}
});

final ClusterService dataClusterService = internalCluster().getInstance(ClusterService.class, dataNode);
final PlainActionFuture<Void> failedLeader = new PlainActionFuture<Void>() {
@Override
protected boolean blockingAllowed() {
// we're deliberately blocking the cluster applier on the master until the data node starts to rejoin
return true;
}
};
final AtomicBoolean dataNodeHasMaster = new AtomicBoolean(true);
dataClusterService.addListener(clusterChangedEvent -> {
dataNodeHasMaster.set(clusterChangedEvent.state().nodes().getMasterNode() != null);
if (failedLeader.isDone() == false && dataNodeHasMaster.get() == false) {
failedLeader.onResponse(null);
}
});

masterClusterService.addHighPriorityApplier(event -> {
failedLeader.actionGet();
if (dataNodeHasMaster.get() == false) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new AssertionError("unexpected", e);
}
}
});

final MockTransportService dataTransportService
= (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode);
dataTransportService.addRequestHandlingBehavior(FollowersChecker.FOLLOWER_CHECK_ACTION_NAME, (handler, request, channel, task) -> {
if (removedNode.isDone() == false) {
channel.sendResponse(new ElasticsearchException("simulated check failure"));
} else {
handler.messageReceived(request, channel, task);
}
});

removedNode.actionGet(10, TimeUnit.SECONDS);
ensureStableCluster(2);
}
}