Skip to content

Commit

Permalink
[fix](nereids)fix bug of select mv in nereids apache#26235 (apache#26415
Browse files Browse the repository at this point in the history
)
  • Loading branch information
starocean999 authored and gnehil committed Dec 4, 2023
1 parent 00d4a7c commit 0c26370
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@
* Base class for selecting materialized index rules.
*/
public abstract class AbstractSelectMaterializedIndexRule {
protected boolean shouldSelectIndex(LogicalOlapScan scan) {
protected boolean shouldSelectIndexWithAgg(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
case DUP_KEYS:
// SelectMaterializedIndexWithAggregate(R1) run before SelectMaterializedIndexWithoutAggregate(R2)
// if R1 selects baseIndex and preAggStatus is off
// we should give a chance to R2 to check if some prefix-index can be selected
// so if R1 selects baseIndex and preAggStatus is off, we keep scan's index unselected in order to
// let R2 to get a chance to do its work
// at last, after R1, the scan may be the 4 status
// 1. preAggStatus is ON and baseIndex is selected, it means select baseIndex is correct.
// 2. preAggStatus is ON and some other Index is selected, this is correct, too.
// 3. preAggStatus is OFF, no index is selected, it means R2 could get a chance to run
// so we check the preAggStatus and if some index is selected to make sure R1 can be run only once
return scan.getPreAggStatus().isOn() && !scan.isIndexSelected();
default:
return false;
}
}

protected boolean shouldSelectIndexWithoutAgg(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public List<Rule> buildRules() {
return ImmutableList.of(
// only agg above scan
// Aggregate(Scan)
logicalAggregate(logicalOlapScan().when(this::shouldSelectIndex)).thenApply(ctx -> {
logicalAggregate(logicalOlapScan().when(this::shouldSelectIndexWithAgg)).thenApply(ctx -> {
LogicalAggregate<LogicalOlapScan> agg = ctx.root;
LogicalOlapScan scan = agg.child();
SelectResult result = select(
Expand All @@ -116,7 +116,7 @@ public List<Rule> buildRules() {
agg.getGroupByExpressions(),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand All @@ -139,7 +139,7 @@ public List<Rule> buildRules() {

// filter could push down scan.
// Aggregate(Filter(Scan))
logicalAggregate(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex)))
logicalAggregate(logicalFilter(logicalOlapScan().when(this::shouldSelectIndexWithAgg)))
.thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root;
LogicalFilter<LogicalOlapScan> filter = agg.child();
Expand All @@ -162,8 +162,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -191,7 +190,7 @@ public List<Rule> buildRules() {

// column pruning or other projections such as alias, etc.
// Aggregate(Project(Scan))
logicalAggregate(logicalProject(logicalOlapScan().when(this::shouldSelectIndex)))
logicalAggregate(logicalProject(logicalOlapScan().when(this::shouldSelectIndexWithAgg)))
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalOlapScan>> agg = ctx.root;
LogicalProject<LogicalOlapScan> project = agg.child();
Expand All @@ -207,8 +206,7 @@ public List<Rule> buildRules() {
collectRequireExprWithAggAndProject(agg.getExpressions(), project.getProjects())
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -242,7 +240,7 @@ public List<Rule> buildRules() {
// filter could push down and project.
// Aggregate(Project(Filter(Scan)))
logicalAggregate(logicalProject(logicalFilter(logicalOlapScan()
.when(this::shouldSelectIndex)))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg)))).thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
Expand All @@ -265,8 +263,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -301,7 +298,7 @@ public List<Rule> buildRules() {
// filter can't push down
// Aggregate(Filter(Project(Scan)))
logicalAggregate(logicalFilter(logicalProject(logicalOlapScan()
.when(this::shouldSelectIndex)))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg)))).thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalProject<LogicalOlapScan>>> agg = ctx.root;
LogicalFilter<LogicalProject<LogicalOlapScan>> filter = agg.child();
LogicalProject<LogicalOlapScan> project = filter.child();
Expand All @@ -322,8 +319,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -357,48 +353,49 @@ public List<Rule> buildRules() {

// only agg above scan
// Aggregate(Repeat(Scan))
logicalAggregate(logicalRepeat(logicalOlapScan().when(this::shouldSelectIndex))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalOlapScan>> agg = ctx.root;
LogicalRepeat<LogicalOlapScan> repeat = agg.child();
LogicalOlapScan scan = repeat.child();
SelectResult result = select(
scan,
agg.getInputSlots(),
ImmutableSet.of(),
extractAggFunctionAndReplaceSlot(agg, Optional.empty()),
nonVirtualGroupByExprs(agg),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
replaceAggOutput(
agg, Optional.empty(), Optional.empty(), result.exprRewriteMap),
agg.isNormalized(),
agg.getSourceRepeat(),
logicalAggregate(
logicalRepeat(logicalOlapScan().when(this::shouldSelectIndexWithAgg))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalOlapScan>> agg = ctx.root;
LogicalRepeat<LogicalOlapScan> repeat = agg.child();
LogicalOlapScan scan = repeat.child();
SelectResult result = select(
scan,
agg.getInputSlots(),
ImmutableSet.of(),
extractAggFunctionAndReplaceSlot(agg, Optional.empty()),
nonVirtualGroupByExprs(agg),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
}
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_REPEAT_SCAN),
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
replaceAggOutput(
agg, Optional.empty(), Optional.empty(), result.exprRewriteMap),
agg.isNormalized(),
agg.getSourceRepeat(),
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
}
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_REPEAT_SCAN),

// filter could push down scan.
// Aggregate(Repeat(Filter(Scan)))
logicalAggregate(logicalRepeat(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex))))
logicalAggregate(logicalRepeat(logicalFilter(logicalOlapScan().when(this::shouldSelectIndexWithAgg))))
.thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalRepeat<LogicalFilter<LogicalOlapScan>> repeat = agg.child();
Expand All @@ -422,8 +419,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -457,7 +453,7 @@ public List<Rule> buildRules() {

// column pruning or other projections such as alias, etc.
// Aggregate(Repeat(Project(Scan)))
logicalAggregate(logicalRepeat(logicalProject(logicalOlapScan().when(this::shouldSelectIndex))))
logicalAggregate(logicalRepeat(logicalProject(logicalOlapScan().when(this::shouldSelectIndexWithAgg))))
.thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalProject<LogicalOlapScan>>> agg = ctx.root;
LogicalRepeat<LogicalProject<LogicalOlapScan>> repeat = agg.child();
Expand All @@ -474,8 +470,7 @@ public List<Rule> buildRules() {
collectRequireExprWithAggAndProject(agg.getExpressions(), project.getProjects())
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -514,7 +509,7 @@ public List<Rule> buildRules() {
// filter could push down and project.
// Aggregate(Repeat(Project(Filter(Scan))))
logicalAggregate(logicalRepeat(logicalProject(logicalFilter(logicalOlapScan()
.when(this::shouldSelectIndex))))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg))))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalProject
<LogicalFilter<LogicalOlapScan>>>> agg = ctx.root;
LogicalRepeat<LogicalProject<LogicalFilter<LogicalOlapScan>>> repeat = agg.child();
Expand All @@ -539,8 +534,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -582,7 +576,7 @@ public List<Rule> buildRules() {
// filter can't push down
// Aggregate(Repeat(Filter(Project(Scan))))
logicalAggregate(logicalRepeat(logicalFilter(logicalProject(logicalOlapScan()
.when(this::shouldSelectIndex))))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg))))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalFilter
<LogicalProject<LogicalOlapScan>>>> agg = ctx.root;
LogicalRepeat<LogicalFilter<LogicalProject<LogicalOlapScan>>> repeat = agg.child();
Expand All @@ -605,8 +599,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -750,6 +743,19 @@ public SelectResult(PreAggStatus preAggStatus, long indexId, ExprRewriteMap expr
}
}

private static LogicalOlapScan createLogicalOlapScan(LogicalOlapScan scan, SelectResult result) {
LogicalOlapScan mvPlan;
if (result.preAggStatus.isOff()) {
// we only set preAggStatus and make index unselected to let SelectMaterializedIndexWithoutAggregate
// have a chance to run and select proper index
mvPlan = scan.withPreAggStatus(result.preAggStatus);
} else {
mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
}
return mvPlan;
}

/**
* Do aggregate function extraction and replace aggregate function's input slots by underlying project.
* <p>
Expand Down
Loading

0 comments on commit 0c26370

Please sign in to comment.