Fixing remote ENRICH by pushing the Enrich inside FragmentExec #114665
2723f7e to a478b6a
Hi @smalyshev, I've created a changelog YAML for you.
LGTM. Thanks @smalyshev
8303f87 to 57cf748
Pinging @elastic/es-analytical-engine (Team:Analytics)
💔 Backport failed
The backport operation could not be completed due to the following error:
You can use sqren/backport to manually backport by running
💚 All backports created successfully
Questions? Please refer to the Backport tool documentation
…ic#114665) * Fixing remote ENRICH by pushing the Enrich inside FragmentExec * Improve handling of more complex cases such as several enriches (cherry picked from commit e789039)
…ic#114665) * Fixing remote ENRICH by pushing the Enrich inside FragmentExec * Improve handling of more complex cases such as several enriches
Due to a merge conflict I had the chance to look at the code added in this PR, and I have some serious concerns about the Mapper code.
I have a hard time following what the code does, and the transformation looks scary to me.
    var child = map(enrich.child());
    AtomicBoolean hasFragment = new AtomicBoolean(false);

    var childTransformed = child.transformUp((f) -> {
        // Once we reached FragmentExec, we stuff our Enrich under it
        if (f instanceof FragmentExec) {
            hasFragment.set(true);
            return new FragmentExec(p);
        }
        if (f instanceof EnrichExec enrichExec) {
            // It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
            assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
            return enrichExec.child();
        }
        if (f instanceof UnaryExec unaryExec) {
            if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
                return f;
            } else {
                return unaryExec.child();
            }
        }
        // Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
        return f;
    });

    if (hasFragment.get()) {
        return childTransformed;
    }
}
This section of code needs to be integrated with the one below (lines 112-115, about Enrich and coordinator mode).
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).

    var child = map(enrich.child());
    AtomicBoolean hasFragment = new AtomicBoolean(false);
Use Holder<Boolean> hasFragment = new Holder<>(false);
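For illustration, the suggestion above amounts to swapping the atomic flag for a plain mutable box, since nothing in this lambda runs concurrently. The sketch below is a minimal, self-contained version: the nested `Holder` class is a hypothetical stand-in written for this example (it assumes the real ESQL utility offers comparable `get`/`set` accessors), and the node names are plain strings rather than plan classes.

```java
import java.util.List;

class HolderSketch {
    // Hypothetical stand-in for the Holder utility mentioned in the review;
    // the real class is assumed, not confirmed, to have the same accessors.
    static final class Holder<T> {
        private T value;

        Holder(T value) {
            this.value = value;
        }

        T get() {
            return value;
        }

        void set(T value) {
            this.value = value;
        }
    }

    // A lambda may only capture effectively final locals, so the flag is
    // wrapped in a holder whose contents the lambda is free to mutate.
    static boolean containsFragment(List<String> nodeNames) {
        Holder<Boolean> hasFragment = new Holder<>(false);
        nodeNames.forEach(name -> {
            if (name.equals("FragmentExec")) {
                hasFragment.set(true);
            }
        });
        return hasFragment.get();
    }

    public static void main(String[] args) {
        System.out.println(containsFragment(List.of("LimitExec", "ExchangeExec", "FragmentExec"))); // true
        System.out.println(containsFragment(List.of("LimitExec", "LocalSourceExec"))); // false
    }
}
```

The behavior matches the `AtomicBoolean` version; the holder only drops the implication that the flag is shared between threads.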
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
    // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
    // We're only going to do it on the coordinator node.
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
    // FragmentExec.
    // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).

    var child = map(enrich.child());
This is similar to the code below under UnaryPlan: var child = map().. // line 150.
Yes, this part so far is, but the later part diverges.
@@ -104,6 +106,46 @@ public PhysicalPlan map(LogicalPlan p) {
//
// Unary Plan
//
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
    // When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
Remote typically implies a remote cluster. I think you mean data node (or, in ESQL terminology, local, as in local planning).
Yes, here it is meant that remote enrich would go to the remote cluster. It may also do it effectively on the local cluster if the enrich policy & indexes are there, but the important part I think is that it'd also go to the remote.
    // We're only going to do it on the coordinator node.
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
"removing everything that was below it previously"
This most certainly sounds like a no-go. Do you mean inserting the node under FragmentExec, and maybe the node below it?
Everything that is under the Enrich will still be under it when it's inserted into the Fragment, as a logical plan (which will later be converted to physical when processing the fragment as I understand).
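A toy model may make this point easier to follow. The sketch below uses hypothetical, heavily simplified record types (`Source`, `Filter`, `Enrich`, `Fragment`), not the real ESQL plan classes, and only shows that wrapping the logical Enrich node in a fragment carries its whole logical subtree along with it.

```java
class FragmentWrapSketch {
    // Hypothetical, simplified logical plan nodes; not the real ESQL classes.
    interface Logical {}

    record Source(String index) implements Logical {}

    record Filter(String condition, Logical child) implements Logical {}

    record Enrich(String policy, Logical child) implements Logical {}

    // Simplified stand-in for FragmentExec: a physical leaf that holds a logical plan.
    record Fragment(Logical plan) {}

    // Renders a logical plan from the top node down to its source.
    static String describe(Logical plan) {
        if (plan instanceof Enrich e) {
            return "Enrich[" + e.policy() + "] <- " + describe(e.child());
        }
        if (plan instanceof Filter f) {
            return "Filter[" + f.condition() + "] <- " + describe(f.child());
        }
        if (plan instanceof Source s) {
            return "Source[" + s.index() + "]";
        }
        throw new IllegalArgumentException("unknown node: " + plan);
    }

    public static void main(String[] args) {
        // p plays the role of the logical Enrich node handed to the mapper.
        Logical p = new Enrich("hosts", new Filter("status == 200", new Source("logs")));
        Fragment fragment = new Fragment(p);
        // prints: Enrich[hosts] <- Filter[status == 200] <- Source[logs]
        System.out.println(describe(fragment.plan()));
    }
}
```

In this model nothing beneath the Enrich is lost; what gets replaced is only the fragment's previous contents, which (as the reply above describes it) were already part of the Enrich's logical subtree.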
    // The way we're going to do it is as follows:
    // 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
    // 2. Put this Enrich under it, removing everything that was below it previously.
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
The pipeline breakers influence the exchange data transfer; if you add another node, it will break the data-node / coordinator contract.
Not sure what you mean here, could you explain?
    // 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
    // FragmentExec.
    // 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
    // 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
Sounds like you want Enrich to be a pipeline breaker itself.
The comments above rest on a number of incorrect assumptions and don't explain the problem the code tries to solve.
Making Enrich a pipeline breaker won't help us much by itself. The problem is that, due to how mapping works right now, it has (had, before the patch) no ability to place Enrich inside the fragment (and thus execute it remotely) if any pipeline breakers are present (and LIMIT at least is always present unless assimilated into TopN).
    // Once we reached FragmentExec, we stuff our Enrich under it
    if (f instanceof FragmentExec) {
        hasFragment.set(true);
        return new FragmentExec(p);
I'm not sure why this works!
This overrides the subplan on the data node with the enrich plan from further up the tree, which sits on the coordinator, essentially overriding the pipeline boundary regardless of what's underneath.
This needs to be changed.
It basically does the same thing it does later to non-pipeline-breakers but with some complications.
    if (f instanceof UnaryExec unaryExec) {
        if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
            return f;
        } else {
            return unaryExec.child();
        }
I'm not sure what this is supposed to do: check if it's a pipeline breaker and otherwise skip it?
So a FilterExec gets removed?
Yes, FilterExec will be removed from this part - the filter will be inside Enrich plan under FragmentExec, which when converted to a physical plan would produce the FilterExec which will be executed.
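The rewrite discussed in this thread can be sketched on a toy tree. Everything below is a simplified assumption made for illustration: `Unary`, `Fragment`, the `keep` flag (standing in for the LimitExec / ExchangeExec / OrderExec / TopNExec check) and the string-valued logical plans are hypothetical, and `transformUp` is a hand-written bottom-up traversal rather than the ESQL implementation.

```java
import java.util.function.UnaryOperator;

class RemoteEnrichMapperSketch {
    // Hypothetical, simplified physical nodes; the real classes carry far more state.
    interface Node {}

    // Leaf that stands in for FragmentExec; the logical plan is just a label here.
    record Fragment(String logicalPlan) implements Node {}

    // keep == true marks the node types the patch leaves above the fragment.
    record Unary(String name, boolean keep, Node child) implements Node {}

    // Bottom-up rewrite: rebuild the child first, then apply the rule to the node itself.
    static Node transformUp(Node node, UnaryOperator<Node> rule) {
        if (node instanceof Unary u) {
            Node newChild = transformUp(u.child(), rule);
            return rule.apply(new Unary(u.name(), u.keep(), newChild));
        }
        return rule.apply(node);
    }

    static Node pushEnrichIntoFragment(Node mappedChild, String enrichLogicalPlan) {
        return transformUp(mappedChild, n -> {
            if (n instanceof Fragment) {
                // Replace the fragment's contents with the whole logical Enrich subtree.
                return new Fragment(enrichLogicalPlan);
            }
            if (n instanceof Unary u && u.keep() == false) {
                // Dropped here; its logical counterpart is already inside the new fragment.
                return u.child();
            }
            return n;
        });
    }

    static String describe(Node n) {
        if (n instanceof Unary u) {
            return u.name() + " -> " + describe(u.child());
        }
        return "FragmentExec[" + ((Fragment) n).logicalPlan() + "]";
    }

    public static void main(String[] args) {
        Node mapped = new Unary("FilterExec", false,
            new Unary("LimitExec", true,
                new Unary("ExchangeExec", true, new Fragment("Limit <- Source"))));
        Node result = pushEnrichIntoFragment(mapped, "Enrich <- Filter <- Limit <- Source");
        // prints: LimitExec -> ExchangeExec -> FragmentExec[Enrich <- Filter <- Limit <- Source]
        System.out.println(describe(result));
    }
}
```

In this model the FilterExec disappears from the coordinator-side tree, while the filter itself survives inside the fragment's logical plan, which is the behavior the reply above describes.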
Make Mapper place remote enrich clauses into FragmentExec and reorder the tree so that all work for this enrich also happens inside FragmentExec. Some operations, such as limits and sorts, require special handling because they happen both inside and outside the fragment.
See also: #105095