Skip to content

Commit

Permalink
Fix BWC for ES|QL cluster request (#117865)
Browse files Browse the repository at this point in the history
We identified a BWC bug in the cluster computer request. Specifically, 
the indices options were not properly selected for requests from an
older querying cluster. This caused the search_shards API on the remote
cluster to use restricted indices options, leading to failures when
resolving wildcard index patterns.

Our tests didn't catch this issue because the current BWC tests for 
cross-cluster queries only cover one direction: the querying cluster on
the current version and the remote cluster on a compatible version.

This PR fixes the issue and expands BWC tests to support both 
directions: the querying cluster on the current version with the remote
cluster on a compatible version, and vice versa.
  • Loading branch information
dnhatn authored Dec 3, 2024
1 parent 5ed106a commit 267dc1a
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 42 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/117865.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117865
summary: Fix BWC for ES|QL cluster request
area: ES|QL
type: bug
issues: []
17 changes: 15 additions & 2 deletions x-pack/plugin/esql/qa/server/multi-clusters/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,22 @@ def supportedVersion = bwcVersion -> {
}

buildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->
tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
tasks.register("${baseName}#newToOld", StandaloneRestIntegTestTask) {
usesBwcDistribution(bwcVersion)
systemProperty("tests.version.remote_cluster", bwcVersion)
maxParallelForks = 1
}

tasks.register("${baseName}#oldToNew", StandaloneRestIntegTestTask) {
usesBwcDistribution(bwcVersion)
systemProperty("tests.old_cluster_version", bwcVersion)
systemProperty("tests.version.local_cluster", bwcVersion)
maxParallelForks = 1
}

// TODO: avoid running tests twice with the current version
tasks.register(bwcTaskName(bwcVersion), StandaloneRestIntegTestTask) {
dependsOn tasks.named("${baseName}#oldToNew")
dependsOn tasks.named("${baseName}#newToOld")
maxParallelForks = 1
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static ElasticsearchCluster remoteCluster() {
return ElasticsearchCluster.local()
.name(REMOTE_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
.version(Version.fromString(System.getProperty("tests.old_cluster_version")))
.version(distributionVersion("tests.version.remote_cluster"))
.nodes(2)
.setting("node.roles", "[data,ingest,master]")
.setting("xpack.security.enabled", "false")
Expand All @@ -34,7 +34,7 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
return ElasticsearchCluster.local()
.name(LOCAL_CLUSTER_NAME)
.distribution(DistributionType.DEFAULT)
.version(Version.CURRENT)
.version(distributionVersion("tests.version.local_cluster"))
.nodes(2)
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
Expand All @@ -46,7 +46,18 @@ public static ElasticsearchCluster localCluster(ElasticsearchCluster remoteClust
.build();
}

public static org.elasticsearch.Version oldVersion() {
return org.elasticsearch.Version.fromString(System.getProperty("tests.old_cluster_version"));
public static org.elasticsearch.Version localClusterVersion() {
String prop = System.getProperty("tests.version.local_cluster");
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
}

public static org.elasticsearch.Version remoteClusterVersion() {
String prop = System.getProperty("tests.version.remote_cluster");
return prop != null ? org.elasticsearch.Version.fromString(prop) : org.elasticsearch.Version.CURRENT;
}

private static Version distributionVersion(String key) {
final String val = System.getProperty(key);
return val != null ? Version.fromString(val) : Version.CURRENT;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.test.TestClustersThreadFilter;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.xpack.esql.qa.rest.EsqlRestValidationTestCase;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
Expand Down Expand Up @@ -78,4 +80,9 @@ private RestClient remoteClusterClient() throws IOException {
}
return remoteClient;
}

@Before
public void skipTestOnOldVersions() {
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_16_0));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
Expand Down Expand Up @@ -118,10 +119,8 @@ protected void shouldSkipTest(String testName) throws IOException {
// Do not run tests including "METADATA _index" unless marked with metadata_fields_remote_test,
// because they may produce inconsistent results with multiple clusters.
assumeFalse("can't test with _index metadata", (remoteMetadata == false) && hasIndexMetadata(testCase.query));
assumeTrue(
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
isEnabled(testName, instructions, Clusters.oldVersion())
);
Version oldVersion = Version.min(Clusters.localClusterVersion(), Clusters.remoteClusterVersion());
assumeTrue("Test " + testName + " is skipped on " + oldVersion, isEnabled(testName, instructions, oldVersion));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(INLINESTATS_V2.capabilityName()));
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains(JOIN_PLANNING_V1.capabilityName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
Expand All @@ -29,7 +30,6 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -127,10 +127,12 @@ void indexDocs(RestClient client, String index, List<Doc> docs) throws IOExcepti
}

private Map<String, Object> run(String query, boolean includeCCSMetadata) throws IOException {
Map<String, Object> resp = runEsql(
new RestEsqlTestCase.RequestObjectBuilder().query(query).includeCCSMetadata(includeCCSMetadata).build()
);
logger.info("--> query {} response {}", query, resp);
var queryBuilder = new RestEsqlTestCase.RequestObjectBuilder().query(query);
if (includeCCSMetadata) {
queryBuilder.includeCCSMetadata(true);
}
Map<String, Object> resp = runEsql(queryBuilder.build());
logger.info("--> query {} response {}", queryBuilder, resp);
return resp;
}

Expand All @@ -156,7 +158,7 @@ private Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder reques

public void testCount() throws Exception {
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
Expand All @@ -165,13 +167,16 @@ public void testCount() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("FROM *:test-remote-index | STATS c = COUNT(*)", includeCCSMetadata);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(remoteDocs.size()));
Expand All @@ -180,7 +185,10 @@ public void testCount() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
Expand All @@ -189,7 +197,7 @@ public void testCount() throws Exception {

public void testUngroupedAggs() throws Exception {
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("FROM test-local-index,*:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = Stream.concat(localDocs.stream(), remoteDocs.stream()).mapToLong(d -> d.data).sum();
Expand All @@ -200,13 +208,16 @@ public void testUngroupedAggs() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run("FROM *:test-remote-index | STATS total = SUM(data)", includeCCSMetadata);
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
Expand All @@ -216,12 +227,16 @@ public void testUngroupedAggs() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
}
{
assumeTrue("requires ccs metadata", ccsMetadataAvailable());
Map<String, Object> result = runWithColumnarAndIncludeCCSMetadata("FROM *:test-remote-index | STATS total = SUM(data)");
var columns = List.of(Map.of("name", "total", "type", "long"));
long sum = remoteDocs.stream().mapToLong(d -> d.data).sum();
Expand Down Expand Up @@ -293,7 +308,7 @@ private void assertClusterDetailsMap(Map<String, Object> result, boolean remoteO

public void testGroupedAggs() throws Exception {
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run(
"FROM test-local-index,*:test-remote-index | STATS total = SUM(data) BY color | SORT color",
includeCCSMetadata
Expand All @@ -311,13 +326,16 @@ public void testGroupedAggs() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, false);
}
}
{
boolean includeCCSMetadata = randomBoolean();
boolean includeCCSMetadata = includeCCSMetadata();
Map<String, Object> result = run(
"FROM *:test-remote-index | STATS total = SUM(data) by color | SORT color",
includeCCSMetadata
Expand All @@ -336,29 +354,57 @@ public void testGroupedAggs() throws Exception {
if (includeCCSMetadata) {
mapMatcher = mapMatcher.entry("_clusters", any(Map.class));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values).entry("took", greaterThanOrEqualTo(0)));
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
if (includeCCSMetadata) {
assertClusterDetailsMap(result, true);
}
}
}

public void testIndexPattern() throws Exception {
{
String indexPattern = randomFrom(
"test-local-index,*:test-remote-index",
"test-local-index,*:test-remote-*",
"test-local-index,*:test-*",
"test-*,*:test-remote-index"
);
Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(localDocs.size() + remoteDocs.size()));
MapMatcher mapMatcher = matchesMap();
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
}
{
String indexPattern = randomFrom("*:test-remote-index", "*:test-remote-*", "*:test-*");
Map<String, Object> result = run("FROM " + indexPattern + " | STATS c = COUNT(*)", false);
var columns = List.of(Map.of("name", "c", "type", "long"));
var values = List.of(List.of(remoteDocs.size()));

MapMatcher mapMatcher = matchesMap();
if (ccsMetadataAvailable()) {
mapMatcher = mapMatcher.entry("took", greaterThanOrEqualTo(0));
}
assertMap(result, mapMatcher.entry("columns", columns).entry("values", values));
}
}

private RestClient remoteClusterClient() throws IOException {
var clusterHosts = parseClusterHosts(remoteCluster.getHttpAddresses());
return buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[0]));
}

private TestFeatureService remoteFeaturesService() throws IOException {
if (remoteFeaturesService == null) {
try (RestClient remoteClient = remoteClusterClient()) {
var remoteNodeVersions = readVersionsFromNodesInfo(remoteClient);
var semanticNodeVersions = remoteNodeVersions.stream()
.map(ESRestTestCase::parseLegacyVersion)
.flatMap(Optional::stream)
.collect(Collectors.toSet());
remoteFeaturesService = createTestFeatureService(getClusterStateFeatures(remoteClient), semanticNodeVersions);
}
}
return remoteFeaturesService;
private static boolean ccsMetadataAvailable() {
return Clusters.localClusterVersion().onOrAfter(Version.V_8_16_0);
}

private static boolean includeCCSMetadata() {
return ccsMetadataAvailable() && randomBoolean();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public void testBasicEsql() throws IOException {
indexTimestampData(1);

RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | stats avg(value)");
requestObjectBuilder().includeCCSMetadata(randomBoolean());
if (Build.current().isSnapshot()) {
builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

import java.io.IOException;
import java.util.Arrays;
import java.util.Objects;

record RemoteClusterPlan(PhysicalPlan plan, String[] targetIndices, OriginalIndices originalIndices) {
static RemoteClusterPlan from(PlanStreamInput planIn) throws IOException {
Expand All @@ -24,7 +26,8 @@ static RemoteClusterPlan from(PlanStreamInput planIn) throws IOException {
if (planIn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)) {
originalIndices = OriginalIndices.readOriginalIndices(planIn);
} else {
originalIndices = new OriginalIndices(planIn.readStringArray(), IndicesOptions.strictSingleIndexNoExpandForbidClosed());
// fallback to the previous behavior
originalIndices = new OriginalIndices(planIn.readStringArray(), SearchRequest.DEFAULT_INDICES_OPTIONS);
}
return new RemoteClusterPlan(plan, targetIndices, originalIndices);
}
Expand All @@ -38,4 +41,18 @@ public void writeTo(PlanStreamOutput out) throws IOException {
out.writeStringArray(originalIndices.indices());
}
}

@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
RemoteClusterPlan that = (RemoteClusterPlan) o;
return Objects.equals(plan, that.plan)
&& Objects.deepEquals(targetIndices, that.targetIndices)
&& Objects.equals(originalIndices, that.originalIndices);
}

@Override
public int hashCode() {
return Objects.hash(plan, Arrays.hashCode(targetIndices), originalIndices);
}
}
Loading

0 comments on commit 267dc1a

Please sign in to comment.