Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add TakeOrderedOperator #2863

Merged
merged 11 commits into from
Aug 5, 2024
14 changes: 14 additions & 0 deletions core/src/main/java/org/opensearch/sql/executor/Explain.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.sql.planner.physical.RemoveOperator;
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.TableScanOperator;
Expand Down Expand Up @@ -73,6 +74,19 @@ public ExplainResponseNode visitSort(SortOperator node, Object context) {
ImmutableMap.of("sortList", describeSortList(node.getSortList()))));
}

@Override
public ExplainResponseNode visitTakeOrdered(TakeOrderedOperator node, Object context) {
return explain(
node,
context,
explainNode ->
explainNode.setDescription(
ImmutableMap.of(
"limit", node.getLimit(),
"offset", node.getOffset(),
"sortList", describeSortList(node.getSortList()))));
}

@Override
public ExplainResponseNode visitTableScan(TableScanOperator node, Object context) {
return explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.sql.planner.physical.RemoveOperator;
import org.opensearch.sql.planner.physical.RenameOperator;
import org.opensearch.sql.planner.physical.SortOperator;
import org.opensearch.sql.planner.physical.TakeOrderedOperator;
import org.opensearch.sql.planner.physical.ValuesOperator;
import org.opensearch.sql.planner.physical.WindowOperator;
import org.opensearch.sql.storage.read.TableScanBuilder;
Expand Down Expand Up @@ -129,7 +130,13 @@ public PhysicalPlan visitValues(LogicalValues node, C context) {

@Override
public PhysicalPlan visitLimit(LogicalLimit node, C context) {
return new LimitOperator(visitChild(node, context), node.getLimit(), node.getOffset());
PhysicalPlan child = visitChild(node, context);
// Optimize sort + limit to take ordered operator
if (child instanceof SortOperator sortChild) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
return new TakeOrderedOperator(
sortChild.getInput(), node.getLimit(), node.getOffset(), sortChild.getSortList());
}
return new LimitOperator(child, node.getLimit(), node.getOffset());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public static SortOperator sort(PhysicalPlan input, Pair<SortOption, Expression>
return new SortOperator(input, Arrays.asList(sorts));
}

public static TakeOrderedOperator takeOrdered(
PhysicalPlan input, Integer limit, Integer offset, Pair<SortOption, Expression>... sorts) {
return new TakeOrderedOperator(input, limit, offset, Arrays.asList(sorts));
}

public static DedupeOperator dedupe(PhysicalPlan input, Expression... expressions) {
return new DedupeOperator(input, Arrays.asList(expressions));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public R visitSort(SortOperator node, C context) {
return visitNode(node, context);
}

public R visitTakeOrdered(TakeOrderedOperator node, C context) {
return visitNode(node, context);
}

public R visitRareTopN(RareTopNOperator node, C context) {
return visitNode(node, context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.sql.planner.physical;

import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;

import com.google.common.collect.Ordering;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.utils.ExprValueOrdering;
import org.opensearch.sql.expression.Expression;

public interface SortHelper {

/**
* Construct an expr comparator for sorting on ExprValue.
*
* @param sortList list of sort fields and their related sort options.
* @return A comparator for ExprValue
*/
static Comparator<ExprValue> constructExprComparator(
qianheng-aws marked this conversation as resolved.
Show resolved Hide resolved
List<Pair<SortOption, Expression>> sortList) {
return (o1, o2) -> compareWithExpressions(o1, o2, constructComparator(sortList));
}

/**
* Construct an expr ordering for efficiently taking the top-k elements on ExprValue.
*
* @param sortList list of sort fields and their related sort options.
* @return An guava ordering for ExprValue
*/
static Ordering<ExprValue> constructExprOrdering(List<Pair<SortOption, Expression>> sortList) {
return Ordering.from(constructExprComparator(sortList));
}

private static List<Pair<Expression, Comparator<ExprValue>>> constructComparator(
List<Pair<SortOption, Expression>> sortList) {
List<Pair<Expression, Comparator<ExprValue>>> comparators = new ArrayList<>();
for (Pair<SortOption, Expression> pair : sortList) {
SortOption option = pair.getLeft();
ExprValueOrdering ordering =
ASC.equals(option.getSortOrder())
? ExprValueOrdering.natural()
: ExprValueOrdering.natural().reverse();
ordering =
NULL_FIRST.equals(option.getNullOrder()) ? ordering.nullsFirst() : ordering.nullsLast();
comparators.add(Pair.of(pair.getRight(), ordering));
}
return comparators;
}

private static int compareWithExpressions(
ExprValue o1, ExprValue o2, List<Pair<Expression, Comparator<ExprValue>>> comparators) {
for (Pair<Expression, Comparator<ExprValue>> comparator : comparators) {
Expression expression = comparator.getKey();
int result =
comparator
.getValue()
.compare(
expression.valueOf(o1.bindingTuples()), expression.valueOf(o2.bindingTuples()));
if (result != 0) {
return result;
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,18 @@

package org.opensearch.sql.planner.physical;

import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;

import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.utils.ExprValueOrdering;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.planner.physical.SortOperator.Sorter.SorterBuilder;

/**
* Sort Operator.The input data is sorted by the sort fields in the {@link SortOperator#sortList}.
Expand All @@ -36,7 +29,7 @@ public class SortOperator extends PhysicalPlan {
@Getter private final PhysicalPlan input;

@Getter private final List<Pair<SortOption, Expression>> sortList;
@EqualsAndHashCode.Exclude private final Sorter sorter;
@EqualsAndHashCode.Exclude private final Comparator<ExprValue> sorter;
@EqualsAndHashCode.Exclude private Iterator<ExprValue> iterator;

/**
Expand All @@ -49,18 +42,7 @@ public class SortOperator extends PhysicalPlan {
public SortOperator(PhysicalPlan input, List<Pair<SortOption, Expression>> sortList) {
this.input = input;
this.sortList = sortList;
SorterBuilder sorterBuilder = Sorter.builder();
for (Pair<SortOption, Expression> pair : sortList) {
SortOption option = pair.getLeft();
ExprValueOrdering ordering =
ASC.equals(option.getSortOrder())
? ExprValueOrdering.natural()
: ExprValueOrdering.natural().reverse();
ordering =
NULL_FIRST.equals(option.getNullOrder()) ? ordering.nullsFirst() : ordering.nullsLast();
sorterBuilder.comparator(Pair.of(pair.getRight(), ordering));
}
this.sorter = sorterBuilder.build();
this.sorter = SortHelper.constructExprComparator(sortList);
}

@Override
Expand Down Expand Up @@ -94,27 +76,6 @@ public ExprValue next() {
return iterator.next();
}

@Builder
public static class Sorter implements Comparator<ExprValue> {
@Singular private final List<Pair<Expression, Comparator<ExprValue>>> comparators;

@Override
public int compare(ExprValue o1, ExprValue o2) {
for (Pair<Expression, Comparator<ExprValue>> comparator : comparators) {
Expression expression = comparator.getKey();
int result =
comparator
.getValue()
.compare(
expression.valueOf(o1.bindingTuples()), expression.valueOf(o2.bindingTuples()));
if (result != 0) {
return result;
}
}
return 0;
}
}

private Iterator<ExprValue> iterator(PriorityQueue<ExprValue> result) {
return new Iterator<ExprValue>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical;

import com.google.common.collect.Ordering;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.ast.tree.Sort.SortOption;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.Expression;

/**
* TakeOrdered Operator. This operator will sort input data as the order of {@link this#sortList}
* specifies and return {@link this#limit} rows from the {@link this#offset} index.
*
* <p>Functionally, this operator is a combination of {@link SortOperator} and {@link
* LimitOperator}. But it can reduce the time complexity from O(nlogn) to O(n), and memory from O(n)
* to O(k) due to use guava {@link com.google.common.collect.Ordering}.
*
* <p>Overall, it's an optimization to replace `Limit(Sort)` in physical plan level since it's all
* about execution. Because most execution engine may not support this operator, it doesn't have a
* related logical operator.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
public class TakeOrderedOperator extends PhysicalPlan {
@Getter private final PhysicalPlan input;

@Getter private final List<Pair<SortOption, Expression>> sortList;
@Getter private final Integer limit;
@Getter private final Integer offset;
@EqualsAndHashCode.Exclude private final Ordering<ExprValue> ordering;
@EqualsAndHashCode.Exclude private Iterator<ExprValue> iterator;

/**
* TakeOrdered Operator Constructor.
*
* @param input input {@link PhysicalPlan}
* @param limit the limit value from LimitOperator
* @param offset the offset value from LimitOperator
* @param sortList list of sort field from SortOperator
*/
public TakeOrderedOperator(
penghuo marked this conversation as resolved.
Show resolved Hide resolved
PhysicalPlan input,
Integer limit,
Integer offset,
List<Pair<SortOption, Expression>> sortList) {
this.input = input;
this.sortList = sortList;
this.limit = limit;
this.offset = offset;
this.ordering = SortHelper.constructExprOrdering(sortList);
Copy link
Collaborator

@penghuo penghuo Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good job on simpification!

}

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTakeOrdered(this, context);
}

@Override
public void open() {
super.open();
iterator = ordering.leastOf(input, offset + limit).stream().skip(offset).iterator();
}

@Override
public List<PhysicalPlan> getChild() {
return Collections.singletonList(input);
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ExprValue next() {
return iterator.next();
}
}
21 changes: 21 additions & 0 deletions core/src/test/java/org/opensearch/sql/executor/ExplainTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rename;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.sort;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.takeOrdered;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.values;
import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.window;

Expand Down Expand Up @@ -220,6 +221,26 @@ void can_explain_limit() {
explain.apply(plan));
}

@Test
void can_explain_takeOrdered() {
Pair<Sort.SortOption, Expression> sort =
ImmutablePair.of(Sort.SortOption.DEFAULT_ASC, ref("a", INTEGER));
PhysicalPlan plan = takeOrdered(tableScan, 10, 5, sort);
assertEquals(
new ExplainResponse(
new ExplainResponseNode(
"TakeOrderedOperator",
Map.of(
"limit",
10,
"offset",
5,
"sortList",
Map.of("a", Map.of("sortOrder", "ASC", "nullOrder", "NULL_FIRST"))),
singletonList(tableScan.explainNode()))),
explain.apply(plan));
}

@Test
void can_explain_nested() {
Set<String> nestedOperatorArgs = Set.of("message.info", "message");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,30 @@ public void visitPaginate_should_remove_it_from_tree() {
new ProjectOperator(new ValuesOperator(List.of(List.of())), List.of(), List.of());
assertEquals(physicalPlanTree, logicalPlanTree.accept(implementor, null));
}

@Test
public void visitLimit_support_return_takeOrdered() {
// replace SortOperator + LimitOperator with TakeOrderedOperator
Pair<Sort.SortOption, Expression> sort =
ImmutablePair.of(Sort.SortOption.DEFAULT_ASC, ref("a", INTEGER));
var logicalValues = values(emptyList());
var logicalSort = sort(logicalValues, sort);
var logicalLimit = limit(logicalSort, 10, 5);
PhysicalPlan physicalPlanTree =
PhysicalPlanDSL.takeOrdered(PhysicalPlanDSL.values(emptyList()), 10, 5, sort);
assertEquals(physicalPlanTree, logicalLimit.accept(implementor, null));

// don't replace if LimitOperator's child is not SortOperator
Pair<ReferenceExpression, Expression> newEvalField =
ImmutablePair.of(ref("name1", STRING), ref("name", STRING));
var logicalEval = eval(logicalSort, newEvalField);
logicalLimit = limit(logicalEval, 10, 5);
physicalPlanTree =
PhysicalPlanDSL.limit(
PhysicalPlanDSL.eval(
PhysicalPlanDSL.sort(PhysicalPlanDSL.values(emptyList()), sort), newEvalField),
10,
5);
assertEquals(physicalPlanTree, logicalLimit.accept(implementor, null));
}
}
Loading
Loading