From 91fe7013904c845cd541d6e1c750612d9ddb4e17 Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Tue, 13 Jun 2023 01:12:35 +0530 Subject: [PATCH] Establish seed node connection setup in async Signed-off-by: Ankit Kala --- CHANGELOG.md | 1 + .../transport/RemoteClusterService.java | 40 +++++++++---------- .../transport/TransportService.java | 7 +++- .../transport/RemoteClusterServiceTests.java | 35 +++++++++++----- 4 files changed, 51 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0473a086d91eb..b7c6b8bbdb536 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -92,6 +92,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) - Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653)) +- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038)) ### Dependencies - Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897)) diff --git a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java index 838367baf8a2e..5a7d68a6323b8 100644 --- a/server/src/main/java/org/opensearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/opensearch/transport/RemoteClusterService.java @@ -38,7 +38,6 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.action.support.PlainActionFuture; import org.opensearch.client.Client; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; @@ -63,7 +62,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -331,32 +329,32 @@ synchronized void updateRemoteCluster(String clusterAlias, Settings newSettings, * Connects to all remote clusters in a blocking fashion. This should be called on node startup to establish an initial connection * to all configured seed nodes. */ - void initializeRemoteClusters() { - final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings); - final PlainActionFuture> future = new PlainActionFuture<>(); + void initializeRemoteClusters(ActionListener listener) { Set enabledClusters = RemoteClusterAware.getEnabledRemoteClusters(settings); - if (enabledClusters.isEmpty()) { + listener.onResponse(null); return; } - GroupedActionListener listener = new GroupedActionListener<>(future, enabledClusters.size()); - for (String clusterAlias : enabledClusters) { - updateRemoteCluster(clusterAlias, settings, listener); - } + GroupedActionListener groupListener = new GroupedActionListener<>( + ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + enabledClusters.size() + ); - if (enabledClusters.isEmpty()) { - future.onResponse(null); - } + for (String clusterAlias : enabledClusters) { + updateRemoteCluster(clusterAlias, settings, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.debug("Successfully established connection with {}", clusterAlias); + groupListener.onResponse(unused); + } - try { - future.get(timeValue.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (TimeoutException ex) { - logger.warn("failed to connect to remote clusters within {}", timeValue.toString()); - } catch (Exception e) { - throw new IllegalStateException("failed to connect to remote clusters", e); + @Override + public void onFailure(Exception e) { + logger.error("Failed to established connection with {}: {}", clusterAlias, e); + groupListener.onFailure(e); + } + }); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 0a31d2dbf6ec5..8cd46a42952ae 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -300,7 +300,12 @@ protected void doStart() { if (remoteClusterClient) { // here we start to connect to the remote clusters - remoteClusterService.initializeRemoteClusters(); + remoteClusterService.initializeRemoteClusters( + ActionListener.wrap( + r -> logger.info("Remote clusters initialized successfully."), + e -> logger.error("Remote clusters initialization failed partially", e) + ) + ); } } diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java index a25820622b2cf..52bf7aa08fe67 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterServiceTests.java @@ -59,6 +59,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; @@ -92,6 +93,20 @@ private MockTransportService startTransport( return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); } + void initializeRemoteClusters(RemoteClusterService service) { + final PlainActionFuture future = new PlainActionFuture<>(); + service.initializeRemoteClusters(future); + try { + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (TimeoutException ex) { + logger.warn("Timed out connecting to remote clusters"); + } catch (Exception e) { + throw new IllegalStateException("failed to connect to remote clusters", e); + } + } + public void testSettingsAreRegistered() { assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE)); assertTrue(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.contains(RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING)); @@ -157,7 +172,7 @@ public void testGroupClusterIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -228,7 +243,7 @@ public void testGroupIndices() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); assertTrue(service.isRemoteClusterRegistered("cluster_1")); assertTrue(service.isRemoteClusterRegistered("cluster_2")); @@ -321,7 +336,7 @@ public void testIncrementallyAddClusters() throws IOException { builder.putList("cluster.remote.cluster_2.seeds", cluster2Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(Settings.EMPTY, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); Settings cluster1Settings = createSettings( "cluster_1", @@ -384,7 +399,7 @@ public void testDefaultPingSchedule() throws IOException { transportService.acceptIncomingRequests(); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); service.validateAndUpdateRemoteCluster( "cluster_1", @@ -436,7 +451,7 @@ public void testCustomPingSchedule() throws IOException { TimeValue.timeValueSeconds(randomIntBetween(1, 10)); builder.put("cluster.remote.cluster_2.transport.ping_schedule", pingSchedule2); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isRemoteClusterRegistered("cluster_1")); RemoteClusterConnection remoteClusterConnection1 = service.getRemoteClusterConnection("cluster_1"); assertEquals(pingSchedule1, remoteClusterConnection1.getConnectionManager().getConnectionProfile().getPingInterval()); @@ -467,7 +482,7 @@ public void testChangeSettings() throws Exception { Settings.Builder builder = Settings.builder(); builder.putList("cluster.remote.cluster_1.seeds", cluster1Seed.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { - service.initializeRemoteClusters(); + initializeRemoteClusters(service); RemoteClusterConnection remoteClusterConnection = service.getRemoteClusterConnection("cluster_1"); Settings.Builder settingsChange = Settings.builder(); TimeValue pingSchedule = TimeValue.timeValueSeconds(randomIntBetween(6, 8)); @@ -517,7 +532,7 @@ public void testRemoteNodeAttribute() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -580,7 +595,7 @@ public void testRemoteNodeRoles() throws IOException, InterruptedException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -648,7 +663,7 @@ public void testCollectNodes() throws InterruptedException, IOException { builder.putList("cluster.remote.cluster_2.seed", c2N1Node.getAddress().toString()); try (RemoteClusterService service = new RemoteClusterService(settings, transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertFalse(service.isCrossClusterSearchEnabled()); final CountDownLatch firstLatch = new CountDownLatch(1); @@ -896,7 +911,7 @@ public void testReconnectWhenStrategySettingsUpdated() throws Exception { builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString())); try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) { assertFalse(service.isCrossClusterSearchEnabled()); - service.initializeRemoteClusters(); + initializeRemoteClusters(service); assertTrue(service.isCrossClusterSearchEnabled()); final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");