Skip to content

Commit

Permalink
Support "shared" test clusters
Browse files Browse the repository at this point in the history
This commit introduces the concept of shared tests clusters using the
new JUnit testing framework. Unlike normal test clusters which are
exclusive to each test suite (class), shared clusters persist across
test suites to be reused. This can be useful for test projects with a
large number of test suites that can all use a single cluster, and the
overhead of creating these clusters is the dominating factor.
  • Loading branch information
mark-vieira committed Oct 18, 2023
1 parent 65f91cf commit 011ccc8
Show file tree
Hide file tree
Showing 29 changed files with 371 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
*/
public class RestTestBasePlugin implements Plugin<Project> {

private static final String TESTS_MAX_PARALLEL_FORKS_SYSPROP = "tests.max.parallel.forks";
private static final String TESTS_RUNTIME_JAVA_SYSPROP = "tests.runtime.java";
private static final String DEFAULT_DISTRIBUTION_SYSPROP = "tests.default.distribution";
private static final String INTEG_TEST_DISTRIBUTION_SYSPROP = "tests.integ-test.distribution";
Expand Down Expand Up @@ -123,6 +124,7 @@ public void apply(Project project) {

// Enable parallel execution for these tests since each test gets its own cluster
task.setMaxParallelForks(task.getProject().getGradle().getStartParameter().getMaxWorkerCount() / 2);
nonInputSystemProperties.systemProperty(TESTS_MAX_PARALLEL_FORKS_SYSPROP, () -> String.valueOf(task.getMaxParallelForks()));

// Disable test failure reporting since this stuff is now captured in build scans
task.getInputs().property(ElasticsearchTestBasePlugin.DUMP_OUTPUT_ON_FAILURE_PROP_NAME, false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.test;

import com.carrotsearch.randomizedtesting.ThreadFilter;

/**
* When using shared test clusters we launch processes that persist across test suites. This filter is used to ignore those in that case.
*/
public class TestClustersThreadFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
return t.getName().endsWith("-log-forwarder") || t.getName().contains("node-executor");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,19 +209,7 @@ public void initClient() throws IOException {
assert clusterHosts == null;
assert availableFeatures == null;
assert nodeVersions == null;
String cluster = getTestRestCluster();
String[] stringUrls = cluster.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.valueOf(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
clusterHosts = unmodifiableList(hosts);
clusterHosts = parseClusterHosts(getTestRestCluster());
logger.info("initializing REST clients against {}", clusterHosts);
client = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
adminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()]));
Expand Down Expand Up @@ -281,6 +269,21 @@ protected static boolean has(ProductFeature feature) {
return availableFeatures.contains(feature);
}

protected List<HttpHost> parseClusterHosts(String hostsString) {
String[] stringUrls = hostsString.split(",");
List<HttpHost> hosts = new ArrayList<>(stringUrls.length);
for (String stringUrl : stringUrls) {
int portSeparator = stringUrl.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + stringUrl + "]");
}
String host = stringUrl.substring(0, portSeparator);
int port = Integer.valueOf(stringUrl.substring(portSeparator + 1));
hosts.add(buildHttpHost(host, port));
}
return unmodifiableList(hosts);
}

protected String getTestRestCluster() {
String cluster = System.getProperty("tests.rest.cluster");
if (cluster == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public abstract class AbstractLocalClusterSpecBuilder<T extends ElasticsearchClu
LocalClusterSpecBuilder<T>> implements LocalClusterSpecBuilder<T> {

private String name = "test-cluster";
private boolean shared = false;
private final List<DefaultLocalNodeSpecBuilder> nodeBuilders = new ArrayList<>();
private final List<User> users = new ArrayList<>();
private final List<Resource> roleFiles = new ArrayList<>();
Expand Down Expand Up @@ -107,12 +108,34 @@ public AbstractLocalClusterSpecBuilder<T> rolesFile(Resource rolesFile) {
return this;
}

@Override
public LocalClusterSpecBuilder<T> shared(Boolean isShared) {
if (Integer.parseInt(System.getProperty("tests.max.parallel.forks")) > 1) {
String taskPath = System.getProperty("tests.task");
String project = taskPath.substring(0, taskPath.lastIndexOf(':'));
String taskName = taskPath.substring(taskPath.lastIndexOf(':') + 1);

throw new IllegalStateException(
"Parallel test execution is not supported for shared clusters. Configure the build script for project '"
+ project
+ "':\n\n"
+ "tasks.named('"
+ taskName
+ "') {\n"
+ " maxParallelForks = 1\n"
+ "}"
);
}
this.shared = isShared;
return this;
}

protected LocalClusterSpec buildClusterSpec() {
// Apply lazily provided configuration
lazyConfigProviders.forEach(s -> s.get().apply(this));

List<User> clusterUsers = users.isEmpty() ? List.of(User.DEFAULT_USER) : users;
LocalClusterSpec clusterSpec = new LocalClusterSpec(name, clusterUsers, roleFiles);
LocalClusterSpec clusterSpec = new LocalClusterSpec(name, clusterUsers, roleFiles, shared);
List<LocalNodeSpec> nodeSpecs;

if (nodeBuilders.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.junit.runners.model.Statement;

import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.function.Supplier;

public class DefaultLocalElasticsearchCluster<S extends LocalClusterSpec, H extends LocalClusterHandle> implements ElasticsearchCluster {
Expand All @@ -32,13 +35,20 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
S spec = specProvider.get();
try {
S spec = specProvider.get();
handle = clusterFactory.create(spec);
handle.start();
if (spec.isShared() == false || handle == null) {
if (spec.isShared()) {
maybeCheckThreadLeakFilters(description);
}
handle = clusterFactory.create(spec);
handle.start();
}
base.evaluate();
} finally {
close();
if (spec.isShared() == false) {
close();
}
}
}
};
Expand Down Expand Up @@ -155,4 +165,32 @@ protected void checkHandle() {
throw new IllegalStateException("Cluster handle has not been initialized. Did you forget the @ClassRule annotation?");
}
}

/**
* Check for {@code TestClustersThreadFilter} if necessary. We use reflection here to avoid a dependency on randomized runner.
*/
@SuppressWarnings("unchecked")
private void maybeCheckThreadLeakFilters(Description description) {
try {
Class<? extends Annotation> threadLeakFiltersClass = (Class<? extends Annotation>) Class.forName(
"com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters"
);
Annotation[] annotations = description.getTestClass().getAnnotationsByType(threadLeakFiltersClass);
for (Annotation annotation : annotations) {
try {
Class<?>[] classes = (Class<?>[]) annotation.getClass().getMethod("filters").invoke(annotation);
if (Arrays.stream(classes).noneMatch(c -> c.getName().equals("org.elasticsearch.test.TestClustersThreadFilter"))) {
throw new IllegalStateException(
"TestClustersThreadFilter is required when using shard clusters. Annotate your test with the following:\n\n"
+ " @ThreadLeakFilters(filters = TestClustersThreadFilter.class)\n"
);
}
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Unable to inspect filters on " + annotation, e);
}
}
} catch (ClassNotFoundException e) {
// If randomized runner isn't on the classpath then we don't care
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public class LocalClusterSpec implements ClusterSpec {
private final String name;
private final List<User> users;
private final List<Resource> roleFiles;
private final boolean shared;
private List<LocalNodeSpec> nodes;

public LocalClusterSpec(String name, List<User> users, List<Resource> roleFiles) {
public LocalClusterSpec(String name, List<User> users, List<Resource> roleFiles, boolean shared) {
this.name = name;
this.users = users;
this.roleFiles = roleFiles;
this.shared = shared;
}

public String getName() {
Expand All @@ -51,6 +53,10 @@ public List<LocalNodeSpec> getNodes() {
return nodes;
}

public boolean isShared() {
return shared;
}

public void setNodes(List<LocalNodeSpec> nodes) {
this.nodes = nodes;
}
Expand Down Expand Up @@ -281,7 +287,7 @@ public Map<String, String> resolveEnvironment() {
* @return a new local node spec
*/
private LocalNodeSpec getFilteredSpec(SettingsProvider filteredProvider, SettingsProvider filteredKeystoreProvider) {
LocalClusterSpec newCluster = new LocalClusterSpec(cluster.name, cluster.users, cluster.roleFiles);
LocalClusterSpec newCluster = new LocalClusterSpec(cluster.name, cluster.users, cluster.roleFiles, cluster.shared);

List<LocalNodeSpec> nodeSpecs = cluster.nodes.stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,13 @@ public interface LocalClusterSpecBuilder<T extends ElasticsearchCluster> extends
*/
LocalClusterSpecBuilder<T> rolesFile(Resource rolesFile);

/**
* Configure whether this cluster should be shared across test suites (classes). If set to {@code true} then the cluster will not be
* shut down or recreated before the next test suite begins execution. This setting is {@code false} by default.
*
* @param isShared whether the cluster should be shared
*/
LocalClusterSpecBuilder<T> shared(Boolean isShared);

T build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,6 @@ private static void startLoggingThread(InputStream is, Consumer<String> logAppen
} catch (IOException e) {
throw new UncheckedIOException("Error reading output from process.", e);
}
}, name).start();
}, name + "-log-forwarder").start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.common.Strings.hasText;

Expand All @@ -36,19 +37,14 @@ public abstract class RemoteClusterAwareEqlRestTestCase extends ESRestTestCase {
// client used for loading data on a remote cluster only.
private static RestClient remoteClient;

@BeforeClass
public static void initRemoteClients() throws IOException {
String crossClusterHost = System.getProperty("tests.rest.cluster.remote.host"); // gradle defined
if (crossClusterHost != null) {
int portSeparator = crossClusterHost.lastIndexOf(':');
if (portSeparator < 0) {
throw new IllegalArgumentException("Illegal cluster url [" + crossClusterHost + "]");
@Before
public void initRemoteClients() throws IOException {
if (remoteClient == null) {
String crossClusterHost = getRemoteCluster();
if (crossClusterHost != null) {
List<HttpHost> httpHosts = parseClusterHosts(crossClusterHost);
remoteClient = clientBuilder(secureRemoteClientSettings(), httpHosts.toArray(new HttpHost[0]));
}
String host = crossClusterHost.substring(0, portSeparator);
int port = Integer.parseInt(crossClusterHost.substring(portSeparator + 1));
HttpHost[] remoteHttpHosts = new HttpHost[] { new HttpHost(host, port) };

remoteClient = clientBuilder(secureRemoteClientSettings(), remoteHttpHosts);
}
}

Expand All @@ -61,6 +57,10 @@ public static void closeRemoteClients() throws IOException {
}
}

protected String getRemoteCluster() {
return System.getProperty("tests.rest.cluster.remote.host");
}

protected static RestHighLevelClient highLevelClient(RestClient client) {
return new RestHighLevelClient(client, ignore -> {}, Collections.emptyList()) {
};
Expand Down Expand Up @@ -123,8 +123,8 @@ protected Settings restClientSettings() {
}

protected static Settings secureRemoteClientSettings() {
String user = System.getProperty("tests.rest.cluster.remote.user"); // gradle defined
String pass = System.getProperty("tests.rest.cluster.remote.password");
String user = System.getProperty("tests.rest.cluster.remote.user", "test_user"); // gradle defined
String pass = System.getProperty("tests.rest.cluster.remote.password", "x-pack-test-password");
if (hasText(user) && hasText(pass)) {
String token = basicAuthHeaderValue(user, new SecureString(pass.toCharArray()));
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build();
Expand Down
49 changes: 4 additions & 45 deletions x-pack/plugin/eql/qa/multi-cluster-with-security/build.gradle
Original file line number Diff line number Diff line change
@@ -1,51 +1,10 @@
import org.elasticsearch.gradle.testclusters.DefaultTestClustersTask

apply plugin: 'elasticsearch.legacy-java-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'

dependencies {
javaRestTestImplementation project(path: xpackModule('eql:qa:common'))
}

def remoteClusterReg = testClusters.register('remote-cluster') {
testDistribution = 'DEFAULT'
numberOfNodes = 2
setting 'node.roles', '[data,ingest,master]'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'xpack.security.enabled', 'true'
setting 'xpack.security.autoconfiguration.enabled', 'false'

user username: "test_user", password: "x-pack-test-password"
}

def integTestClusterReg = testClusters.register('javaRestTest') {
testDistribution = 'DEFAULT'
setting 'xpack.ml.enabled', 'false'
setting 'xpack.watcher.enabled', 'false'
setting 'cluster.remote.my_remote_cluster.seeds', {
remoteClusterReg.get().getAllTransportPortURI().collect { "\"$it\"" }.toString()
}
setting 'cluster.remote.connections_per_cluster', "1"
setting 'xpack.security.enabled', 'true'
setting 'xpack.security.autoconfiguration.enabled', 'false'

user username: "test_user", password: "x-pack-test-password"
}

tasks.register("startRemoteCluster", DefaultTestClustersTask.class) {
useCluster remoteClusterReg
doLast {
"Starting remote cluster before integ tests and integTest cluster is started"
}
}

tasks.named("javaRestTest").configure {
dependsOn 'startRemoteCluster'
useCluster remoteClusterReg
doFirst {
nonInputProperties.systemProperty 'tests.rest.cluster.remote.host', remoteClusterReg.map(c->c.getAllHttpSocketURI().get(0))
nonInputProperties.systemProperty 'tests.rest.cluster.remote.user', "test_user"
nonInputProperties.systemProperty 'tests.rest.cluster.remote.password', "x-pack-test-password"
}
tasks.named('javaRestTest') {
usesDefaultDistribution()
maxParallelForks = 1
}
tasks.named("check").configure {dependsOn("javaRestTest") } // run these tests as part of the "check" task
Loading

0 comments on commit 011ccc8

Please sign in to comment.