From c074fd11246026363870fd95b85bfdd4b6fcfda6 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 12 Oct 2023 14:22:53 -0400 Subject: [PATCH] ESQL: Copy blocks on expand (#100778) Until we get lovely reference tracking, let's copy blocks when "expanding" them. For now, that's simple and it works and helps make memory tracking cleaner. I expect we won't copy forever. We can likely share the array once we get reference tracking. But for now, let's copy! Closes #100548 --- .../compute/data/BooleanArrayBlock.java | 20 ++- .../compute/data/BytesRefArrayBlock.java | 21 ++- .../compute/data/DoubleArrayBlock.java | 20 ++- .../compute/data/IntArrayBlock.java | 20 ++- .../compute/data/LongArrayBlock.java | 20 ++- .../compute/data/X-ArrayBlock.java.st | 31 +++- .../compute/operator/MvExpandOperator.java | 111 +++++++++---- .../compute/data/BlockMultiValuedTests.java | 151 ++++++++++++------ .../operator/MvExpandOperatorTests.java | 9 +- .../compute/operator/OperatorTestCase.java | 8 +- 10 files changed, 282 insertions(+), 129 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java index 9a66bf00fa71f..eb9364c57e755 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BooleanArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of boolean. @@ -83,12 +82,21 @@ public BooleanBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BooleanArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBoolean(getBoolean(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java index 9e6631b6807c6..b2729ed370b32 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/BytesRefArrayBlock.java @@ -13,7 +13,6 @@ import org.elasticsearch.core.Releasables; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of BytesRef. @@ -86,12 +85,22 @@ public BytesRefBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new BytesRefArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + final BytesRef scratch = new BytesRef(); + try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendBytesRef(getBytesRef(i, scratch)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java index f9e1fe0c6e199..b6b1fae0ded03 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/DoubleArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of double. @@ -83,12 +82,21 @@ public DoubleBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new DoubleArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendDouble(getDouble(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java index 95344bd8367c0..31f71d292f95d 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/IntArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of int. @@ -83,12 +82,21 @@ public IntBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new IntArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendInt(getInt(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java index a45abb1ed9248..8a71703441ebb 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/LongArrayBlock.java @@ -11,7 +11,6 @@ import java.util.Arrays; import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of long. @@ -83,12 +82,21 @@ public LongBlock expand() { if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new LongArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values + try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { + builder.appendLong(getLong(i)); + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st index 6a8185b43ecab..49a4c43709cde 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ArrayBlock.java.st @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator; import java.util.Arrays; $endif$ import java.util.BitSet; -import java.util.stream.IntStream; /** * Block implementation that stores an array of $type$. @@ -69,9 +68,9 @@ $endif$ @Override public $Type$Block filter(int... positions) { - $if(BytesRef)$ +$if(BytesRef)$ final BytesRef scratch = new BytesRef(); - $endif$ +$endif$ try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) { for (int pos : positions) { if (isNull(pos)) { @@ -104,12 +103,28 @@ $endif$ if (firstValueIndexes == null) { return this; } - int end = firstValueIndexes[getPositionCount()]; - if (nullsMask == null) { - return new $Type$ArrayVector(values, end).asBlock(); + // TODO use reference counting to share the values +$if(BytesRef)$ + final BytesRef scratch = new BytesRef(); +$endif$ + try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) { + for (int pos = 0; pos < getPositionCount(); pos++) { + if (isNull(pos)) { + builder.appendNull(); + continue; + } + int first = getFirstValueIndex(pos); + int end = first + getValueCount(pos); + for (int i = first; i < end; i++) { +$if(BytesRef)$ + builder.append$Type$(get$Type$(i, scratch)); +$else$ + builder.append$Type$(get$Type$(i)); +$endif$ + } + } + return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build(); } - int[] firstValues = IntStream.range(0, end + 1).toArray(); - return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory); } public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index c322520d8853b..503913dbb67e8 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -14,6 +14,8 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.core.Releasables; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -33,6 +35,7 @@ * */ public class MvExpandOperator implements Operator { + private static final Logger logger = LogManager.getLogger(MvExpandOperator.class); public record Factory(int channel, int blockSize) implements OperatorFactory { @Override @@ -53,7 +56,7 @@ public String describe() { private int noops; private Page prev; - private boolean prevCompleted = false; + private boolean prevCompleted; private boolean finished = false; private Block expandingBlock; @@ -81,49 +84,87 @@ public final Page getOutput() { return null; } pagesOut++; - if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) { - noops++; - Page result = prev; - prev = null; - return result; - } - try { - return process(); - } finally { - if (prevCompleted && prev != null) { + if (expandedBlock == null) { + /* + * If we're emitting the first block from this page + * then we have to expand it. + */ + logger.trace("starting {}", prev); + expandingBlock = prev.getBlock(channel); + if (expandingBlock.mayHaveMultivaluedFields() == false) { + logger.trace("can't have multivalued fields"); + noops++; + Page result = prev; + prev = null; + expandingBlock = null; + return result; + } + expandedBlock = expandingBlock.expand(); + if (expandedBlock == expandingBlock) { + // The expand was a noop - just return the previous page and clear state. + logger.trace("expanded to same"); + noops++; + Page result = prev; + prev = null; + expandingBlock = null; + expandedBlock = null; + return result; + } + if (prev.getBlockCount() == 1) { + /* + * The expand wasn't a noop, but there's only a single block in the result + * so the expansion didn't really make it take more memory. It should be safe + * to return it directly. + */ + logger.trace("single block output"); + assert channel == 0; prev.releaseBlocks(); prev = null; + expandingBlock = null; + Page result = new Page(expandedBlock); + expandedBlock = null; + return result; } } + logger.trace("slicing"); + return sliceExpandedIntoPages(); } - protected Page process() { - if (expandedBlock == expandingBlock) { - noops++; - prevCompleted = true; - return prev; - } - if (prev.getBlockCount() == 1) { - assert channel == 0; - prevCompleted = true; - return new Page(expandedBlock); - } - + private Page sliceExpandedIntoPages() { + prevCompleted = false; int[] duplicateFilter = nextDuplicateExpandingFilter(); Block[] result = new Block[prev.getBlockCount()]; - int[] expandedMask = new int[duplicateFilter.length]; - for (int i = 0; i < expandedMask.length; i++) { - expandedMask[i] = i + nextItemOnExpanded; - } - nextItemOnExpanded += expandedMask.length; - for (int b = 0; b < result.length; b++) { - result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + boolean success = false; + try { + int[] expandedMask = new int[duplicateFilter.length]; + for (int i = 0; i < expandedMask.length; i++) { + expandedMask[i] = i + nextItemOnExpanded; + } + nextItemOnExpanded += expandedMask.length; + for (int b = 0; b < result.length; b++) { + result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter); + } + success = true; + } finally { + if (success == false) { + Releasables.closeExpectNoException(result); + } } if (nextItemOnExpanded == expandedBlock.getPositionCount()) { nextItemOnExpanded = 0; } + if (prevCompleted) { + Releasables.closeExpectNoException(() -> { + if (prev != null) { + prev.releaseBlocks(); + prev = null; + } + }, expandedBlock); + expandingBlock = null; + expandedBlock = null; + } return new Page(result); } @@ -175,9 +216,7 @@ public final void addInput(Page page) { assert prev == null : "has pending input page"; prev = page; this.expandingBlock = prev.getBlock(channel); - this.expandedBlock = expandingBlock.expand(); pagesIn++; - prevCompleted = false; } @Override @@ -197,9 +236,11 @@ public final Status status() { @Override public void close() { - if (prev != null) { - Releasables.closeExpectNoException(() -> prev.releaseBlocks()); - } + Releasables.closeExpectNoException(() -> { + if (prev != null) { + prev.releaseBlocks(); + } + }, expandedBlock); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java index 9a362ad4e3ca3..1b0e61cea8135 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/data/BlockMultiValuedTests.java @@ -11,7 +11,14 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.PageCacheRecycler; +import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.test.ESTestCase; +import org.junit.After; import java.util.ArrayList; import java.util.Arrays; @@ -45,20 +52,23 @@ public BlockMultiValuedTests(@Name("elementType") ElementType elementType, @Name public void testMultiValued() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + assertThat(b.block().getPositionCount(), equalTo(positionCount)); + assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); + for (int p = 0; p < positionCount; p++) { + BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + } - assertThat(b.block().getPositionCount(), equalTo(positionCount)); - assertThat(b.block().getTotalValueCount(), equalTo(b.valueCount())); - for (int p = 0; p < positionCount; p++) { - BlockTestUtils.assertPositionValues(b.block(), p, equalTo(b.values().get(p))); + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } public void testExpand() { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 100, 0, 0); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 100, 0, 0); assertExpanded(b.block()); } @@ -96,31 +106,37 @@ public void testFilteredJumbledSubsetThenExpanded() { private void assertFiltered(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - Block filtered = b.block().filter(positions); - - assertThat(filtered.getPositionCount(), equalTo(positions.length)); - - int expectedValueCount = 0; - for (int p : positions) { - List values = b.values().get(p); - if (values != null) { - expectedValueCount += values.size(); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + Block filtered = b.block().filter(positions); + try { + assertThat(filtered.getPositionCount(), equalTo(positions.length)); + + int expectedValueCount = 0; + for (int p : positions) { + List values = b.values().get(p); + if (values != null) { + expectedValueCount += values.size(); + } + } + assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); + for (int r = 0; r < positions.length; r++) { + if (b.values().get(positions[r]) == null) { + assertThat(filtered.getValueCount(r), equalTo(0)); + assertThat(filtered.isNull(r), equalTo(true)); + } else { + assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); + assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); + } + } + } finally { + filtered.close(); } + assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); + } finally { + b.block().close(); } - assertThat(filtered.getTotalValueCount(), equalTo(expectedValueCount)); - for (int r = 0; r < positions.length; r++) { - if (b.values().get(positions[r]) == null) { - assertThat(filtered.getValueCount(r), equalTo(0)); - assertThat(filtered.isNull(r), equalTo(true)); - } else { - assertThat(filtered.getValueCount(r), equalTo(b.values().get(positions[r]).size())); - assertThat(BasicBlockTests.valuesAtPositions(filtered, r, r + 1).get(0), equalTo(b.values().get(positions[r]))); - } - } - - assertThat(b.block().mayHaveMultivaluedFields(), equalTo(b.values().stream().anyMatch(l -> l != null && l.size() > 1))); } private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { @@ -135,30 +151,65 @@ private int[] randomFilterPositions(Block orig, boolean all, boolean shuffled) { } private void assertExpanded(Block orig) { - Block expanded = orig.expand(); - assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); - assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); - - int np = 0; - for (int op = 0; op < orig.getPositionCount(); op++) { - if (orig.isNull(op)) { - assertThat(expanded.isNull(np), equalTo(true)); - assertThat(expanded.getValueCount(np++), equalTo(0)); - continue; - } - List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); - for (Object ov : oValues) { - assertThat(expanded.isNull(np), equalTo(false)); - assertThat(expanded.getValueCount(np), equalTo(1)); - assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + try (orig; Block expanded = orig.expand()) { + assertThat(expanded.getPositionCount(), equalTo(orig.getTotalValueCount() + orig.nullValuesCount())); + assertThat(expanded.getTotalValueCount(), equalTo(orig.getTotalValueCount())); + + int np = 0; + for (int op = 0; op < orig.getPositionCount(); op++) { + if (orig.isNull(op)) { + assertThat(expanded.isNull(np), equalTo(true)); + assertThat(expanded.getValueCount(np++), equalTo(0)); + continue; + } + List oValues = BasicBlockTests.valuesAtPositions(orig, op, op + 1).get(0); + for (Object ov : oValues) { + assertThat(expanded.isNull(np), equalTo(false)); + assertThat(expanded.getValueCount(np), equalTo(1)); + assertThat(BasicBlockTests.valuesAtPositions(expanded, np, ++np).get(0), equalTo(List.of(ov))); + } } } } private void assertFilteredThenExpanded(boolean all, boolean shuffled) { int positionCount = randomIntBetween(1, 16 * 1024); - var b = BasicBlockTests.randomBlock(elementType, positionCount, nullAllowed, 0, 10, 0, 0); - int[] positions = randomFilterPositions(b.block(), all, shuffled); - assertExpanded(b.block().filter(positions)); + var b = BasicBlockTests.randomBlock(blockFactory(), elementType, positionCount, nullAllowed, 0, 10, 0, 0); + try { + int[] positions = randomFilterPositions(b.block(), all, shuffled); + assertExpanded(b.block().filter(positions)); + } finally { + b.block().close(); + } + } + + private final List breakers = new ArrayList<>(); + private final List blockFactories = new ArrayList<>(); + + /** + * A {@link DriverContext} with a breaking {@link BigArrays} and {@link BlockFactory}. + */ + protected BlockFactory blockFactory() { // TODO move this to driverContext once everyone supports breaking + BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, ByteSizeValue.ofGb(1)).withCircuitBreaking(); + CircuitBreaker breaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST); + breakers.add(breaker); + BlockFactory factory = new MockBlockFactory(breaker, bigArrays); + blockFactories.add(factory); + return factory; + } + + @After + public void allBreakersEmpty() throws Exception { + // first check that all big arrays are released, which can affect breakers + MockBigArrays.ensureAllArraysAreReleased(); + + for (CircuitBreaker breaker : breakers) { + for (var factory : blockFactories) { + if (factory instanceof MockBlockFactory mockBlockFactory) { + mockBlockFactory.ensureAllBlocksAreReleased(); + } + } + assertThat("Unexpected used in breaker: " + breaker, breaker.getUsed(), equalTo(0L)); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index f99685609ff78..a105818a2bf03 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -40,8 +40,8 @@ protected int remaining() { protected Page createPage(int positionOffset, int length) { idx += length; return new Page( - randomBlock(ElementType.INT, length, true, 1, 10, 0, 0).block(), - randomBlock(ElementType.INT, length, false, 1, 10, 0, 0).block() + randomBlock(blockFactory, ElementType.INT, length, true, 1, 10, 0, 0).block(), + randomBlock(blockFactory, ElementType.INT, length, false, 1, 10, 0, 0).block() ); } }; @@ -257,4 +257,9 @@ protected Page createPage(int positionOffset, int length) { List results = drive(new MvExpandOperator(0, randomIntBetween(1, 1000)), input.iterator(), context); assertSimpleOutput(origInput, results); } + + @Override + protected DriverContext driverContext() { + return breakingDriverContext(); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 505714fee4d65..cf8cca4de652f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -129,17 +129,17 @@ public final void testSimpleWithCranky() { BlockFactory blockFactory = BlockFactory.getInstance(cranky.getBreaker(CircuitBreaker.REQUEST), bigArrays); DriverContext driverContext = new DriverContext(bigArrays, blockFactory); - boolean[] driverStarted = new boolean[1]; + boolean driverStarted = false; try { Operator operator = simple(bigArrays).get(driverContext); - driverStarted[0] = true; - List result = drive(operator, input.iterator(), driverContext); + driverStarted = true; + drive(operator, input.iterator(), driverContext); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { logger.info("broken", e); assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); } - if (driverStarted[0] == false) { + if (driverStarted == false) { // if drive hasn't even started then we need to release the input pages Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks))); }