Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing remote ENRICH by pushing the Enrich inside FragmentExec #114665

Merged
merged 9 commits into from
Oct 24, 2024
6 changes: 6 additions & 0 deletions docs/changelog/114665.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 114665
summary: Fixing remote ENRICH by pushing the Enrich inside `FragmentExec`
area: ES|QL
type: bug
issues:
- 105095
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -469,27 +470,112 @@ public void testEnrichRemoteWithVendor() {
}
}

/**
 * Cross-cluster check that a REMOTE-mode vendors ENRICH works when it follows a LIMIT and a hosts ENRICH,
 * with the hosts policy exercised in both ANY and REMOTE mode. The query contains no SORT, so the returned
 * rows are ordered on the client before they are compared with the expected per-vendor counts.
 */
public void testEnrichRemoteWithVendorNoSort() {
    Tuple<Boolean, Boolean> ccsMetadataFlags = randomIncludeCCSMetadata();
    Boolean requestIncludeMeta = ccsMetadataFlags.v1();
    boolean responseExpectMeta = ccsMetadataFlags.v2();

    for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) {
        var hostsEnrich = enrichHosts(hostMode);
        var vendorsEnrich = enrichVendors(Enrich.Mode.REMOTE);
        var esql = String.format(Locale.ROOT, """
            FROM *:events,events
            | LIMIT 100
            | eval ip= TO_STR(host)
            | %s
            | %s
            | stats c = COUNT(*) by vendor
            """, hostsEnrich, vendorsEnrich);

        try (EsqlQueryResponse response = runQuery(esql, requestIncludeMeta)) {
            // Each row is (count, vendor): order by vendor name and push the null-vendor bucket to the end.
            Comparator<List<Object>> byVendorNullsLast = Comparator.comparing(
                row -> (String) row.get(1),
                Comparator.nullsLast(Comparator.naturalOrder())
            );
            var rows = getValuesList(response);
            rows.sort(byVendorNullsLast);

            assertThat(
                rows,
                equalTo(
                    List.of(
                        List.of(6L, "Apple"),
                        List.of(7L, "Microsoft"),
                        List.of(1L, "Redhat"),
                        List.of(2L, "Samsung"),
                        List.of(1L, "Sony"),
                        List.of(2L, "Suse"),
                        Arrays.asList(3L, (String) null)
                    )
                )
            );

            // The response must report the local cluster ("") plus both remotes.
            EsqlExecutionInfo info = response.getExecutionInfo();
            assertThat(info.includeCCSMetadata(), equalTo(responseExpectMeta));
            assertThat(info.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
            assertCCSExecutionInfoDetails(info);
        }
    }
}

/**
 * Verifies that a REMOTE-mode hosts ENRICH placed after a TopN (SORT followed by LIMIT) runs successfully
 * across clusters. The five returned rows are compared in the order the query produces them, and the CCS
 * execution metadata is checked afterwards.
 */
public void testTopNThenEnrichRemote() {
    Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
    Boolean requestIncludeMeta = includeCCSMetadata.v1();
    boolean responseExpectMeta = includeCCSMetadata.v2();

    // Exactly one %s placeholder: String.format receives a single argument, so a second placeholder would
    // throw MissingFormatArgumentException before the query is ever sent.
    String query = String.format(Locale.ROOT, """
        FROM *:events,events
        | eval ip= TO_STR(host)
        | SORT timestamp, user, ip
        | LIMIT 5
        | %s | KEEP host, timestamp, user, os
        """, enrichHosts(Enrich.Mode.REMOTE));

    // The query is expected to succeed, so it is executed directly instead of being wrapped in expectThrows.
    try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
        assertThat(
            getValuesList(resp),
            equalTo(
                List.of(
                    List.of("192.168.1.2", 1L, "andres", "Windows"),
                    List.of("192.168.1.3", 1L, "matthew", "MacOS"),
                    // Arrays.asList because List.of rejects null elements.
                    Arrays.asList("192.168.1.25", 1L, "park", (String) null),
                    List.of("192.168.1.5", 2L, "akio", "Android"),
                    List.of("192.168.1.6", 2L, "sergio", "iOS")
                )
            )
        );
        EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
        assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
        assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
        assertCCSExecutionInfoDetails(executionInfo);
    }
}

/**
 * Verifies that a REMOTE-mode hosts ENRICH placed after a plain LIMIT runs successfully across clusters.
 * The query has no SORT, so the rows are ordered on the client (timestamp, then host, then user) and only
 * the first five are compared; the CCS execution metadata is checked afterwards.
 */
public void testLimitThenEnrichRemote() {
    Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
    Boolean requestIncludeMeta = includeCCSMetadata.v1();
    boolean responseExpectMeta = includeCCSMetadata.v2();

    // Exactly one %s placeholder: String.format receives a single argument, so a second placeholder would
    // throw MissingFormatArgumentException before the query is ever sent.
    // NOTE(review): a single LIMIT 25 is kept; presumably it is large enough to return every fixture event,
    // which the "first five after sorting" assertion below relies on - confirm against the test data.
    String query = String.format(Locale.ROOT, """
        FROM *:events,events
        | LIMIT 25
        | eval ip= TO_STR(host)
        | %s | KEEP host, timestamp, user, os
        """, enrichHosts(Enrich.Mode.REMOTE));

    // The query is expected to succeed, so it is executed directly instead of being wrapped in expectThrows.
    try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
        var values = getValuesList(resp);
        // Columns are (host, timestamp, user, os).
        values.sort(
            Comparator.comparingLong((List<Object> o) -> (Long) o.get(1))
                .thenComparing(o -> (String) o.get(0))
                .thenComparing(o -> (String) o.get(2))
        );
        assertThat(
            values.subList(0, 5),
            equalTo(
                List.of(
                    List.of("192.168.1.2", 1L, "andres", "Windows"),
                    // Arrays.asList because List.of rejects null elements.
                    Arrays.asList("192.168.1.25", 1L, "park", (String) null),
                    List.of("192.168.1.3", 1L, "matthew", "MacOS"),
                    List.of("192.168.1.5", 2L, "akio", "Android"),
                    List.of("192.168.1.5", 2L, "simon", "Android")
                )
            )
        );
        EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
        assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
        assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
        assertCCSExecutionInfoDetails(executionInfo);
    }
}

public void testAggThenEnrichRemote() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,22 +609,15 @@ private static void checkForSortableDataTypes(LogicalPlan p, Set<Failure> localF
*/
private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
boolean[] agg = { false };
boolean[] limit = { false };
boolean[] enrichCoord = { false };

plan.forEachUp(UnaryPlan.class, u -> {
if (u instanceof Limit) {
limit[0] = true; // TODO: Make Limit then enrich_remote work
}
if (u instanceof Aggregate) {
agg[0] = true;
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
enrichCoord[0] = true;
}
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
if (limit[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LIMIT"));
}
if (agg[0]) {
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* <p>This class is part of the planner</p>
Expand Down Expand Up @@ -104,6 +106,46 @@ public PhysicalPlan map(LogicalPlan p) {
//
// Unary Plan
//
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
// When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Remote" typically implies a remote cluster. I think you mean the data node (or, in ESQL terminology, "local", as in local planning).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, here it is meant that remote enrich would go to the remote cluster. It may also do it effectively on the local cluster if the enrich policy & indexes are there, but the important part I think is that it'd also go to the remote.

// We're only going to do it on the coordinator node.
// The way we're going to do it is as follows:
// 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
// 2. Put this Enrich under it, removing everything that was below it previously.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing everything that was below it previously
This most certainly sounds like a no-go. Do you mean insert the node under FragmentExec and the node below maybe?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything that is under the Enrich will still be under it when it's inserted into the Fragment, as a logical plan (which will later be converted to physical when processing the fragment as I understand).

// 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pipeline breakers influence the exchange data transfer; if you add another node, it will break the data-node/coordinator contract.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean here, could you explain?

// FragmentExec.
// 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like you want Enrich to be a pipeline breaker itself.
The comments above rest on a number of inaccurate assumptions and don't explain the problem they are trying to solve.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making Enrich a pipeline breaker won't help us much by itself. The problem is that, due to how mapping works right now, it has (or rather had, before this patch) no ability to place Enrich inside the fragment (and thus execute it remotely) if any pipeline breakers are present (and at least LIMIT is always present unless it is absorbed into TopN).


var child = map(enrich.child());
Comment on lines +109 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is similar to the code below under UnaryPlan, var child = map().. // line 150.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this part so far is, but the later part diverges.

AtomicBoolean hasFragment = new AtomicBoolean(false);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Holder<Boolean> hasFragment = new Holder<>(false);


var childTransformed = child.transformUp((f) -> {
// Once we reached FragmentExec, we stuff our Enrich under it
if (f instanceof FragmentExec) {
hasFragment.set(true);
return new FragmentExec(p);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why this works!
This overrides the subplan on the datanode with the enrich plan up the tree, that sits on the coordinator essentially overriding the pipeline boundary regardless of what's underneath.
This needs to be changed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It basically does the same thing it does later to non-pipeline-breakers but with some complications.

}
if (f instanceof EnrichExec enrichExec) {
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
return enrichExec.child();
}
if (f instanceof UnaryExec unaryExec) {
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
return f;
} else {
return unaryExec.child();
}
Comment on lines +134 to +139
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this is supposed to do - check whether it's a pipeline breaker and otherwise skip it?
So a FilterExec gets removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, FilterExec will be removed from this part - the filter will be inside Enrich plan under FragmentExec, which when converted to a physical plan would produce the FilterExec which will be executed.

}
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
return f;
});

if (hasFragment.get()) {
return childTransformed;
}
}
Comment on lines +120 to +148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section of code needs to be integrated with the one below (line 112-115, about Enrich and coordinator mode).


if (p instanceof UnaryPlan ua) {
var child = map(ua.child());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;

// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
// @TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug")
public class PhysicalPlanOptimizerTests extends ESTestCase {

private static final String PARAM_FORMATTING = "%1$s";
Expand Down Expand Up @@ -5851,14 +5851,14 @@ public void testEnrichBeforeLimit() {
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
| LIMIT 10""");
var enrich = as(plan, EnrichExec.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var eval = as(enrich.child(), EvalExec.class);
var finalLimit = as(eval.child(), LimitExec.class);
var finalLimit = as(plan, LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var partialLimit = as(fragment.fragment(), Limit.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialLimit = as(evalFragment.child(), Limit.class);
as(partialLimit.child(), EsRelation.class);
}
}
Expand Down Expand Up @@ -5901,13 +5901,21 @@ public void testLimitThenEnrich() {
}

public void testLimitThenEnrichRemote() {
var error = expectThrows(VerificationException.class, () -> physicalPlan("""
var plan = physicalPlan("""
FROM test
| LIMIT 10
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
"""));
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after LIMIT"));
""");
var finalLimit = as(plan, LimitExec.class);
var exchange = as(finalLimit.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialLimit = as(evalFragment.child(), Limit.class);
as(partialLimit.child(), EsRelation.class);
}

public void testEnrichBeforeTopN() {
Expand Down Expand Up @@ -5961,6 +5969,23 @@ public void testEnrichBeforeTopN() {
var eval = as(enrich.child(), Eval.class);
as(eval.child(), EsRelation.class);
}
{
var plan = physicalPlan("""
FROM test
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
| SORT department
| LIMIT 10""");
var topN = as(plan, TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var partialTopN = as(fragment.fragment(), TopN.class);
var enrich = as(partialTopN.child(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var eval = as(enrich.child(), Eval.class);
as(eval.child(), EsRelation.class);
}
}

public void testEnrichAfterTopN() {
Expand Down Expand Up @@ -6000,6 +6025,24 @@ public void testEnrichAfterTopN() {
var partialTopN = as(fragment.fragment(), TopN.class);
as(partialTopN.child(), EsRelation.class);
}
{
var plan = physicalPlan("""
FROM test
| SORT emp_no
| LIMIT 10
| EVAL employee_id = to_str(emp_no)
| ENRICH _remote:departments
""");
var topN = as(plan, TopNExec.class);
var exchange = as(topN.child(), ExchangeExec.class);
var fragment = as(exchange.child(), FragmentExec.class);
var enrich = as(fragment.fragment(), Enrich.class);
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
var evalFragment = as(enrich.child(), Eval.class);
var partialTopN = as(evalFragment.child(), TopN.class);
as(partialTopN.child(), EsRelation.class);
}
}

public void testManyEnrich() {
Expand Down