Skip to content

Commit

Permalink
[refactor](variant) refactor sub path push down on variant type (#36478)
Browse files Browse the repository at this point in the history
intro a new rule VARIANT_SUB_PATH_PRUNING to prune variant sub path.

for example, variant slot v in table t has two sub path: 'c1' and 'c2',
after this rule, select v['c1'] from t will only scan one sub path 'c1'
of v to reduce scan time.

This rule accomplishes all the work using two components. The Collector
traverses from the top down, collecting all the element_at functions on
the variant types, and recording the required path from the original
variant slot to the current element_at. The Replacer traverses from the
bottom up, generating the slots for the required sub path on scan,
union, and cte consumer. Then, it replaces the element_at with the
corresponding slot.
  • Loading branch information
morrySnow authored Jun 21, 2024
1 parent 4c8e66b commit 86fc14e
Show file tree
Hide file tree
Showing 40 changed files with 1,666 additions and 735 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Placeholder;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.PlaceholderId;
Expand Down Expand Up @@ -64,7 +63,6 @@
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -131,15 +129,6 @@ public class StatementContext implements Closeable {
private final List<Expression> joinFilters = new ArrayList<>();

private final List<Hint> hints = new ArrayList<>();
// Root Slot -> Paths -> Sub-column Slots
private final Map<Slot, Map<List<String>, SlotReference>> subColumnSlotRefMap
= Maps.newHashMap();

// Map from rewritten slot to original expr
private final Map<Slot, Expression> subColumnOriginalExprMap = Maps.newHashMap();

// Map from original expr to rewritten slot
private final Map<Expression, Slot> originalExprToRewrittenSubColumn = Maps.newHashMap();

// Map slot to its relation, currently used in SlotReference to find its original
// Relation for example LogicalOlapScan
Expand Down Expand Up @@ -265,58 +254,10 @@ public Optional<SqlCacheContext> getSqlCacheContext() {
return Optional.ofNullable(sqlCacheContext);
}

public Set<SlotReference> getAllPathsSlots() {
Set<SlotReference> allSlotReferences = Sets.newHashSet();
for (Map<List<String>, SlotReference> slotReferenceMap : subColumnSlotRefMap.values()) {
allSlotReferences.addAll(slotReferenceMap.values());
}
return allSlotReferences;
}

public Expression getOriginalExpr(SlotReference rewriteSlot) {
return subColumnOriginalExprMap.getOrDefault(rewriteSlot, null);
}

public Slot getRewrittenSlotRefByOriginalExpr(Expression originalExpr) {
return originalExprToRewrittenSubColumn.getOrDefault(originalExpr, null);
}

/**
* Add a slot ref attached with paths in context to avoid duplicated slot
*/
public void addPathSlotRef(Slot root, List<String> paths, SlotReference slotRef, Expression originalExpr) {
subColumnSlotRefMap.computeIfAbsent(root, k -> Maps.newTreeMap((lst1, lst2) -> {
Iterator<String> it1 = lst1.iterator();
Iterator<String> it2 = lst2.iterator();
while (it1.hasNext() && it2.hasNext()) {
int result = it1.next().compareTo(it2.next());
if (result != 0) {
return result;
}
}
return Integer.compare(lst1.size(), lst2.size());
}));
subColumnSlotRefMap.get(root).put(paths, slotRef);
subColumnOriginalExprMap.put(slotRef, originalExpr);
originalExprToRewrittenSubColumn.put(originalExpr, slotRef);
}

public SlotReference getPathSlot(Slot root, List<String> paths) {
Map<List<String>, SlotReference> pathsSlotsMap = subColumnSlotRefMap.getOrDefault(root, null);
if (pathsSlotsMap == null) {
return null;
}
return pathsSlotsMap.getOrDefault(paths, null);
}

public void addSlotToRelation(Slot slot, Relation relation) {
slotToRelation.put(slot, relation);
}

public Relation getRelationBySlot(Slot slot) {
return slotToRelation.getOrDefault(slot, null);
}

public boolean isDpHyp() {
return isDpHyp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
import org.apache.doris.nereids.trees.expressions.functions.scalar.HighOrderFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Lambda;
import org.apache.doris.nereids.trees.expressions.functions.scalar.PushDownToProjectionFunction;
import org.apache.doris.nereids.trees.expressions.functions.scalar.ScalarFunction;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdaf;
import org.apache.doris.nereids.trees.expressions.functions.udf.JavaUdf;
Expand All @@ -102,7 +101,6 @@
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFunctionBinaryType;

import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -211,20 +209,6 @@ private OlapTable getOlapTableDirectly(SlotRef left) {

@Override
public Expr visitElementAt(ElementAt elementAt, PlanTranslatorContext context) {
if (PushDownToProjectionFunction.validToPushDown(elementAt)) {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null
&& !ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) {
throw new AnalysisException(
"set enable_rewrite_element_at_to_slot=true when using element_at function for variant type");
}
SlotReference rewrittenSlot = (SlotReference) context.getConnectContext()
.getStatementContext().getRewrittenSlotRefByOriginalExpr(elementAt);
// rewrittenSlot == null means variant is not from table. so keep element_at function
if (rewrittenSlot != null) {
return context.findSlotRef(rewrittenSlot.getExprId());
}
}
return visitScalarFunction(elementAt, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import org.apache.doris.nereids.trees.expressions.functions.scalar.PushDownToProjectionFunction;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
Expand Down Expand Up @@ -1250,8 +1249,7 @@ public PlanFragment visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, P
}
if (planNode instanceof ExchangeNode || planNode instanceof SortNode || planNode instanceof UnionNode
// this means we have filter->limit->project, need a SelectNode
|| (child instanceof PhysicalProject
&& !((PhysicalProject<?>) child).hasPushedDownToProjectionFunctions())) {
|| child instanceof PhysicalProject) {
// the three nodes don't support conjuncts, need create a SelectNode to filter data
SelectNode selectNode = new SelectNode(context.nextPlanNodeId(), planNode);
selectNode.setNereidsId(filter.getId());
Expand Down Expand Up @@ -1833,35 +1831,6 @@ && findOlapScanNodesByPassExchangeAndJoinNode(inputFragment.getPlanRoot())) {
return inputFragment;
}

// collect all valid PushDownToProjectionFunction from expression
private List<Expression> getPushDownToProjectionFunctionForRewritten(NamedExpression expression) {
List<Expression> targetExprList = expression.collectToList(PushDownToProjectionFunction.class::isInstance);
return targetExprList.stream()
.filter(PushDownToProjectionFunction::validToPushDown)
.collect(Collectors.toList());
}

// register rewritten slots from original PushDownToProjectionFunction
private void registerRewrittenSlot(PhysicalProject<? extends Plan> project, OlapScanNode olapScanNode) {
// register slots that are rewritten from element_at/etc..
List<Expression> allPushDownProjectionFunctions = project.getProjects().stream()
.map(this::getPushDownToProjectionFunctionForRewritten)
.flatMap(List::stream)
.collect(Collectors.toList());
for (Expression expr : allPushDownProjectionFunctions) {
PushDownToProjectionFunction function = (PushDownToProjectionFunction) expr;
if (context != null
&& context.getConnectContext() != null
&& context.getConnectContext().getStatementContext() != null) {
Slot argumentSlot = function.getInputSlots().stream().findFirst().get();
Expression rewrittenSlot = PushDownToProjectionFunction.rewriteToSlot(
function, (SlotReference) argumentSlot);
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
context.createSlotDesc(tupleDescriptor, (SlotReference) rewrittenSlot);
}
}
}

// TODO: generate expression mapping when be project could do in ExecNode.
@Override
public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project, PlanTranslatorContext context) {
Expand All @@ -1876,12 +1845,6 @@ public PlanFragment visitPhysicalProject(PhysicalProject<? extends Plan> project

PlanFragment inputFragment = project.child(0).accept(this, context);

if (inputFragment.getPlanRoot() instanceof OlapScanNode) {
// function already pushed down in projection
// e.g. select count(distinct cast(element_at(v, 'a') as int)) from tbl;
registerRewrittenSlot(project, (OlapScanNode) inputFragment.getPlanRoot());
}

PlanNode inputPlanNode = inputFragment.getPlanRoot();
List<Expr> projectionExprs = null;
List<Expr> allProjectionExprs = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,12 @@ public SlotDescriptor createSlotDesc(TupleDescriptor tupleDesc, SlotReference sl
slotDescriptor.setLabel(slotReference.getName());
} else {
slotRef = new SlotRef(slotDescriptor);
if (slotReference.hasSubColPath()) {
slotDescriptor.setSubColLables(slotReference.getSubColPath());
if (slotReference.hasSubColPath() && slotReference.getColumn().isPresent()) {
slotDescriptor.setSubColLables(slotReference.getSubPath());
// use lower case name for variant's root, since backend treat parent column as lower case
// see issue: https://github.com/apache/doris/pull/32999/commits
slotDescriptor.setMaterializedColumnName(slotRef.getColumnName().toLowerCase()
+ "." + String.join(".", slotReference.getSubColPath()));
+ "." + String.join(".", slotReference.getSubPath()));
}
}
slotRef.setTable(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ public AbstractBatchJobExecutor(CascadesContext cascadesContext) {
this.cascadesContext = Objects.requireNonNull(cascadesContext, "cascadesContext can not null");
}

/**
* flat map jobs in TopicRewriteJob to could really run jobs, and filter null.
*/
public static List<RewriteJob> jobs(RewriteJob... jobs) {
return Arrays.stream(jobs)
.filter(Objects::nonNull)
.flatMap(job -> job instanceof TopicRewriteJob
? ((TopicRewriteJob) job).jobs.stream()
? ((TopicRewriteJob) job).jobs.stream().filter(Objects::nonNull)
: Stream.of(job)
).collect(ImmutableList.toImmutableList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.doris.nereids.rules.analysis.BindRelation;
import org.apache.doris.nereids.rules.analysis.BindRelation.CustomTableResolver;
import org.apache.doris.nereids.rules.analysis.BindSink;
import org.apache.doris.nereids.rules.analysis.BindSlotWithPaths;
import org.apache.doris.nereids.rules.analysis.BuildAggForRandomDistributedTable;
import org.apache.doris.nereids.rules.analysis.CheckAfterBind;
import org.apache.doris.nereids.rules.analysis.CheckAnalysis;
Expand Down Expand Up @@ -136,7 +135,6 @@ private static List<RewriteJob> buildAnalyzeJobs(Optional<CustomTableResolver> c
new CheckPolicy()
),
bottomUp(new BindExpression()),
bottomUp(new BindSlotWithPaths()),
topDown(new BindSink()),
bottomUp(new CheckAfterBind()),
bottomUp(
Expand Down
Loading

0 comments on commit 86fc14e

Please sign in to comment.