diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index f6156507dffa2..c322520d8853b 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.Releasables; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -31,11 +32,12 @@ * 2 | 2 | "foo" * */ -public class MvExpandOperator extends AbstractPageMappingOperator { - public record Factory(int channel) implements OperatorFactory { +public class MvExpandOperator implements Operator { + + public record Factory(int channel, int blockSize) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new MvExpandOperator(channel); + return new MvExpandOperator(channel, blockSize); } @Override @@ -46,49 +48,158 @@ public String describe() { private final int channel; + private final int pageSize; + private int noops; - public MvExpandOperator(int channel) { + private Page prev; + private boolean prevCompleted = false; + private boolean finished = false; + + private Block expandingBlock; + private Block expandedBlock; + + private int nextPositionToProcess = 0; + private int nextMvToProcess = 0; + private int nextItemOnExpanded = 0; + + /** + * Count of pages that have been processed by this operator. + */ + private int pagesIn; + private int pagesOut; + + public MvExpandOperator(int channel, int pageSize) { this.channel = channel; + this.pageSize = pageSize; + assert pageSize > 0; } @Override - protected Page process(Page page) { - Block expandingBlock = page.getBlock(channel); - Block expandedBlock = expandingBlock.expand(); + public final Page getOutput() { + if (prev == null) { + return null; + } + pagesOut++; + if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) { + noops++; + Page result = prev; + prev = null; + return result; + } + + try { + return process(); + } finally { + if (prevCompleted && prev != null) { + prev.releaseBlocks(); + prev = null; + } + } + } + + protected Page process() { if (expandedBlock == expandingBlock) { noops++; - return page; + prevCompleted = true; + return prev; } - if (page.getBlockCount() == 1) { + if (prev.getBlockCount() == 1) { assert channel == 0; + prevCompleted = true; return new Page(expandedBlock); } - int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount()); + int[] duplicateFilter = nextDuplicateExpandingFilter(); - Block[] result = new Block[page.getBlockCount()]; + Block[] result = new Block[prev.getBlockCount()]; + int[] expandedMask = new int[duplicateFilter.length]; + for (int i = 0; i < expandedMask.length; i++) { + expandedMask[i] = i + nextItemOnExpanded; + } + nextItemOnExpanded += expandedMask.length; for (int b = 0; b < result.length; b++) { - result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter); + result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + } + if (nextItemOnExpanded == expandedBlock.getPositionCount()) { + nextItemOnExpanded = 0; } return new Page(result); } - private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) { - int[] duplicateFilter = new int[newPositions]; + private int[] nextDuplicateExpandingFilter() { + int[] duplicateFilter = new int[Math.min(pageSize, expandedBlock.getPositionCount() - nextPositionToProcess)]; int n = 0; - for (int p = 0; p < expandingBlock.getPositionCount(); p++) { - int count = expandingBlock.getValueCount(p); + while (true) { + int count = expandingBlock.getValueCount(nextPositionToProcess); int positions = count == 0 ? 1 : count; - Arrays.fill(duplicateFilter, n, n + positions, p); - n += positions; + int toAdd = Math.min(pageSize - n, positions - nextMvToProcess); + Arrays.fill(duplicateFilter, n, n + toAdd, nextPositionToProcess); + n += toAdd; + + if (n == pageSize) { + if (nextMvToProcess + toAdd == positions) { + // finished expanding this position, let's move on to next position (that will be expanded with next call) + nextMvToProcess = 0; + nextPositionToProcess++; + if (nextPositionToProcess == expandingBlock.getPositionCount()) { + nextPositionToProcess = 0; + prevCompleted = true; + } + } else { + // there are still items to expand in current position, but the duplicate filter is full, so we'll deal with them at + // next call + nextMvToProcess = nextMvToProcess + toAdd; + } + return duplicateFilter; + } + + nextMvToProcess = 0; + nextPositionToProcess++; + if (nextPositionToProcess == expandingBlock.getPositionCount()) { + nextPositionToProcess = 0; + nextMvToProcess = 0; + prevCompleted = true; + return n < pageSize ? Arrays.copyOfRange(duplicateFilter, 0, n) : duplicateFilter; + } } - return duplicateFilter; } @Override - protected AbstractPageMappingOperator.Status status(int pagesProcessed) { - return new Status(pagesProcessed, noops); + public final boolean needsInput() { + return prev == null && finished == false; + } + + @Override + public final void addInput(Page page) { + assert prev == null : "has pending input page"; + prev = page; + this.expandingBlock = prev.getBlock(channel); + this.expandedBlock = expandingBlock.expand(); + pagesIn++; + prevCompleted = false; + } + + @Override + public final void finish() { + finished = true; + } + + @Override + public final boolean isFinished() { + return finished && prev == null; + } + + @Override + public final Status status() { + return new Status(pagesIn, pagesOut, noops); + } + + @Override + public void close() { + if (prev != null) { + Releasables.closeExpectNoException(() -> prev.releaseBlocks()); + } } @Override @@ -96,35 +207,42 @@ public String toString() { return "MvExpandOperator[channel=" + channel + "]"; } - public static final class Status extends AbstractPageMappingOperator.Status { + public static final class Status implements Operator.Status { + + private final int pagesIn; + private final int pagesOut; + private final int noops; + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Operator.Status.class, "mv_expand", Status::new ); - private final int noops; - - Status(int pagesProcessed, int noops) { - super(pagesProcessed); + Status(int pagesIn, int pagesOut, int noops) { + this.pagesIn = pagesIn; + this.pagesOut = pagesOut; this.noops = noops; } Status(StreamInput in) throws IOException { - super(in); + pagesIn = in.readVInt(); + pagesOut = in.readVInt(); noops = in.readVInt(); } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); + out.writeVInt(pagesIn); + out.writeVInt(pagesOut); out.writeVInt(noops); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("pages_processed", pagesProcessed()); + builder.field("pages_in", pagesIn); + builder.field("pages_out", pagesOut); builder.field("noops", noops); return builder.endObject(); } @@ -147,12 +265,20 @@ public boolean equals(Object o) { return false; } Status status = (Status) o; - return noops == status.noops && pagesProcessed() == status.pagesProcessed(); + return noops == status.noops && pagesIn == status.pagesIn && pagesOut == status.pagesOut; + } + + public int pagesIn() { + return pagesIn; + } + + public int pagesOut() { + return pagesOut; } @Override public int hashCode() { - return Objects.hash(noops, pagesProcessed()); + return Objects.hash(noops, pagesIn, pagesOut); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java index fe281bbf16131..9527388a0d3cf 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java @@ -16,12 +16,12 @@ public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase { public static MvExpandOperator.Status simple() { - return new MvExpandOperator.Status(10, 9); + return new MvExpandOperator.Status(10, 15, 9); } public static String simpleToJson() { return """ - {"pages_processed":10,"noops":9}"""; + {"pages_in":10,"pages_out":15,"noops":9}"""; } public void testToXContent() { @@ -35,20 +35,28 @@ protected Writeable.Reader instanceReader() { @Override public MvExpandOperator.Status createTestInstance() { - return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt()); + return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt()); } @Override protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) { - switch (between(0, 1)) { + switch (between(0, 2)) { case 0: return new MvExpandOperator.Status( - randomValueOtherThan(instance.pagesProcessed(), ESTestCase::randomNonNegativeInt), + randomValueOtherThan(instance.pagesIn(), ESTestCase::randomNonNegativeInt), + instance.pagesOut(), instance.noops() ); case 1: return new MvExpandOperator.Status( - instance.pagesProcessed(), + instance.pagesIn(), + randomValueOtherThan(instance.pagesOut(), ESTestCase::randomNonNegativeInt), + instance.noops() + ); + case 2: + return new MvExpandOperator.Status( + instance.pagesIn(), + instance.pagesOut(), randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt) ); default: diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index 69c965fc91323..f99685609ff78 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -9,17 +9,19 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.compute.data.BasicBlockTests; +import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import java.util.Iterator; import java.util.List; import static org.elasticsearch.compute.data.BasicBlockTests.randomBlock; import static org.elasticsearch.compute.data.BasicBlockTests.valuesAtPositions; +import static org.elasticsearch.compute.data.BlockTestUtils.deepCopyOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -47,7 +49,7 @@ protected Page createPage(int positionOffset, int length) { @Override protected Operator.OperatorFactory simple(BigArrays bigArrays) { - return new MvExpandOperator.Factory(0); + return new MvExpandOperator.Factory(0, randomIntBetween(1, 1000)); } @Override @@ -60,47 +62,143 @@ protected String expectedToStringOfSimple() { return expectedDescriptionOfSimple(); } - @Override - protected void assertSimpleOutput(List input, List results) { - assertThat(results, hasSize(results.size())); - for (int i = 0; i < results.size(); i++) { - IntBlock origExpanded = input.get(i).getBlock(0); - IntBlock resultExpanded = results.get(i).getBlock(0); - int np = 0; - for (int op = 0; op < origExpanded.getPositionCount(); op++) { - if (origExpanded.isNull(op)) { - assertThat(resultExpanded.isNull(np), equalTo(true)); - assertThat(resultExpanded.getValueCount(np++), equalTo(0)); - continue; - } - List oValues = BasicBlockTests.valuesAtPositions(origExpanded, op, op + 1).get(0); - for (Object ov : oValues) { - assertThat(resultExpanded.isNull(np), equalTo(false)); - assertThat(resultExpanded.getValueCount(np), equalTo(1)); - assertThat(BasicBlockTests.valuesAtPositions(resultExpanded, np, ++np).get(0), equalTo(List.of(ov))); + class BlockListIterator implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + + BlockListIterator(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); } + return null; } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return items.size() == 1 ? items.get(0) : items; + } - IntBlock origDuplicated = input.get(i).getBlock(1); - IntBlock resultDuplicated = results.get(i).getBlock(1); - np = 0; - for (int op = 0; op < origDuplicated.getPositionCount(); op++) { - int copies = origExpanded.isNull(op) ? 1 : origExpanded.getValueCount(op); - for (int c = 0; c < copies; c++) { - if (origDuplicated.isNull(op)) { - assertThat(resultDuplicated.isNull(np), equalTo(true)); - assertThat(resultDuplicated.getValueCount(np++), equalTo(0)); - continue; - } - assertThat(resultDuplicated.isNull(np), equalTo(false)); - assertThat(resultDuplicated.getValueCount(np), equalTo(origDuplicated.getValueCount(op))); - assertThat( - BasicBlockTests.valuesAtPositions(resultDuplicated, np, ++np).get(0), - equalTo(BasicBlockTests.valuesAtPositions(origDuplicated, op, op + 1).get(0)) - ); + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + } + } + + class BlockListIteratorExpander implements Iterator { + private final Iterator pagesIterator; + private final int channel; + private Block currentBlock; + private int nextPosition; + private int nextInPosition; + + BlockListIteratorExpander(List pages, int channel) { + this.pagesIterator = pages.iterator(); + this.channel = channel; + this.currentBlock = pagesIterator.next().getBlock(channel); + this.nextPosition = 0; + this.nextInPosition = 0; + } + + @Override + public boolean hasNext() { + if (currentBlock == null) { + return false; + } + + return currentBlock.getValueCount(nextPosition) == 0 + || nextInPosition < currentBlock.getValueCount(nextPosition) + || nextPosition < currentBlock.getPositionCount() + || pagesIterator.hasNext(); + } + + @Override + public Object next() { + if (currentBlock != null && currentBlock.getValueCount(nextPosition) == 0) { + nextPosition++; + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); } + return null; + } + List items = valuesAtPositions(currentBlock, nextPosition, nextPosition + 1).get(0); + Object result = items == null ? null : items.get(nextInPosition++); + if (nextInPosition == currentBlock.getValueCount(nextPosition)) { + nextPosition++; + nextInPosition = 0; + } + if (currentBlock.getPositionCount() == nextPosition) { + loadNextBlock(); + } + return result; + } + + private void loadNextBlock() { + if (pagesIterator.hasNext() == false) { + currentBlock = null; + return; + } + this.currentBlock = pagesIterator.next().getBlock(channel); + nextPosition = 0; + nextInPosition = 0; + } + } + + @Override + protected void assertSimpleOutput(List input, List results) { + assertThat(results, hasSize(results.size())); + + var inputIter = new BlockListIteratorExpander(input, 0); + var resultIter = new BlockListIteratorExpander(results, 0); + + while (inputIter.hasNext()) { + assertThat(resultIter.hasNext(), equalTo(true)); + assertThat(resultIter.next(), equalTo(inputIter.next())); + } + assertThat(resultIter.hasNext(), equalTo(false)); + + var originalMvIter = new BlockListIterator(input, 0); + var inputIter2 = new BlockListIterator(input, 1); + var resultIter2 = new BlockListIterator(results, 1); + + while (originalMvIter.hasNext()) { + Object originalMv = originalMvIter.next(); + int originalMvSize = originalMv instanceof List l ? l.size() : 1; + assertThat(resultIter2.hasNext(), equalTo(true)); + Object inputValue = inputIter2.next(); + for (int j = 0; j < originalMvSize; j++) { + assertThat(resultIter2.next(), equalTo(inputValue)); } } + assertThat(resultIter2.hasNext(), equalTo(false)); } @Override @@ -110,7 +208,7 @@ protected ByteSizeValue smallEnoughToCircuitBreak() { } public void testNoopStatus() { - MvExpandOperator op = new MvExpandOperator(0); + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1000)); List result = drive( op, List.of(new Page(IntVector.newVectorBuilder(2).appendInt(1).appendInt(2).build().asBlock())).iterator(), @@ -118,26 +216,45 @@ public void testNoopStatus() { ); assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); - MvExpandOperator.Status status = (MvExpandOperator.Status) op.status(); - assertThat(status.pagesProcessed(), equalTo(1)); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); assertThat(status.noops(), equalTo(1)); } public void testExpandStatus() { - MvExpandOperator op = new MvExpandOperator(0); + MvExpandOperator op = new MvExpandOperator(0, randomIntBetween(1, 1)); var builder = IntBlock.newBlockBuilder(2).beginPositionEntry().appendInt(1).appendInt(2).endPositionEntry(); List result = drive(op, List.of(new Page(builder.build())).iterator(), driverContext()); assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); - MvExpandOperator.Status status = (MvExpandOperator.Status) op.status(); - assertThat(status.pagesProcessed(), equalTo(1)); + MvExpandOperator.Status status = op.status(); + assertThat(status.pagesIn(), equalTo(1)); + assertThat(status.pagesOut(), equalTo(1)); assertThat(status.noops(), equalTo(0)); } - // TODO: remove this once possible - // https://github.com/elastic/elasticsearch/issues/99826 - @Override - protected boolean canLeak() { - return true; + public void testExpandWithBytesRefs() { + DriverContext context = driverContext(); + List input = CannedSourceOperator.collectPages(new AbstractBlockSourceOperator(context.blockFactory(), 8 * 1024) { + private int idx; + + @Override + protected int remaining() { + return 10000 - idx; + } + + @Override + protected Page createPage(int positionOffset, int length) { + idx += length; + return new Page( + randomBlock(context.blockFactory(), ElementType.BYTES_REF, length, true, 1, 10, 0, 0).block(), + randomBlock(context.blockFactory(), ElementType.INT, length, false, 1, 10, 0, 0).block() + ); + } + }); + List origInput = deepCopyOf(input, BlockFactory.getNonBreakingInstance()); + List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); + assertSimpleOutput(origInput, results); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 63f601669636c..5d881f03bd07f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -212,7 +212,7 @@ protected final void assertSimple(DriverContext context, int size) { unreleasedInputs++; } } - if ((canLeak() == false) && unreleasedInputs > 0) { + if (unreleasedInputs > 0) { throw new AssertionError("[" + unreleasedInputs + "] unreleased input blocks"); } } @@ -308,12 +308,6 @@ protected void start(Driver driver, ActionListener driverListener) { } } - // TODO: Remove this once all operators do not leak anymore - // https://github.com/elastic/elasticsearch/issues/99826 - protected boolean canLeak() { - return false; - } - public static void assertDriverContext(DriverContext driverContext) { assertTrue(driverContext.isFinished()); assertThat(driverContext.getSnapshot().releasables(), empty()); diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec index 7cc11c6fab5b3..ae27e8f56f9f7 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/mv_expand.csv-spec @@ -24,3 +24,77 @@ a:integer | b:keyword | j:keyword 3 | b | "a" 3 | b | "b" ; + + +explosion +row +a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +b = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +c = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +d = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +e = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +f = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +g = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +x = 10000000000000 +| mv_expand a | mv_expand b | mv_expand c | mv_expand d | mv_expand e | mv_expand f | mv_expand g +| limit 10; + +a:integer | b:integer | c:integer | d:integer | e:integer | f:integer | g:integer | x:long +1 | 1 | 1 | 1 | 1 | 1 | 1 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 2 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 3 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 4 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 5 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 6 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 7 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 8 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 9 | 10000000000000 +1 | 1 | 1 | 1 | 1 | 1 | 10 | 10000000000000 +; + + +explosionStats +row +a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +b = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +c = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +d = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +e = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30], +x = 10000000000000 +| mv_expand a | mv_expand b | mv_expand c | mv_expand d | mv_expand e +| stats sum_a = sum(a) by b +| sort b; + +//12555000 = sum(1..30) * 30 * 30 * 30 +sum_a:long | b:integer +12555000 | 1 +12555000 | 2 +12555000 | 3 +12555000 | 4 +12555000 | 5 +12555000 | 6 +12555000 | 7 +12555000 | 8 +12555000 | 9 +12555000 | 10 +12555000 | 11 +12555000 | 12 +12555000 | 13 +12555000 | 14 +12555000 | 15 +12555000 | 16 +12555000 | 17 +12555000 | 18 +12555000 | 19 +12555000 | 20 +12555000 | 21 +12555000 | 22 +12555000 | 23 +12555000 | 24 +12555000 | 25 +12555000 | 26 +12555000 | 27 +12555000 | 28 +12555000 | 29 +12555000 | 30 +; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index b86072e1b6da0..bdc1c948f2055 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -584,7 +584,8 @@ private PhysicalOperation planLimit(LimitExec limit, LocalExecutionPlannerContex private PhysicalOperation planMvExpand(MvExpandExec mvExpandExec, LocalExecutionPlannerContext context) { PhysicalOperation source = plan(mvExpandExec.child(), context); - return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel()), source.layout); + int blockSize = 5000;// TODO estimate row size and use context.pageSize() + return source.with(new MvExpandOperator.Factory(source.layout.get(mvExpandExec.target().id()).channel(), blockSize), source.layout); } /**