Skip to content

Commit

Permalink
ESQL: Copy blocks on expand (#100778)
Browse files Browse the repository at this point in the history
Until we get lovely reference tracking, let's copy blocks when
"expanding" them. For now, that's simple and it works and helps make
memory tracking cleaner. I expect we won't copy forever. We can likely
share the array once we get reference tracking. But for now, let's copy!

Closes #100548
  • Loading branch information
nik9000 authored Oct 12, 2023
1 parent c1c8d1e commit 79b2e9a
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of boolean.
Expand Down Expand Up @@ -83,12 +82,21 @@ public BooleanBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BooleanArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newBooleanBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBoolean(getBoolean(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BooleanArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(boolean[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.elasticsearch.core.Releasables;

import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of BytesRef.
Expand Down Expand Up @@ -86,12 +85,22 @@ public BytesRefBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new BytesRefArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
final BytesRef scratch = new BytesRef();
try (var builder = blockFactory.newBytesRefBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendBytesRef(getBytesRef(i, scratch));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new BytesRefArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(BytesRefArray values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of double.
Expand Down Expand Up @@ -83,12 +82,21 @@ public DoubleBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new DoubleArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newDoubleBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendDouble(getDouble(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new DoubleArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(double[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of int.
Expand Down Expand Up @@ -83,12 +82,21 @@ public IntBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new IntArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newIntBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendInt(getInt(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new IntArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(int[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import java.util.Arrays;
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of long.
Expand Down Expand Up @@ -83,12 +82,21 @@ public LongBlock expand() {
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new LongArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
try (var builder = blockFactory.newLongBlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
builder.appendLong(getLong(i));
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new LongArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated(long[] values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import org.apache.lucene.util.RamUsageEstimator;
import java.util.Arrays;
$endif$
import java.util.BitSet;
import java.util.stream.IntStream;

/**
* Block implementation that stores an array of $type$.
Expand Down Expand Up @@ -69,9 +68,9 @@ $endif$

@Override
public $Type$Block filter(int... positions) {
$if(BytesRef)$
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(positions.length)) {
for (int pos : positions) {
if (isNull(pos)) {
Expand Down Expand Up @@ -104,12 +103,28 @@ $endif$
if (firstValueIndexes == null) {
return this;
}
int end = firstValueIndexes[getPositionCount()];
if (nullsMask == null) {
return new $Type$ArrayVector(values, end).asBlock();
// TODO use reference counting to share the values
$if(BytesRef)$
final BytesRef scratch = new BytesRef();
$endif$
try (var builder = blockFactory.new$Type$BlockBuilder(firstValueIndexes[getPositionCount()])) {
for (int pos = 0; pos < getPositionCount(); pos++) {
if (isNull(pos)) {
builder.appendNull();
continue;
}
int first = getFirstValueIndex(pos);
int end = first + getValueCount(pos);
for (int i = first; i < end; i++) {
$if(BytesRef)$
builder.append$Type$(get$Type$(i, scratch));
$else$
builder.append$Type$(get$Type$(i));
$endif$
}
}
return builder.mvOrdering(MvOrdering.DEDUPLICATED_AND_SORTED_ASCENDING).build();
}
int[] firstValues = IntStream.range(0, end + 1).toArray();
return new $Type$ArrayBlock(values, end, firstValues, shiftNullsToExpandedPositions(), MvOrdering.UNORDERED, blockFactory);
}

public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values, int[] firstValueIndexes, BitSet nullsMask) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -33,6 +35,7 @@
* </pre>
*/
public class MvExpandOperator implements Operator {
private static final Logger logger = LogManager.getLogger(MvExpandOperator.class);

public record Factory(int channel, int blockSize) implements OperatorFactory {
@Override
Expand All @@ -53,7 +56,7 @@ public String describe() {
private int noops;

private Page prev;
private boolean prevCompleted = false;
private boolean prevCompleted;
private boolean finished = false;

private Block expandingBlock;
Expand Down Expand Up @@ -81,49 +84,87 @@ public final Page getOutput() {
return null;
}
pagesOut++;
if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) {
noops++;
Page result = prev;
prev = null;
return result;
}

try {
return process();
} finally {
if (prevCompleted && prev != null) {
if (expandedBlock == null) {
/*
* If we're emitting the first block from this page
* then we have to expand it.
*/
logger.trace("starting {}", prev);
expandingBlock = prev.getBlock(channel);
if (expandingBlock.mayHaveMultivaluedFields() == false) {
logger.trace("can't have multivalued fields");
noops++;
Page result = prev;
prev = null;
expandingBlock = null;
return result;
}
expandedBlock = expandingBlock.expand();
if (expandedBlock == expandingBlock) {
// The expand was a noop - just return the previous page and clear state.
logger.trace("expanded to same");
noops++;
Page result = prev;
prev = null;
expandingBlock = null;
expandedBlock = null;
return result;
}
if (prev.getBlockCount() == 1) {
/*
* The expand wasn't a noop, but there's only a single block in the result
* so the expansion didn't really make it take more memory. It should be safe
* to return it directly.
*/
logger.trace("single block output");
assert channel == 0;
prev.releaseBlocks();
prev = null;
expandingBlock = null;
Page result = new Page(expandedBlock);
expandedBlock = null;
return result;
}
}
logger.trace("slicing");
return sliceExpandedIntoPages();
}

protected Page process() {
if (expandedBlock == expandingBlock) {
noops++;
prevCompleted = true;
return prev;
}
if (prev.getBlockCount() == 1) {
assert channel == 0;
prevCompleted = true;
return new Page(expandedBlock);
}

private Page sliceExpandedIntoPages() {
prevCompleted = false;
int[] duplicateFilter = nextDuplicateExpandingFilter();

Block[] result = new Block[prev.getBlockCount()];
int[] expandedMask = new int[duplicateFilter.length];
for (int i = 0; i < expandedMask.length; i++) {
expandedMask[i] = i + nextItemOnExpanded;
}
nextItemOnExpanded += expandedMask.length;
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter);
boolean success = false;
try {
int[] expandedMask = new int[duplicateFilter.length];
for (int i = 0; i < expandedMask.length; i++) {
expandedMask[i] = i + nextItemOnExpanded;
}
nextItemOnExpanded += expandedMask.length;
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter);
}
success = true;
} finally {
if (success == false) {
Releasables.closeExpectNoException(result);
}
}
if (nextItemOnExpanded == expandedBlock.getPositionCount()) {
nextItemOnExpanded = 0;
}
if (prevCompleted) {
Releasables.closeExpectNoException(() -> {
if (prev != null) {
prev.releaseBlocks();
prev = null;
}
}, expandedBlock);
expandingBlock = null;
expandedBlock = null;
}
return new Page(result);
}

Expand Down Expand Up @@ -175,9 +216,7 @@ public final void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
this.expandingBlock = prev.getBlock(channel);
this.expandedBlock = expandingBlock.expand();
pagesIn++;
prevCompleted = false;
}

@Override
Expand All @@ -197,9 +236,11 @@ public final Status status() {

@Override
public void close() {
if (prev != null) {
Releasables.closeExpectNoException(() -> prev.releaseBlocks());
}
Releasables.closeExpectNoException(() -> {
if (prev != null) {
prev.releaseBlocks();
}
}, expandedBlock);
}

@Override
Expand Down
Loading

0 comments on commit 79b2e9a

Please sign in to comment.