Skip to content

Commit

Permalink
Merge branch 'main' into avoid-cluster-state
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Sep 3, 2024
2 parents da3fde0 + f83d6be commit ab2cd17
Show file tree
Hide file tree
Showing 42 changed files with 632 additions and 53 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111226.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111226
summary: "ES|QL: add Telemetry API and track top functions"
area: ES|QL
type: enhancement
issues: []
12 changes: 12 additions & 0 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,18 @@ tests:
- class: org.elasticsearch.xpack.spatial.index.query.LegacyGeoShapeWithDocValuesQueryTests
method: testIndexPointsFromLine
issue: https://github.com/elastic/elasticsearch/issues/112438
- class: org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialCentroidTests
method: "testAggregateIntermediate {TestCase=<geo_point> #2}"
issue: https://github.com/elastic/elasticsearch/issues/112461
- class: org.elasticsearch.xpack.esql.expression.function.aggregate.SpatialCentroidTests
method: "testAggregateIntermediate {TestCase=<geo_point>}"
issue: https://github.com/elastic/elasticsearch/issues/112463
- class: org.elasticsearch.xpack.esql.action.ManyShardsIT
method: testRejection
issue: https://github.com/elastic/elasticsearch/issues/112406
- class: org.elasticsearch.xpack.esql.action.ManyShardsIT
method: testConcurrentQueries
issue: https://github.com/elastic/elasticsearch/issues/112424

# Examples:
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ It is preferable to use `EsqlCapabilities` for new features, although all existi

### Warnings

Some queries can return warnings, eg. for number overflows or when a multi-value is passed to a funciton
Some queries can return warnings, eg. for number overflows or when a multi-value is passed to a function
that does not support it.

Each CSV-SPEC test has to also assert all the expected warnings.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.action;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.xpack.esql.stats.PlanningMetricsManager;
import org.junit.Before;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class TelemetryIT extends AbstractEsqlIntegTestCase {

    /**
     * A telemetry test scenario: an ES|QL query, the expected usage count per command and
     * per function, and whether the query is expected to complete successfully.
     */
    record Test(String query, Map<String, Integer> expectedCommands, Map<String, Integer> expectedFunctions, boolean success) {}

    private final Test testCase;

    public TelemetryIT(@Name("TestCase") Test test) {
        this.testCase = test;
    }

    @ParametersFactory
    public static Iterable<Object[]> parameters() {
        return List.of(
            new Object[] {
                new Test(
                    """
                        FROM idx
                        | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | STATS s = COUNT(*) by ip
                        | KEEP ip
                        | EVAL a = 10""",
                    Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 2), Map.entry("STATS", 1), Map.entry("KEEP", 1)),
                    Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2), Map.entry("COUNT", 1)),
                    true
                ) },
            new Object[] {
                new Test(
                    "FROM idx | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host) "
                        + "| STATS s = COUNT(*) by ip | KEEP ip | EVAL a = non_existing",
                    Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 2), Map.entry("STATS", 1), Map.entry("KEEP", 1)),
                    Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2), Map.entry("COUNT", 1)),
                    false
                ) },
            new Object[] {
                new Test(
                    """
                        FROM idx
                        | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | STATS s = COUNT(*) by ip | KEEP ip | EVAL a = 10
                        """,
                    Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 3), Map.entry("STATS", 1), Map.entry("KEEP", 1)),
                    Map.ofEntries(Map.entry("TO_IP", 2), Map.entry("TO_STRING", 4), Map.entry("COUNT", 1)),
                    true
                ) },
            new Object[] {
                new Test(
                    """
                        FROM idx | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | WHERE id is not null AND id > 100 AND host RLIKE \".*foo\"
                        | eval a = 10
                        | drop host
                        | rename a as foo
                        | DROP foo
                        """, // lowercase on purpose
                    Map.ofEntries(
                        Map.entry("FROM", 1),
                        Map.entry("EVAL", 2),
                        Map.entry("WHERE", 1),
                        Map.entry("DROP", 2),
                        Map.entry("RENAME", 1)
                    ),
                    Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
                    true
                ) },
            new Object[] {
                new Test(
                    """
                        FROM idx
                        | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | GROK host "%{WORD:name} %{WORD}"
                        | DISSECT host "%{surname}"
                        """,
                    Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 1), Map.entry("GROK", 1), Map.entry("DISSECT", 1)),
                    Map.ofEntries(Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
                    true
                ) },
            new Object[] {
                new Test("METRICS idx | LIMIT 10", Map.ofEntries(Map.entry("METRICS", 1), Map.entry("LIMIT", 1)), Map.ofEntries(), true) },
            new Object[] {
                new Test(
                    "METRICS idx max(id) BY host | LIMIT 10",
                    Map.ofEntries(Map.entry("METRICS", 1), Map.entry("LIMIT", 1), Map.entry("FROM TS", 1)),
                    Map.ofEntries(Map.entry("MAX", 1)),
                    true
                ) },
            new Object[] {
                new Test(
                    """
                        FROM idx
                        | EVAL ip = to_ip(host), x = to_string(host), y = to_string(host)
                        | INLINESTATS max(id)
                        """,
                    Map.ofEntries(Map.entry("FROM", 1), Map.entry("EVAL", 1), Map.entry("INLINESTATS", 1)),
                    Map.ofEntries(Map.entry("MAX", 1), Map.entry("TO_IP", 1), Map.entry("TO_STRING", 2)),
                    true
                ) }
        );
    }

    @Before
    public void init() {
        // Pin all index shards to a single, randomly chosen data node so that the
        // telemetry produced by the query can be read back from that node's plugin.
        DiscoveryNode dataNode = randomDataNode();
        final String nodeName = dataNode.getName();
        loadData(nodeName);
    }

    public void testMetrics() throws Exception {
        DiscoveryNode dataNode = randomDataNode();
        testQuery(dataNode, testCase);
    }

    private static void testQuery(DiscoveryNode dataNode, Test test) throws InterruptedException {
        testQuery(dataNode, test.query, test.success, test.expectedCommands, test.expectedFunctions);
    }

    /**
     * Runs {@code query} a random number of times against {@code dataNode} and, after each
     * run, asserts that the command and function telemetry counters equal the expected
     * per-query counts multiplied by the number of iterations executed so far.
     *
     * <p>The assertions execute inside the action listener, i.e. on a transport thread.
     * Any failure thrown there is captured and rethrown on the test thread, otherwise it
     * would be swallowed and the test would pass spuriously.
     */
    private static void testQuery(
        DiscoveryNode dataNode,
        String query,
        boolean success,
        Map<String, Integer> expectedCommands,
        Map<String, Integer> expectedFunctions
    ) throws InterruptedException {
        final var plugins = internalCluster().getInstance(PluginsService.class, dataNode.getName())
            .filterPlugins(TestTelemetryPlugin.class)
            .toList();
        assertThat(plugins, hasSize(1));
        TestTelemetryPlugin plugin = plugins.get(0);

        try {
            int successIterations = randomInt(10);
            for (int i = 0; i < successIterations; i++) {
                EsqlQueryRequest request = executeQuery(query);
                CountDownLatch latch = new CountDownLatch(1);
                // Captures assertion errors thrown on the listener (transport) thread so
                // they can be rethrown below and actually fail the test.
                AtomicReference<Throwable> failure = new AtomicReference<>();

                final long iteration = i + 1;
                client(dataNode.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.running(() -> {
                    try {
                        // test total commands used
                        final List<Measurement> commandMeasurementsAll = measurements(plugin, PlanningMetricsManager.FEATURE_METRICS_ALL);
                        assertAllUsages(expectedCommands, commandMeasurementsAll, iteration, success);

                        // test num of queries using a command
                        final List<Measurement> commandMeasurements = measurements(plugin, PlanningMetricsManager.FEATURE_METRICS);
                        assertUsageInQuery(expectedCommands, commandMeasurements, iteration, success);

                        // test total functions used
                        final List<Measurement> functionMeasurementsAll = measurements(plugin, PlanningMetricsManager.FUNCTION_METRICS_ALL);
                        assertAllUsages(expectedFunctions, functionMeasurementsAll, iteration, success);

                        // test number of queries using a function
                        final List<Measurement> functionMeasurements = measurements(plugin, PlanningMetricsManager.FUNCTION_METRICS);
                        assertUsageInQuery(expectedFunctions, functionMeasurements, iteration, success);
                    } catch (Throwable t) {
                        failure.set(t);
                    } finally {
                        latch.countDown();
                    }
                }));
                // Fail loudly on timeout instead of silently continuing: the await() result
                // must be checked, otherwise a hung query would go unnoticed.
                assertTrue("timed out waiting for telemetry to be published", latch.await(30, TimeUnit.SECONDS));
                if (failure.get() != null) {
                    throw new AssertionError("telemetry assertion failed on the listener thread", failure.get());
                }
            }
        } finally {
            plugin.resetMeter();
        }
    }

    /**
     * Asserts that the "all usages" counters contain exactly the expected feature names,
     * carry the expected success attribute, and that each counter equals the expected
     * per-query count times the iteration number (counters accumulate across queries).
     */
    private static void assertAllUsages(Map<String, Integer> expected, List<Measurement> metrics, long iteration, boolean success) {
        Set<String> found = featureNames(metrics);
        assertThat(found, is(expected.keySet()));
        for (Measurement metric : metrics) {
            assertThat(metric.attributes().get(PlanningMetricsManager.SUCCESS), is(success));
            String featureName = (String) metric.attributes().get(PlanningMetricsManager.FEATURE_NAME);
            assertThat(metric.getLong(), is(iteration * expected.get(featureName)));
        }
    }

    /**
     * Asserts that the "queries using a feature" counters contain exactly the expected
     * feature names and that each counter equals the iteration number — each query counts
     * a feature at most once, regardless of how often it appears in the query.
     */
    private static void assertUsageInQuery(Map<String, Integer> expected, List<Measurement> found, long iteration, boolean success) {
        Set<String> functionsFound;
        functionsFound = featureNames(found);
        assertThat(functionsFound, is(expected.keySet()));
        for (Measurement measurement : found) {
            assertThat(measurement.attributes().get(PlanningMetricsManager.SUCCESS), is(success));
            assertThat(measurement.getLong(), is(iteration));
        }
    }

    /** Combines the raw long-counter measurements recorded by the plugin for {@code metricKey}. */
    private static List<Measurement> measurements(TestTelemetryPlugin plugin, String metricKey) {
        return Measurement.combine(plugin.getLongCounterMeasurement(metricKey));
    }

    /** Extracts the distinct feature names attached to the given measurements. */
    private static Set<String> featureNames(List<Measurement> functionMeasurements) {
        return functionMeasurements.stream()
            .map(x -> x.attributes().get(PlanningMetricsManager.FEATURE_NAME))
            .map(String.class::cast)
            .collect(Collectors.toSet());
    }

    /** Builds a synchronous ES|QL request for {@code query} with randomized pragmas. */
    private static EsqlQueryRequest executeQuery(String query) {
        EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
        request.query(query);
        request.pragmas(randomPragmas());
        return request;
    }

    /**
     * Creates the {@code idx} index with all shards allocated to {@code nodeName} and
     * indexes a small random number of documents with {@code host} and {@code id} fields.
     */
    private static void loadData(String nodeName) {
        int numDocs = randomIntBetween(1, 15);
        assertAcked(
            client().admin()
                .indices()
                .prepareCreate("idx")
                .setSettings(
                    Settings.builder()
                        .put("index.routing.allocation.require._name", nodeName)
                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 5))
                )
                .setMapping("host", "type=keyword", "id", "type=long")
        );
        for (int i = 0; i < numDocs; i++) {
            client().prepareIndex("idx").setSource("host", "192." + i, "id", i).get();
        }

        client().admin().indices().prepareRefresh("idx").get();
    }

    private DiscoveryNode randomDataNode() {
        return randomFrom(clusterService().state().nodes().getDataNodes().values());
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return CollectionUtils.appendToCopy(super.nodePlugins(), TestTelemetryPlugin.class);
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ protected LogicalPlan rule(UnresolvedRelation plan, AnalyzerContext context) {
plan.frozen(),
plan.metadataFields(),
plan.indexMode(),
context.indexResolution().toString()
context.indexResolution().toString(),
plan.commandName()
);
}
TableIdentifier table = plan.table();
Expand All @@ -202,7 +203,8 @@ protected LogicalPlan rule(UnresolvedRelation plan, AnalyzerContext context) {
plan.frozen(),
plan.metadataFields(),
plan.indexMode(),
"invalid [" + table + "] resolution to [" + context.indexResolution() + "]"
"invalid [" + table + "] resolution to [" + context.indexResolution() + "]",
plan.commandName()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package org.elasticsearch.xpack.esql.execution;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.esql.analysis.Verifier;
Expand All @@ -22,6 +23,8 @@
import org.elasticsearch.xpack.esql.session.IndexResolver;
import org.elasticsearch.xpack.esql.session.Result;
import org.elasticsearch.xpack.esql.stats.Metrics;
import org.elasticsearch.xpack.esql.stats.PlanningMetrics;
import org.elasticsearch.xpack.esql.stats.PlanningMetricsManager;
import org.elasticsearch.xpack.esql.stats.QueryMetric;

import java.util.function.BiConsumer;
Expand All @@ -36,14 +39,16 @@ public class PlanExecutor {
private final Mapper mapper;
private final Metrics metrics;
private final Verifier verifier;
private final PlanningMetricsManager planningMetricsManager;

public PlanExecutor(IndexResolver indexResolver) {
public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
this.mapper = new Mapper(functionRegistry);
this.metrics = new Metrics();
this.verifier = new Verifier(metrics);
this.planningMetricsManager = new PlanningMetricsManager(meterRegistry);
}

public void esql(
Expand All @@ -54,6 +59,7 @@ public void esql(
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
ActionListener<Result> listener
) {
final PlanningMetrics planningMetrics = new PlanningMetrics();
final var session = new EsqlSession(
sessionId,
cfg,
Expand All @@ -63,13 +69,18 @@ public void esql(
functionRegistry,
new LogicalPlanOptimizer(new LogicalOptimizerContext(cfg)),
mapper,
verifier
verifier,
planningMetrics
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, runPhase, wrap(listener::onResponse, ex -> {
session.execute(request, runPhase, wrap(x -> {
planningMetricsManager.publish(planningMetrics, true);
listener.onResponse(x);
}, ex -> {
// TODO when we decide if we will differentiate Kibana from REST, this String value will likely come from the request
metrics.failed(clientId);
planningMetricsManager.publish(planningMetrics, false);
listener.onFailure(ex);
}));
}
Expand Down
Loading

0 comments on commit ab2cd17

Please sign in to comment.