ESQL: Paginate MV_EXPAND output #100598
Pinging @elastic/es-ql (Team:QL)
Pinging @elastic/elasticsearch-esql (:Query Languages/ES|QL)
    assertThat(status.pagesProcessed(), equalTo(1));
    assertThat(status.noops(), equalTo(0));
}

// TODO: remove this once possible
// https://github.com/elastic/elasticsearch/issues/99826
The memory accounting should be fine now.
I also ran this test in a loop a few hundred times and it never failed.
You can remove the whole canLeak method, I think. This is the last caller and it needs to go anyway.
@@ -582,7 +582,8 @@ private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContex

 private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) {
     PhysicalOperation source = plan(mvExpandExec.child(), context);
-    return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel()), source.layout);
+    int blockSize = 5000;// TODO estimate row size and use context.pageSize()
This needs a bit more plumbing. It's probably not too complicated, but we could also consider it for a follow-up PR.
++
}

protected Page process() {
    Block expandingBlock = prev.getBlock(channel);
    Block expandedBlock = expandingBlock.expand();
Uhm, this can be done once per block, no need to do it at every process()...
++
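The suggestion in this thread, expanding once per incoming block and then slicing the result on each call to process(), could look roughly like the sketch below. This is not the PR's code: CachedExpander, its int[][] stand-in for a multivalued block, and the expandCalls counter are hypothetical names used only for illustration, and positions without values are simply skipped here.

```java
/** Hypothetical stand-in for an operator that expands a multivalued column in pages. */
final class CachedExpander {
    private final int pageSize;
    private int[] expanded;   // flattened values of the current input, computed once per input
    private int next;         // index of the next flattened value to emit
    int expandCalls;          // how many times the (expensive) expansion ran

    CachedExpander(int pageSize) {
        this.pageSize = pageSize;
    }

    /** Accept a new input "block": one int[] of values per position. Expands it exactly once. */
    void addInput(int[][] block) {
        int total = 0;
        for (int[] values : block) {
            total += values.length;
        }
        int[] flat = new int[total];
        int i = 0;
        for (int[] values : block) {
            for (int v : values) {
                flat[i++] = v;
            }
        }
        expanded = flat;
        next = 0;
        expandCalls++;
    }

    /** Emit the next page of at most pageSize values, or null once the input is exhausted. */
    int[] process() {
        if (expanded == null || next >= expanded.length) {
            return null;
        }
        int size = Math.min(pageSize, expanded.length - next);
        int[] page = new int[size];
        System.arraycopy(expanded, next, page, 0, size);
        next += size;
        return page;
    }
}
```

With a page size of 2 and an input holding five values in total, process() returns three pages while expandCalls stays at 1.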
     return new Page(expandedBlock);
 }

-int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());
+int[] duplicateFilter = nextDuplicateExpandingFilter(expandingBlock, pageSize);
pageSize, right? I think maybe once you move expandedBlock to a member variable too then this won't take any args, right?
-private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) {
-    int[] duplicateFilter = new int[newPositions];
+private int[] nextDuplicateExpandingFilter(Block expandingBlock, int size) {
+    int[] duplicateFilter = new int[size];
Probably min(size, expanded.positionCount - nextPositionToProcess). Maybe?
int n = 0;
for (int p = 0; p < expandingBlock.getPositionCount(); p++) {
    int count = expandingBlock.getValueCount(p);
while (nextPositionToProcess < expandingBlock.getPositionCount()) {
I tend to use while (true) if there are multiple interesting ways to break from the loop. It's kind of a signal to the reader that "something weird is here".
I'm not a big fan of while(true), but in this case I think it will make the code easier to read, so 👍
            prevCompleted = true;
        }
    } else {
        nextMvToProcess = nextMvToProcess + toAdd;
Might be worth adding comments about the meanings of these two arms. I think one is "we're done expanding this page" and the other is "we've filled the page we're building and maybe exhausted the current position. But we might not have, we might be halfway through expanding a position."
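To make the two arms concrete, here is a minimal, self-contained sketch of a paginated duplicate filter written with while (true) and a comment on each exit. It is an illustration under assumptions, not the operator's actual code: DuplicateFilterPager and its fields are hypothetical names, the input is reduced to an array of value counts per position, and a position with zero values is assumed to still produce one output row.

```java
import java.util.Arrays;

/** Hypothetical sketch: builds, page by page, the input row index to duplicate for each output row. */
final class DuplicateFilterPager {
    private final int[] valueCounts;  // number of values at each input position
    private final int pageSize;       // maximum rows per output page, must be > 0
    private int nextPosition;         // input position currently being expanded
    private int nextValue;            // rows of that position already emitted

    DuplicateFilterPager(int[] valueCounts, int pageSize) {
        this.valueCounts = valueCounts;
        this.pageSize = pageSize;
    }

    boolean finished() {
        return nextPosition >= valueCounts.length;
    }

    int[] nextFilter() {
        int[] filter = new int[pageSize];
        int n = 0;
        while (true) {
            if (nextPosition >= valueCounts.length) {
                // Arm 1: every input position has been expanded; this input page is done.
                break;
            }
            // Assumption: a position without values still yields a single (null) row.
            int rows = Math.max(1, valueCounts[nextPosition]);
            int toAdd = Math.min(rows - nextValue, pageSize - n);
            Arrays.fill(filter, n, n + toAdd, nextPosition);
            n += toAdd;
            if (nextValue + toAdd == rows) {
                // The current position is exhausted; move on to the next one.
                nextPosition++;
                nextValue = 0;
            } else {
                // Stopped partway through a position; remember where to resume.
                nextValue += toAdd;
            }
            if (n == pageSize) {
                // Arm 2: the output page is full. The current position may or may not be exhausted.
                break;
            }
        }
        return n == pageSize ? filter : Arrays.copyOf(filter, n);
    }
}
```

For value counts {3, 0, 2} and a page size of 2, successive calls return [0, 0], [0, 1] and [2, 2]; the second page starts halfway through position 0.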
-public static final class Status extends AbstractPageMappingOperator.Status {
+public static final class Status implements Operator.Status {

     private final int pagesProcessed;
It'd be nice to have this as pagesIn and pagesOut. That way you can see what the multiplication factor is too.
++
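As a rough illustration of why separate counters help, a status object that tracks both can report the multiplication factor directly. ExpandStatus and multiplicationFactor() below are hypothetical; the real Status class also has to handle serialization, and its field order is not shown in this conversation.

```java
/** Hypothetical status snapshot with separate input and output page counters. */
final class ExpandStatus {
    final int pagesIn;   // pages received from the upstream operator
    final int pagesOut;  // pages emitted after expansion
    final int noops;     // pages passed through unchanged

    ExpandStatus(int pagesIn, int pagesOut, int noops) {
        this.pagesIn = pagesIn;
        this.pagesOut = pagesOut;
        this.noops = noops;
    }

    /** Average number of output pages per input page, or 0 if nothing has been read yet. */
    double multiplicationFactor() {
        return pagesIn == 0 ? 0.0 : (double) pagesOut / pagesIn;
    }
}
```

For example, 2 pages in and 6 pages out gives a factor of 3.0.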
@Override
protected boolean canLeak() {
    return true;
public void testExpandWithBytesRefs() {
Nice.
I guess this is ready for a final review now
@elasticmachine update branch
I left a small comment about the silly mutation testing. Otherwise, looks great to me.
@@ -35,20 +35,22 @@ protected Writeable.Reader<MvExpandOperator.Status> instanceReader() {

 @Override
 public MvExpandOperator.Status createTestInstance() {
-    return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt());
+    return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt());
 }

 @Override
 protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) {
     switch (between(0, 1)) {
This should change to between(0, 2) and there should be an arm that just changes pagesIn and another that just changes pagesOut.
Makes sense, doing it now
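A three-arm mutation along these lines might look like the sketch below. It is written against plain java.util.Random rather than the test framework's between() and randomNonNegativeInt() helpers so that it stands alone, and StatusMutation, its nested Status class, and the field order (pagesIn, pagesOut, noops) are assumptions made for illustration.

```java
import java.util.Random;

/** Hypothetical sketch of a mutation helper with one arm per field. */
final class StatusMutation {
    /** Three-field status mirroring the three-argument constructor in the diff; field order is assumed. */
    static final class Status {
        final int pagesIn;
        final int pagesOut;
        final int noops;

        Status(int pagesIn, int pagesOut, int noops) {
            this.pagesIn = pagesIn;
            this.pagesOut = pagesOut;
            this.noops = noops;
        }
    }

    /** Returns a copy that differs from the input in exactly one field, selected by arm (0, 1 or 2). */
    static Status mutate(Status s, int arm, Random random) {
        switch (arm) {
            case 0:
                return new Status(different(s.pagesIn, random), s.pagesOut, s.noops);
            case 1:
                return new Status(s.pagesIn, different(s.pagesOut, random), s.noops);
            case 2:
                return new Status(s.pagesIn, s.pagesOut, different(s.noops, random));
            default:
                throw new IllegalArgumentException("unexpected arm " + arm);
        }
    }

    /** Picks a non-negative int that is guaranteed to differ from value. */
    private static int different(int value, Random random) {
        int candidate;
        do {
            candidate = random.nextInt(Integer.MAX_VALUE);
        } while (candidate == value);
        return candidate;
    }
}
```

Changing exactly one field per arm means an equality check that ignores any single field would be caught by the mutation test.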
@elasticmachine run elasticsearch-ci/bwc
@elasticmachine update branch
💚 Backport successful
Let the MV_EXPAND operator paginate its output so that the memory footprint remains low.
Now also queries like
consume a small amount of memory and avoid calculating elements that will be discarded by a subsequent LIMIT.
Fixes #100533
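
The interaction with LIMIT described above can be sketched in isolation: when expansion is produced lazily in pages, a downstream consumer that needs only a few rows stops pulling and the remaining values are never materialized. The code below is a simplified model and not the operator itself; LazyExpansion, limit(), and pagesProduced are hypothetical names, and rows without values are skipped rather than emitted as nulls.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical lazy expander: produces pages of expanded values only when asked. */
final class LazyExpansion implements Iterator<int[]> {
    private final int[][] rows;   // one int[] of values per input row
    private final int pageSize;   // maximum values per output page
    private int row;              // row currently being expanded
    private int value;            // next value within that row
    int pagesProduced;            // how many pages were actually built

    LazyExpansion(int[][] rows, int pageSize) {
        this.rows = rows;
        this.pageSize = pageSize;
    }

    @Override
    public boolean hasNext() {
        // Skip rows whose values have all been emitted (empty rows are skipped in this sketch).
        while (row < rows.length && value >= rows[row].length) {
            row++;
            value = 0;
        }
        return row < rows.length;
    }

    @Override
    public int[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int[] page = new int[pageSize];
        int n = 0;
        while (n < pageSize && hasNext()) {
            page[n++] = rows[row][value++];
        }
        pagesProduced++;
        return Arrays.copyOf(page, n);
    }

    /** Stand-in for a downstream LIMIT: stops pulling pages once enough values have arrived. */
    static int[] limit(Iterator<int[]> pages, int limit) {
        int[] out = new int[limit];
        int n = 0;
        while (n < limit && pages.hasNext()) {
            for (int v : pages.next()) {
                if (n == limit) {
                    break;
                }
                out[n++] = v;
            }
        }
        return Arrays.copyOf(out, n);
    }
}
```

With a single row of 10,000 values, a page size of 100, and a limit of 150, only two pages are built; the other 9,800 values are never copied.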