Skip to content

Commit

Permalink
Establish seed node connections in async during node bootstrap (#8038) (
Browse files Browse the repository at this point in the history
#8077)

* Establish seed node connection setup in async

Signed-off-by: Ankit Kala <ankikala@amazon.com>
  • Loading branch information
ankitkala committed Jun 15, 2023
1 parent b642e69 commit bcd9962
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
Expand All @@ -63,7 +62,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -328,35 +326,23 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings,
}

/**
* Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection
* Connects to all remote clusters in a non-blocking fashion. This should be called on node startup to establish an initial connection
* to all configured seed nodes.
*/
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Collection<Void>> future = new PlainActionFuture<>();
void initializeRemoteClusters(ActionListener<Void> listener) {
Set<String> enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings);

if (enabledClusters.isEmpty()) {
listener.onResponse(null);
return;
}

GroupedActionListener<Void> listener = new GroupedActionListener<>(future, enabledClusters.size());
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, listener);
}
GroupedActionListener<Void> groupListener = new GroupedActionListener<>(
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
enabledClusters.size()
);

if (enabledClusters.isEmpty()) {
future.onResponse(null);
}

try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("failed to connect to remote clusters within {}", timeValue.toString());
} catch (Exception e) {
throw new IllegalStateException("failed to connect to remote clusters", e);
for (String clusterAlias : enabledClusters) {
updateRemoteCluster(clusterAlias, settings, groupListener);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,12 @@ protected void doStart() {

if (remoteClusterClient) {
// here we start to connect to the remote clusters
remoteClusterService.initializeRemoteClusters();
remoteClusterService.initializeRemoteClusters(
ActionListener.wrap(
r -> logger.info("Remote clusters initialized successfully."),
e -> logger.error("Remote clusters initialization failed partially", e)
)
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void testConnectAndExecuteRequest() throws Exception {
service.acceptIncomingRequests();
logger.info("now accepting incoming requests on local transport");
RemoteClusterService remoteClusterService = service.getRemoteClusterService();
assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode));
assertBusy(() -> { assertTrue(remoteClusterService.isRemoteNodeConnected("test", remoteNode)); }, 10, TimeUnit.SECONDS);
Client client = remoteClusterService.getRemoteClusterClient(threadPool, "test");
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute().get();
assertNotNull(clusterStateResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -92,6 +93,20 @@ private MockTransportService startTransport(
return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings);
}

void initializeRemoteClusters(RemoteClusterService service) {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
service.initializeRemoteClusters(future);
try {
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (TimeoutException ex) {
logger.warn("Timed out connecting to remote clusters");
} catch (Exception e) {
throw new IllegalStateException("failed to connect to remote clusters", e);
}
}

public void testSettingsAreRegistered() {
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE));
assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING));
Expand Down Expand Up @@ -157,7 +172,7 @@ public void testGroupClusterIndices() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
Expand Down Expand Up @@ -228,7 +243,7 @@ public void testGroupIndices() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
Expand Down Expand Up @@ -321,7 +336,7 @@ public void testIncrementallyAddClusters() throws IOException {
builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());
Settings cluster1Settings = createSettings(
"cluster_1",
Expand Down Expand Up @@ -384,7 +399,7 @@ public void testDefaultPingSchedule() throws IOException {
transportService.acceptIncomingRequests();
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());
service.validateAndUpdateRemoteCluster(
"cluster_1",
Expand Down Expand Up @@ -436,7 +451,7 @@ public void testCustomPingSchedule() throws IOException {
TimeValue.timeValueSeconds(randomIntBetween(1, 10));
builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2);
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1");
assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval());
Expand Down Expand Up @@ -467,7 +482,7 @@ public void testChangeSettings() throws Exception {
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
service.initializeRemoteClusters();
initializeRemoteClusters(service);
RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1");
Settings.Builder settingsChange = Settings.builder();
TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8));
Expand Down Expand Up @@ -517,7 +532,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -580,7 +595,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -648,7 +663,7 @@ public void testCollectNodes() throws InterruptedException, IOException {
builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertFalse(service.isCrossClusterSearchEnabled());

final CountDownLatch firstLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -896,7 +911,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception {
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
initializeRemoteClusters(service);
assertTrue(service.isCrossClusterSearchEnabled());

final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
Expand Down

0 comments on commit bcd9962

Please sign in to comment.