Skip to content

Commit

Permalink
ESQL: INLINESTATS (elastic#109583)
Browse files Browse the repository at this point in the history
This implements `INLINESTATS`. Most of the heavy lifting is done by
`LOOKUP`, with this change mostly adding a new abstraction to logical
plans, and interface I'm calling `Phased`. Implementing this interface
allows a logical plan node to cut the query into phases. `INLINESTATS`
implements it by asking for a "first phase" that's the same query, up to
`INLINESTATS`, but with `INLINESTATS` replaced with `STATS`. The next
phase replaces the `INLINESTATS` with a `LOOKUP` on the results of the
first phase.

So, this query:
```
FROM foo
| EVAL bar = a * b
| INLINESTATS m = MAX(bar) BY b
| WHERE m = bar
| LIMIT 1
```

gets split into
```
FROM foo
| EVAL bar = a * b
| STATS m = MAX(bar) BY b
```

followed by
```
FROM foo
| EVAL bar = a * b
| LOOKUP (results of m = MAX(bar) BY b) ON b
| WHERE m = bar
| LIMIT 1
```
  • Loading branch information
nik9000 authored Jul 24, 2024
1 parent c80c16e commit b5c6c2d
Show file tree
Hide file tree
Showing 28 changed files with 1,569 additions and 36 deletions.
29 changes: 29 additions & 0 deletions docs/changelog/109583.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
pr: 109583
summary: "ESQL: INLINESTATS"
area: ES|QL
type: feature
issues:
- 107589
highlight:
title: "ESQL: INLINESTATS"
body: |-
This adds the `INLINESTATS` command to ESQL which performs a STATS and
then enriches the results into the output stream. So, this query:
[source,esql]
----
FROM test
| INLINESTATS m=MAX(a * b) BY b
| WHERE m == a * b
| SORT a DESC, b DESC
| LIMIT 3
----
Produces output like:
| a | b | m |
| --- | --- | ----- |
| 99 | 999 | 98901 |
| 99 | 998 | 98802 |
| 99 | 997 | 98703 |
notable: true
6 changes: 6 additions & 0 deletions docs/reference/esql/esql-commands.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ image::images/esql/processing-command.svg[A processing command changing an input
* <<esql-enrich>>
* <<esql-eval>>
* <<esql-grok>>
ifeval::["{release-state}"=="unreleased"]
* experimental:[] <<esql-inlinestats-by>>
endif::[]
* <<esql-keep>>
* <<esql-limit>>
ifeval::["{release-state}"=="unreleased"]
Expand All @@ -59,6 +62,9 @@ include::processing-commands/drop.asciidoc[]
include::processing-commands/enrich.asciidoc[]
include::processing-commands/eval.asciidoc[]
include::processing-commands/grok.asciidoc[]
ifeval::["{release-state}"=="unreleased"]
include::processing-commands/inlinestats.asciidoc[]
endif::[]
include::processing-commands/keep.asciidoc[]
include::processing-commands/limit.asciidoc[]
ifeval::["{release-state}"=="unreleased"]
Expand Down
102 changes: 102 additions & 0 deletions docs/reference/esql/processing-commands/inlinestats.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
[discrete]
[[esql-inlinestats-by]]
=== `INLINESTATS ... BY`

experimental::["INLINESTATS is highly experimental and only available in SNAPSHOT versions."]

The `INLINESTATS` command calculates an aggregate result and adds new columns
with the result to the stream of input data.

**Syntax**

[source,esql]
----
INLINESTATS [column1 =] expression1[, ..., [columnN =] expressionN]
[BY grouping_expression1[, ..., grouping_expressionN]]
----

*Parameters*

`columnX`::
The name by which the aggregated value is returned. If omitted, the name is
equal to the corresponding expression (`expressionX`). If multiple columns
have the same name, all but the rightmost column with this name will be ignored.

`expressionX`::
An expression that computes an aggregated value. If its name coincides with one
of the computed columns, that column will be ignored.

`grouping_expressionX`::
An expression that outputs the values to group by.

NOTE: Individual `null` values are skipped when computing aggregations.

*Description*

The `INLINESTATS` command calculates an aggregate result and merges that result
back into the stream of input data. Without the optional `BY` clause this will
produce a single result which is appended to each row. With a `BY` clause this
will produce one result per grouping and merge the result into the stream based on
matching group keys.

All of the <<esql-agg-functions,aggregation functions>> are supported.

*Examples*

Find the employees that speak the most languages (it's a tie!):

[source.merge.styled,esql]
----
include::{esql-specs}/inlinestats.csv-spec[tag=max-languages]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/inlinestats.csv-spec[tag=max-languages-result]
|===

Find the longest tenured employee who's last name starts with each letter of the alphabet:

[source.merge.styled,esql]
----
include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/inlinestats.csv-spec[tag=longest-tenured-by-first-result]
|===

Find the northern and southern most airports:

[source.merge.styled,esql]
----
include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/inlinestats.csv-spec[tag=extreme-airports-result]
|===

NOTE: Our test data doesn't have many "small" airports.

If a `BY` field is multivalued then `INLINESTATS` will put the row in *each*
bucket like <<esql-stats-by>>:

[source.merge.styled,esql]
----
include::{esql-specs}/inlinestats.csv-spec[tag=mv-group]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/inlinestats.csv-spec[tag=mv-group-result]
|===

To treat each group key as its own row use <<esql-mv_expand>> before `INLINESTATS`:

[source.merge.styled,esql]
----
include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/inlinestats.csv-spec[tag=mv-expand-result]
|===
2 changes: 1 addition & 1 deletion docs/reference/esql/processing-commands/lookup.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
[[esql-lookup]]
=== `LOOKUP`

experimental::["LOOKUP is a highly experimental and only available in SNAPSHOT versions."]
experimental::["LOOKUP is highly experimental and only available in SNAPSHOT versions."]

`LOOKUP` matches values from the input against a `table` provided in the request,
adding the other fields from the `table` to the output.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public abstract sealed class RowInTableLookup implements Releasable permits Empt
public abstract String toString();

public static RowInTableLookup build(BlockFactory blockFactory, Block[] keys) {
if (keys.length < 1) {
throw new IllegalArgumentException("expected [keys] to be non-empty");
}
int positions = keys[0].getPositionCount();
for (int k = 0; k < keys.length; k++) {
if (positions != keys[k].getPositionCount()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public String toString() {
* are never closed, so we need to build them from a non-tracking factory.
*/
public record Factory(Key[] keys, int[] blockMapping) implements Operator.OperatorFactory {
public Factory {
if (keys.length < 1) {
throw new IllegalArgumentException("expected [keys] to be non-empty");
}
}

@Override
public Operator get(DriverContext driverContext) {
return new RowInTableLookupOperator(driverContext.blockFactory(), keys, blockMapping);
Expand All @@ -56,6 +62,9 @@ public String describe() {
private final int[] blockMapping;

public RowInTableLookupOperator(BlockFactory blockFactory, Key[] keys, int[] blockMapping) {
if (keys.length < 1) {
throw new IllegalArgumentException("expected [keys] to be non-empty");
}
this.blockMapping = blockMapping;
this.keys = new ArrayList<>(keys.length);
Block[] blocks = new Block[keys.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ protected void shouldSkipTest(String testName) throws IOException {
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
isEnabled(testName, instructions, Clusters.oldVersion())
);
assumeFalse("INLINESTATS not yet supported in CCS", testCase.requiredCapabilities.contains("inlinestats"));
}

private TestFeatureService remoteFeaturesService() throws IOException {
Expand Down
Loading

0 comments on commit b5c6c2d

Please sign in to comment.