Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Paginate MV_EXPAND output #100598

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand All @@ -31,11 +32,12 @@
* 2 | 2 | "foo"
* </pre>
*/
public class MvExpandOperator extends AbstractPageMappingOperator {
public record Factory(int channel) implements OperatorFactory {
public class MvExpandOperator implements Operator {

public record Factory(int channel, int blockSize) implements OperatorFactory {
@Override
public Operator get(DriverContext driverContext) {
return new MvExpandOperator(channel);
return new MvExpandOperator(channel, blockSize);
}

@Override
Expand All @@ -46,57 +48,171 @@ public String describe() {

private final int channel;

private final int pageSize;

private int noops;

public MvExpandOperator(int channel) {
private Page prev;
private boolean prevCompleted = false;
private boolean finished = false;

private int nextPositionToProcess = 0;
private int nextMvToProcess = 0;
private int nextItemOnExpanded = 0;

/**
* Count of pages that have been processed by this operator.
*/
private int pagesProcessed;

public MvExpandOperator(int channel, int pageSize) {
this.channel = channel;
this.pageSize = pageSize;
assert pageSize > 0;
}

@Override
protected Page process(Page page) {
Block expandingBlock = page.getBlock(channel);
public final Page getOutput() {
if (prev == null) {
return null;
}
if (prev.getPositionCount() == 0) {
Page result = prev;
prev = null;
pagesProcessed++;
return result;
}

Block expandingBlock = prev.getBlock(channel);
if (expandingBlock.mayHaveMultivaluedFields() == false) {
noops++;
Page result = prev;
prev = null;
pagesProcessed++;
return result;
}

try {
return process();
} finally {
if (prevCompleted && prev != null) {
pagesProcessed++;
prev.releaseBlocks();
prev = null;
}
}
}

protected Page process() {
Block expandingBlock = prev.getBlock(channel);
Block expandedBlock = expandingBlock.expand();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm, this can be done once per block, no need to do it at every process()...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

if (expandedBlock == expandingBlock) {
noops++;
return page;
prevCompleted = true;
return prev;
}
if (page.getBlockCount() == 1) {
if (prev.getBlockCount() == 1) {
assert channel == 0;
prevCompleted = true;
return new Page(expandedBlock);
}

int[] duplicateFilter = buildDuplicateExpandingFilter(expandingBlock, expandedBlock.getPositionCount());
int[] duplicateFilter = nextDuplicateExpandingFilter(expandingBlock, pageSize);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pageSize, right? I think maybe once you move expandedBlock to a member variable too then this won't take any args, right?


Block[] result = new Block[page.getBlockCount()];
Block[] result = new Block[prev.getBlockCount()];
int[] expandedMask = new int[duplicateFilter.length];
for (int i = 0; i < expandedMask.length; i++) {
expandedMask[i] = i + nextItemOnExpanded;
}
nextItemOnExpanded += expandedMask.length;
for (int b = 0; b < result.length; b++) {
result[b] = b == channel ? expandedBlock : page.getBlock(b).filter(duplicateFilter);
result[b] = b == channel ? expandedBlock.filter(expandedMask) : prev.getBlock(b).filter(duplicateFilter);
}
if (nextItemOnExpanded == expandedBlock.getPositionCount()) {
nextItemOnExpanded = 0;
}
return new Page(result);
}

private int[] buildDuplicateExpandingFilter(Block expandingBlock, int newPositions) {
int[] duplicateFilter = new int[newPositions];
private int[] nextDuplicateExpandingFilter(Block expandingBlock, int size) {
int[] duplicateFilter = new int[size];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably min(size, expanded.positionCount - nextPositionToProcess). Maybe?

int n = 0;
for (int p = 0; p < expandingBlock.getPositionCount(); p++) {
int count = expandingBlock.getValueCount(p);
while (nextPositionToProcess < expandingBlock.getPositionCount()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to use while (true) if here are multiple interesting ways to break from the loop. It's kind of a signal to the reader that "something weird is here"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of while(true), but in this case I think it will make the code easier to read, so 👍

int count = expandingBlock.getValueCount(nextPositionToProcess);
int positions = count == 0 ? 1 : count;
Arrays.fill(duplicateFilter, n, n + positions, p);
n += positions;
int toAdd = Math.min(size - n, positions - nextMvToProcess);
Arrays.fill(duplicateFilter, n, n + toAdd, nextPositionToProcess);
n += toAdd;

if (n == size) {
if (nextMvToProcess + toAdd == positions) {
nextMvToProcess = 0;
nextPositionToProcess++;
if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
prevCompleted = true;
}
} else {
nextMvToProcess = nextMvToProcess + toAdd;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth comments about the meanings of these two arms. I think one is "we're done expanding this page" and another is "we've filled the page we're building and maybe exhausted the current position. But we might not have, we might be half way through expanding a position.

}
return duplicateFilter;
}

nextMvToProcess = 0;
nextPositionToProcess++;
}

if (nextPositionToProcess == expandingBlock.getPositionCount()) {
nextPositionToProcess = 0;
nextMvToProcess = 0;
prevCompleted = true;
}
return duplicateFilter;
return n < size ? Arrays.copyOfRange(duplicateFilter, 0, n) : duplicateFilter;
}

@Override
public final boolean needsInput() {
return prev == null && finished == false;
}

@Override
protected AbstractPageMappingOperator.Status status(int pagesProcessed) {
public final void addInput(Page page) {
assert prev == null : "has pending input page";
prev = page;
prevCompleted = false;
}

@Override
public final void finish() {
finished = true;
}

@Override
public final boolean isFinished() {
return finished && prev == null;
}

@Override
public final Status status() {
return new Status(pagesProcessed, noops);
}

@Override
public void close() {
if (prev != null) {
Releasables.closeExpectNoException(() -> prev.releaseBlocks());
}
}

@Override
public String toString() {
return "MvExpandOperator[channel=" + channel + "]";
}

public static final class Status extends AbstractPageMappingOperator.Status {
public static final class Status implements Operator.Status {

private final int pagesProcessed;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be nice to have this as pagesIn and pagesOut. That way you can see what the multiplication factor is too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++


public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Operator.Status.class,
"mv_expand",
Expand All @@ -106,25 +222,25 @@ public static final class Status extends AbstractPageMappingOperator.Status {
private final int noops;

Status(int pagesProcessed, int noops) {
super(pagesProcessed);
this.pagesProcessed = pagesProcessed;
this.noops = noops;
}

Status(StreamInput in) throws IOException {
super(in);
pagesProcessed = in.readVInt();
noops = in.readVInt();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(pagesProcessed);
out.writeVInt(noops);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pages_processed", pagesProcessed());
builder.field("pages_processed", pagesProcessed);
builder.field("noops", noops);
return builder.endObject();
}
Expand All @@ -147,12 +263,16 @@ public boolean equals(Object o) {
return false;
}
Status status = (Status) o;
return noops == status.noops && pagesProcessed() == status.pagesProcessed();
return noops == status.noops && pagesProcessed == status.pagesProcessed;
}

public int pagesProcessed() {
return pagesProcessed;
}

@Override
public int hashCode() {
return Objects.hash(noops, pagesProcessed());
return Objects.hash(noops, pagesProcessed);
}

@Override
Expand Down
Loading