Skip to content

Commit

Permalink
Introduce more parallelism into cross cluster test bootstrapping (elastic#117820)
Browse files Browse the repository at this point in the history

We can parallelize starting the clusters, and a few other setup steps,
to speed up these tests by roughly 2x. On my workstation that comes out to
about a minute of execution time saved across all of the cross cluster
tests in :server:internalClusterTests.
  • Loading branch information
original-brownbear authored Dec 4, 2024
1 parent 9a81eb2 commit eb0020f
Show file tree
Hide file tree
Showing 26 changed files with 80 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public class ResolveClusterDataStreamIT extends AbstractMultiClustersTestCase {
private static long LATEST_TIMESTAMP = 1691348820000L;

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class CrossClusterPainlessExecuteIT extends AbstractMultiClustersTestCase
private static final String KEYWORD_FIELD = "my_field";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ protected boolean reuseClusters() {
}

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.NodeRoles;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -24,7 +23,7 @@

public class RemoteInfoIT extends AbstractMultiClustersTestCase {
@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
if (randomBoolean()) {
return List.of();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Assert;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -51,7 +50,7 @@ protected boolean reuseClusters() {
}

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE1, REMOTE2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class CCSPointInTimeIT extends AbstractMultiClustersTestCase {
public static final String REMOTE_CLUSTER = "remote_cluster";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -54,7 +53,7 @@ public class ResolveClusterIT extends AbstractMultiClustersTestCase {
private static long LATEST_TIMESTAMP = 1691348820000L;

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class CCSCanMatchIT extends AbstractMultiClustersTestCase {
static final String REMOTE_CLUSTER = "cluster_a";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of("cluster_a");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot;
import org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.Result;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -78,7 +81,7 @@ protected boolean reuseClusters() {
}

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE1, REMOTE2);
}

Expand Down Expand Up @@ -126,12 +129,9 @@ private CCSTelemetrySnapshot getTelemetryFromFailedSearch(SearchRequest searchRe
// We want to send search to a specific node (we don't care which one) so that we could
// collect the CCS telemetry from it later
String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName();
PlainActionFuture<SearchResponse> queryFuture = new PlainActionFuture<>();
cluster(LOCAL_CLUSTER).client(nodeName).search(searchRequest, queryFuture);
assertBusy(() -> assertTrue(queryFuture.isDone()));

// We expect failure, but we don't care too much which failure it is in this test
ExecutionException ee = expectThrows(ExecutionException.class, queryFuture::get);
ExecutionException ee = expectThrows(ExecutionException.class, cluster(LOCAL_CLUSTER).client(nodeName).search(searchRequest)::get);
assertNotNull(ee.getCause());

return getTelemetrySnapshot(nodeName);
Expand Down Expand Up @@ -637,56 +637,62 @@ private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) {
return usage.getCcsUsageHolder().getCCSTelemetrySnapshot();
}

private Map<String, Object> setupClusters() {
private Map<String, Object> setupClusters() throws ExecutionException, InterruptedException {
String localIndex = "demo";
String remoteIndex = "prod";
int numShardsLocal = randomIntBetween(2, 10);
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
assertAcked(
final PlainActionFuture<Void> future = new PlainActionFuture<>();
try (RefCountingListener refCountingListener = new RefCountingListener(future)) {
client(LOCAL_CLUSTER).admin()
.indices()
.prepareCreate(localIndex)
.setSettings(localSettings)
.setMapping("@timestamp", "type=date", "f", "type=text")
);
indexDocs(client(LOCAL_CLUSTER), localIndex);

String remoteIndex = "prod";
int numShardsRemote = randomIntBetween(2, 10);
for (String clusterAlias : remoteClusterAlias()) {
final InternalTestCluster remoteCluster = cluster(clusterAlias);
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
assertAcked(
.execute(refCountingListener.acquire(r -> {
assertAcked(r);
indexDocs(client(LOCAL_CLUSTER), localIndex, refCountingListener.acquire());
}));

int numShardsRemote = randomIntBetween(2, 10);
var remotes = remoteClusterAlias();
runInParallel(remotes.size(), i -> {
final String clusterAlias = remotes.get(i);
final InternalTestCluster remoteCluster = cluster(clusterAlias);
remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(2, 3));
client(clusterAlias).admin()
.indices()
.prepareCreate(remoteIndex)
.setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1)))
.setMapping("@timestamp", "type=date", "f", "type=text")
);
assertFalse(
client(clusterAlias).admin()
.cluster()
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
.setWaitForYellowStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.get()
.isTimedOut()
);
indexDocs(client(clusterAlias), remoteIndex);
.execute(refCountingListener.acquire(r -> {
assertAcked(r);
client(clusterAlias).admin()
.cluster()
.prepareHealth(TEST_REQUEST_TIMEOUT, remoteIndex)
.setWaitForYellowStatus()
.setTimeout(TimeValue.timeValueSeconds(10))
.execute(refCountingListener.acquire(healthResponse -> {
assertFalse(healthResponse.isTimedOut());
indexDocs(client(clusterAlias), remoteIndex, refCountingListener.acquire());
}));
}));
});
}

future.get();
Map<String, Object> clusterInfo = new HashMap<>();
clusterInfo.put("local.index", localIndex);
clusterInfo.put("remote.index", remoteIndex);
return clusterInfo;
}

private int indexDocs(Client client, String index) {
private void indexDocs(Client client, String index, ActionListener<Void> listener) {
int numDocs = between(5, 20);
final BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
client.prepareIndex(index).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get();
bulkRequest.add(client.prepareIndex(index).setSource("f", "v", "@timestamp", randomNonNegativeLong()));
}
client.admin().indices().prepareRefresh(index).get();
return numDocs;
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).execute(listener.safeMap(r -> null));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
public class CrossClusterIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of("cluster_a");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class CrossClusterSearchIT extends AbstractMultiClustersTestCase {
private static long LATEST_TIMESTAMP = 1691348820000L;

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
public class CrossClusterSearchLeakIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of("cluster_a");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class CCSFieldCapabilitiesIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of("remote_cluster");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,7 +42,7 @@ public class MinimalCompoundRetrieverIT extends AbstractMultiClustersTestCase {
private static final String REMOTE_CLUSTER = "cluster_a";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -58,7 +58,7 @@ public abstract class AbstractMultiClustersTestCase extends ESTestCase {

private static volatile ClusterGroup clusterGroup;

protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return randomSubsetOf(List.of("cluster-a", "cluster-b"));
}

Expand Down Expand Up @@ -100,17 +100,18 @@ public final void startClusters() throws Exception {
return;
}
stopClusters();
final Map<String, InternalTestCluster> clusters = new HashMap<>();
final Map<String, InternalTestCluster> clusters = new ConcurrentHashMap<>();
final List<String> clusterAliases = new ArrayList<>(remoteClusterAlias());
clusterAliases.add(LOCAL_CLUSTER);
for (String clusterAlias : clusterAliases) {
final List<Class<? extends Plugin>> mockPlugins = List.of(
MockHttpTransport.TestPlugin.class,
MockTransportService.TestPlugin.class,
getTestTransportPlugin()
);
runInParallel(clusterAliases.size(), i -> {
String clusterAlias = clusterAliases.get(i);
final String clusterName = clusterAlias.equals(LOCAL_CLUSTER) ? "main-cluster" : clusterAlias;
final int numberOfNodes = randomIntBetween(1, 3);
final List<Class<? extends Plugin>> mockPlugins = List.of(
MockHttpTransport.TestPlugin.class,
MockTransportService.TestPlugin.class,
getTestTransportPlugin()
);
final Collection<Class<? extends Plugin>> nodePlugins = nodePlugins(clusterAlias);

final NodeConfigurationSource nodeConfigurationSource = nodeConfigurationSource(nodeSettings(), nodePlugins);
Expand All @@ -128,10 +129,14 @@ public final void startClusters() throws Exception {
mockPlugins,
Function.identity()
);
cluster.beforeTest(random());
try {
cluster.beforeTest(random());
} catch (Exception e) {
throw new RuntimeException(e);
}
clusters.put(clusterAlias, cluster);
}
clusterGroup = new ClusterGroup(clusters);
});
clusterGroup = new ClusterGroup(Map.copyOf(clusters));
configureAndConnectsToRemoteClusters();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected boolean reuseClusters() {
}

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE1, REMOTE2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class CrossClusterAsyncSearchIT extends AbstractMultiClustersTestCase {
private static final long LATEST_TIMESTAMP = 1691348820000L;

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class CCSTermsEnumIT extends AbstractMultiClustersTestCase {

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of("remote_cluster");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class CrossClusterAsyncQueryIT extends AbstractMultiClustersTestCase {
private static final String INDEX_WITH_RUNTIME_MAPPING = "blocking";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class CrossClusterEnrichUnavailableClustersIT extends AbstractMultiCluste
public static String REMOTE_CLUSTER_2 = "c2";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
}

Expand Down
Loading

0 comments on commit eb0020f

Please sign in to comment.