From 9b86e66fcd1cd4570c24c85bb4b10d6311f0d284 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Fri, 13 Dec 2024 18:54:51 -0700 Subject: [PATCH] refactor test --- ...AbstractCrossClustersUsageTelemetryIT.java | 160 +++++++++++++++++ .../action/CrossClustersUsageTelemetryIT.java | 164 +----------------- ...rossClustersUsageTelemetryNoLicenseIT.java | 4 +- 3 files changed, 171 insertions(+), 157 deletions(-) create mode 100644 x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java new file mode 100644 index 000000000000..7e49c1a0c399 --- /dev/null +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClustersUsageTelemetryIT.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.SkipUnavailableRule; +import org.elasticsearch.usage.UsageService; +import org.junit.Assert; +import org.junit.Rule; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; + +public class AbstractCrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase { + private static final Logger LOGGER = LogManager.getLogger(AbstractCrossClustersUsageTelemetryIT.class); + private static final String REMOTE1 = "cluster-a"; + private static final String REMOTE2 = "cluster-b"; + private static final String LOCAL_INDEX = "logs-1"; + private static final String REMOTE_INDEX = "logs-2"; + + protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException { + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.columnar(randomBoolean()); + request.includeCCSMetadata(randomBoolean()); + return getTelemetryFromQuery(request, client); + } + + protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException, + InterruptedException { + // We want to send search to a specific node (we don't care which one) so that we could + // collect the CCS telemetry from it later + String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); + // We don't care here too much about the response, we just want to trigger the telemetry collection. + // So we check it's not null and leave the rest to other tests. + if (client != null) { + assertResponse( + cluster(LOCAL_CLUSTER).client(nodeName) + .filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client)) + .execute(EsqlQueryAction.INSTANCE, request), + Assert::assertNotNull + ); + + } else { + assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull); + } + return getTelemetrySnapshot(nodeName); + } + + protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception { + // We want to send search to a specific node (we don't care which one) so that we could + // collect the CCS telemetry from it later + String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + request.query(query); + request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + request.columnar(randomBoolean()); + request.includeCCSMetadata(randomBoolean()); + + ExecutionException ee = expectThrows( + ExecutionException.class, + cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request)::get + ); + assertNotNull(ee.getCause()); + + return getTelemetrySnapshot(nodeName); + } + + private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) { + var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName); + return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot(); + } + + @Override + protected boolean reuseClusters() { + return false; + } + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE1, REMOTE2); + } + + @Rule + public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2); + + protected Map setupClusters() { + int numShardsLocal = randomIntBetween(1, 5); + populateLocalIndices(LOCAL_INDEX, numShardsLocal); + + int numShardsRemote = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote); + + Map clusterInfo = new HashMap<>(); + clusterInfo.put("local.num_shards", numShardsLocal); + clusterInfo.put("local.index", LOCAL_INDEX); + clusterInfo.put("remote.num_shards", numShardsRemote); + clusterInfo.put("remote.index", REMOTE_INDEX); + + int numShardsRemote2 = randomIntBetween(1, 5); + populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2); + clusterInfo.put("remote2.index", REMOTE_INDEX); + clusterInfo.put("remote2.num_shards", numShardsRemote2); + + return clusterInfo; + } + + void populateLocalIndices(String indexName, int numShards) { + Client localClient = client(LOCAL_CLUSTER); + assertAcked( + localClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); + } + localClient.admin().indices().prepareRefresh(indexName).get(); + } + + void populateRemoteIndices(String clusterAlias, String indexName, int numShards) { + Client remoteClient = client(clusterAlias); + assertAcked( + remoteClient.admin() + .indices() + .prepareCreate(indexName) + .setSettings(Settings.builder().put("index.number_of_shards", numShards)) + .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") + ); + for (int i = 0; i < 10; i++) { + remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); + } + remoteClient.admin().indices().prepareRefresh(indexName).get(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + var map = skipOverride.getMap(); + LOGGER.info("Using skip_unavailable map: [{}]", map); + return map; + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java index d5bc13583088..3dac8e43382c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryIT.java @@ -7,37 +7,24 @@ package org.elasticsearch.xpack.esql.action; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.admin.cluster.stats.CCSTelemetrySnapshot; -import org.elasticsearch.client.internal.Client; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.tasks.Task; -import org.elasticsearch.test.AbstractMultiClustersTestCase; -import org.elasticsearch.test.SkipUnavailableRule; -import org.elasticsearch.usage.UsageService; -import org.junit.Assert; -import org.junit.Rule; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.action.admin.cluster.stats.CCSUsageTelemetry.ASYNC_FEATURE; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; import static org.hamcrest.Matchers.equalTo; -public class CrossClustersUsageTelemetryIT extends AbstractMultiClustersTestCase { - private static final Logger LOGGER = LogManager.getLogger(CrossClustersUsageTelemetryIT.class); - private static final String REMOTE1 = "cluster-a"; - private static final String REMOTE2 = "cluster-b"; - private static final String LOCAL_INDEX = "logs-1"; - private static final String REMOTE_INDEX = "logs-2"; +public class CrossClustersUsageTelemetryIT extends AbstractCrossClustersUsageTelemetryIT { + + @Override + protected Collection> nodePlugins(String clusterAlias) { + List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); + plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); + plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); + return plugins; + } public void testLocalRemote() throws Exception { setupClusters(); @@ -66,137 +53,4 @@ public void testLocalRemote() throws Exception { } } - - protected CCSTelemetrySnapshot getTelemetryFromQuery(String query, String client) throws ExecutionException, InterruptedException { - EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query(query); - request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); - request.columnar(randomBoolean()); - request.includeCCSMetadata(randomBoolean()); - return getTelemetryFromQuery(request, client); - } - - protected CCSTelemetrySnapshot getTelemetryFromQuery(EsqlQueryRequest request, String client) throws ExecutionException, - InterruptedException { - // We want to send search to a specific node (we don't care which one) so that we could - // collect the CCS telemetry from it later - String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); - // We don't care here too much about the response, we just want to trigger the telemetry collection. - // So we check it's not null and leave the rest to other tests. - if (client != null) { - assertResponse( - cluster(LOCAL_CLUSTER).client(nodeName) - .filterWithHeader(Map.of(Task.X_ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER, client)) - .execute(EsqlQueryAction.INSTANCE, request), - Assert::assertNotNull - ); - - } else { - assertResponse(cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request), Assert::assertNotNull); - } - return getTelemetrySnapshot(nodeName); - } - - protected CCSTelemetrySnapshot getTelemetryFromFailedQuery(String query) throws Exception { - // We want to send search to a specific node (we don't care which one) so that we could - // collect the CCS telemetry from it later - String nodeName = cluster(LOCAL_CLUSTER).getRandomNodeName(); - EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); - request.query(query); - request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); - request.columnar(randomBoolean()); - request.includeCCSMetadata(randomBoolean()); - - ExecutionException ee = expectThrows( - ExecutionException.class, - cluster(LOCAL_CLUSTER).client(nodeName).execute(EsqlQueryAction.INSTANCE, request)::get - ); - assertNotNull(ee.getCause()); - - return getTelemetrySnapshot(nodeName); - } - - private CCSTelemetrySnapshot getTelemetrySnapshot(String nodeName) { - var usage = cluster(LOCAL_CLUSTER).getInstance(UsageService.class, nodeName); - return usage.getEsqlUsageHolder().getCCSTelemetrySnapshot(); - } - - @Override - protected boolean reuseClusters() { - return false; - } - - @Override - protected List remoteClusterAlias() { - return List.of(REMOTE1, REMOTE2); - } - - @Rule - public SkipUnavailableRule skipOverride = new SkipUnavailableRule(REMOTE1, REMOTE2); - - protected Map setupClusters() { - int numShardsLocal = randomIntBetween(1, 5); - populateLocalIndices(LOCAL_INDEX, numShardsLocal); - - int numShardsRemote = randomIntBetween(1, 5); - populateRemoteIndices(REMOTE1, REMOTE_INDEX, numShardsRemote); - - Map clusterInfo = new HashMap<>(); - clusterInfo.put("local.num_shards", numShardsLocal); - clusterInfo.put("local.index", LOCAL_INDEX); - clusterInfo.put("remote.num_shards", numShardsRemote); - clusterInfo.put("remote.index", REMOTE_INDEX); - - int numShardsRemote2 = randomIntBetween(1, 5); - populateRemoteIndices(REMOTE2, REMOTE_INDEX, numShardsRemote2); - clusterInfo.put("remote2.index", REMOTE_INDEX); - clusterInfo.put("remote2.num_shards", numShardsRemote2); - - return clusterInfo; - } - - void populateLocalIndices(String indexName, int numShards) { - Client localClient = client(LOCAL_CLUSTER); - assertAcked( - localClient.admin() - .indices() - .prepareCreate(indexName) - .setSettings(Settings.builder().put("index.number_of_shards", numShards)) - .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") - ); - for (int i = 0; i < 10; i++) { - localClient.prepareIndex(indexName).setSource("id", "local-" + i, "tag", "local", "v", i).get(); - } - localClient.admin().indices().prepareRefresh(indexName).get(); - } - - void populateRemoteIndices(String clusterAlias, String indexName, int numShards) { - Client remoteClient = client(clusterAlias); - assertAcked( - remoteClient.admin() - .indices() - .prepareCreate(indexName) - .setSettings(Settings.builder().put("index.number_of_shards", numShards)) - .setMapping("id", "type=keyword", "tag", "type=keyword", "v", "type=long") - ); - for (int i = 0; i < 10; i++) { - remoteClient.prepareIndex(indexName).setSource("id", "remote-" + i, "tag", "remote", "v", i * i).get(); - } - remoteClient.admin().indices().prepareRefresh(indexName).get(); - } - - @Override - protected Collection> nodePlugins(String clusterAlias) { - List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class); - plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); - return plugins; - } - - @Override - protected Map skipUnavailableForRemoteClusters() { - var map = skipOverride.getMap(); - LOGGER.info("Using skip_unavailable map: [{}]", map); - return map; - } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java index a3ccda8916b0..2b993e947406 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersUsageTelemetryNoLicenseIT.java @@ -17,13 +17,13 @@ import static org.hamcrest.Matchers.equalTo; -public class CrossClustersUsageTelemetryNoLicenseIT extends CrossClustersUsageTelemetryIT { +public class CrossClustersUsageTelemetryNoLicenseIT extends AbstractCrossClustersUsageTelemetryIT { @Override protected Collection> nodePlugins(String clusterAlias) { List> plugins = new ArrayList<>(super.nodePlugins(clusterAlias)); - plugins.remove(EsqlPluginWithEnterpriseOrTrialLicense.class); plugins.add(EsqlPluginWithNonEnterpriseOrExpiredLicense.class); + plugins.add(CrossClustersQueryIT.InternalExchangePlugin.class); return plugins; }