Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Paginate MV_EXPAND output #100598

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -31,11 +32,12 @@
* 2 | 2 | "foo"
* </pre>
*/
public class MvExpandOperator extends AbstractPageMappingOperator {
public record Factory(int channel) implements OperatorFactory {
public class MvExpandOperator implements Operator {

public record Factory(int channel, int blockSize) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new MvExpandOperator(channel);
return new MvExpandOperator(channel, blockSize);
}

@Override
Expand All @@ -46,85 +48,201 @@ public String describe() {

private final int channel;

private final int pageSize;

private int noops;

public MvExpandOperator(int channel) {
private Page prev;
private boolean prevCompleted = false;
private boolean finished = false;

private Block expandingBlock;
private Block expandedBlock;

private int nextPositionToProcess = 0;
private int nextMvToProcess = 0;
private int nextItemOnExpanded = 0;

/**
* Count of pages that have been processed by this operator.
*/
private int pagesIn;
private int pagesOut;

public MvExpandOperator(int channel, int pageSize) {
this.channel = channel;
this.pageSize = pageSize;
assert pageSize > 0;
}

@Override
protected Page process(Page page) {
Block expandingBlock = page.getBlock(channel);
Block expandedBlock = expandingBlock.expand();
public final Page getOutput() {
if (prev == null) {
return null;
}
pagesOut++;
if (prev.getPositionCount() == 0 || expandingBlock.mayHaveMultivaluedFields() == false) {
noops++;
Page result = prev;
prev = null;
return result;
}

try {
return process();
} finally {
if (prevCompleted && prev != null) {
prev.releaseBlocks();
prev = null;
}
}
}

protected Page process() {
if (expandedBlock == expandingBlock) {
noops++;
return page;
prevCompleted = true;
return prev;
}
if (page.getBlockCount() == 1) {
if (prev.getBlockCount() == 1) {
assert channel == 0;
prevCompleted = true;
return new Page(expandedBlock);
}

int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());
int[] duplicateFilter = nextDuplicateExpandingFilter();

Block[] result = new Block[page.getBlockCount()];
Block[] result = new Block[prev.getBlockCount()];
int[] expandedMask = new int[duplicateFilter.length];
for (int i = 0; i < expandedMask.length; i++) {
expandedMask[i] = i + nextItemOnExpanded;
}
nextItemOnExpanded += expandedMask.length;
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter);
result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter);
}
if (nextItemOnExpanded == expandedBlock.getPositionCount()) {
nextItemOnExpanded = 0;
}
return new Page(result);
}

private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) {
int[] duplicateFilter = new int[newPositions];
private int[] nextDuplicateExpandingFilter() {
int[] duplicateFilter = new int[Math.min(pageSize, expandedBlock.getPositionCount() - nextPositionToProcess)];
int n = 0;
for (int p = 0; p < expandingBlock.getPositionCount(); p++) {
int count = expandingBlock.getValueCount(p);
while (true) {
int count = expandingBlock.getValueCount(nextPositionToProcess);
int positions = count == 0 ? 1 : count;
Arrays.fill(duplicateFilter, n, n + positions, p);
n += positions;
int toAdd = Math.min(pageSize - n, positions - nextMvToProcess);
Arrays.fill(duplicateFilter, n, n + toAdd, nextPositionToProcess);
n += toAdd;

if (n == pageSize) {
if (nextMvToProcess + toAdd == positions) {
// finished expanding this position, let's move on to next position (that will be expanded with next call)
nextMvToProcess = 0;
nextPositionToProcess++;
if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
prevCompleted = true;
}
} else {
// there are still items to expand in current position, but the duplicate filter is full, so we'll deal with them at
// next call
nextMvToProcess = nextMvToProcess + toAdd;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth comments about the meanings of these two arms. I think one is "we're done expanding this page" and another is "we've filled the page we're building and maybe exhausted the current position. But we might not have, we might be half way through expanding a position.

}
return duplicateFilter;
}

nextMvToProcess = 0;
nextPositionToProcess++;
if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
nextMvToProcess = 0;
prevCompleted = true;
return n < pageSize ? Arrays.copyOfRange(duplicateFilter, 0, n) : duplicateFilter;
}
}
return duplicateFilter;
}

@Override
protected AbstractPageMappingOperator.Status status(int pagesProcessed) {
return new Status(pagesProcessed, noops);
public final boolean needsInput() {
return prev == null && finished == false;
}

@Override
public final void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
this.expandingBlock = prev.getBlock(channel);
this.expandedBlock = expandingBlock.expand();
pagesIn++;
prevCompleted = false;
}

@Override
public final void finish() {
finished = true;
}

@Override
public final boolean isFinished() {
return finished && prev == null;
}

@Override
public final Status status() {
return new Status(pagesIn, pagesOut, noops);
}

@Override
public void close() {
if (prev != null) {
Releasables.closeExpectNoException(() -> prev.releaseBlocks());
}
}

@Override
public String toString() {
return "MvExpandOperator[channel=" + channel + "]";
}

public static final class Status extends AbstractPageMappingOperator.Status {
public static final class Status implements Operator.Status {

private final int pagesIn;
private final int pagesOut;
private final int noops;

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Operator.Status.class,
"mv_expand",
Status::new
);

private final int noops;

Status(int pagesProcessed, int noops) {
super(pagesProcessed);
Status(int pagesIn, int pagesOut, int noops) {
this.pagesIn = pagesIn;
this.pagesOut = pagesOut;
this.noops = noops;
}

Status(StreamInput in) throws IOException {
super(in);
pagesIn = in.readVInt();
pagesOut = in.readVInt();
noops = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pagesIn);
out.writeVInt(pagesOut);
out.writeVInt(noops);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pages_processed", pagesProcessed());
builder.field("pages_in", pagesIn);
builder.field("pages_out", pagesOut);
builder.field("noops", noops);
return builder.endObject();
}
Expand All @@ -147,12 +265,20 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
return noops == status.noops && pagesProcessed() == status.pagesProcessed();
return noops == status.noops && pagesIn == status.pagesIn && pagesOut == status.pagesOut;
}

public int pagesIn() {
return pagesIn;
}

public int pagesOut() {
return pagesOut;
}

@Override
public int hashCode() {
return Objects.hash(noops, pagesProcessed());
return Objects.hash(noops, pagesIn, pagesOut);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase<MvExpandOperator.Status> {
public static MvExpandOperator.Status simple() {
return new MvExpandOperator.Status(10, 9);
return new MvExpandOperator.Status(10, 15, 9);
}

public static String simpleToJson() {
return """
{"pages_processed":10,"noops":9}""";
{"pages_in":10,"pages_out":15,"noops":9}""";
}

public void testToXContent() {
Expand All @@ -35,20 +35,28 @@ protected Writeable.Reader<MvExpandOperator.Status> instanceReader() {

@Override
public MvExpandOperator.Status createTestInstance() {
return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt());
return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt());
}

@Override
protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) {
switch (between(0, 1)) {
switch (between(0, 2)) {
case 0:
return new MvExpandOperator.Status(
randomValueOtherThan(instance.pagesProcessed(), ESTestCase::randomNonNegativeInt),
randomValueOtherThan(instance.pagesIn(), ESTestCase::randomNonNegativeInt),
instance.pagesOut(),
instance.noops()
);
case 1:
return new MvExpandOperator.Status(
instance.pagesProcessed(),
instance.pagesIn(),
randomValueOtherThan(instance.pagesOut(), ESTestCase::randomNonNegativeInt),
instance.noops()
);
case 2:
return new MvExpandOperator.Status(
instance.pagesIn(),
instance.pagesOut(),
randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt)
);
default:
Expand Down
Loading