Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](nereids)fix bug of select mv in nereids #26415

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@
* Base class for selecting materialized index rules.
*/
public abstract class AbstractSelectMaterializedIndexRule {
protected boolean shouldSelectIndex(LogicalOlapScan scan) {
protected boolean shouldSelectIndexWithAgg(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
case DUP_KEYS:
// SelectMaterializedIndexWithAggregate(R1) run before SelectMaterializedIndexWithoutAggregate(R2)
// if R1 selects baseIndex and preAggStatus is off
// we should give a chance to R2 to check if some prefix-index can be selected
// so if R1 selects baseIndex and preAggStatus is off, we keep scan's index unselected in order to
// let R2 to get a chance to do its work
// at last, after R1, the scan may be the 4 status
// 1. preAggStatus is ON and baseIndex is selected, it means select baseIndex is correct.
// 2. preAggStatus is ON and some other Index is selected, this is correct, too.
// 3. preAggStatus is OFF, no index is selected, it means R2 could get a chance to run
// so we check the preAggStatus and if some index is selected to make sure R1 can be run only once
return scan.getPreAggStatus().isOn() && !scan.isIndexSelected();
default:
return false;
}
}

protected boolean shouldSelectIndexWithoutAgg(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public List<Rule> buildRules() {
return ImmutableList.of(
// only agg above scan
// Aggregate(Scan)
logicalAggregate(logicalOlapScan().when(this::shouldSelectIndex)).thenApply(ctx -> {
logicalAggregate(logicalOlapScan().when(this::shouldSelectIndexWithAgg)).thenApply(ctx -> {
LogicalAggregate<LogicalOlapScan> agg = ctx.root;
LogicalOlapScan scan = agg.child();
SelectResult result = select(
Expand All @@ -116,7 +116,7 @@ public List<Rule> buildRules() {
agg.getGroupByExpressions(),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand All @@ -139,7 +139,7 @@ public List<Rule> buildRules() {

// filter could push down scan.
// Aggregate(Filter(Scan))
logicalAggregate(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex)))
logicalAggregate(logicalFilter(logicalOlapScan().when(this::shouldSelectIndexWithAgg)))
.thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalOlapScan>> agg = ctx.root;
LogicalFilter<LogicalOlapScan> filter = agg.child();
Expand All @@ -162,8 +162,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -191,7 +190,7 @@ public List<Rule> buildRules() {

// column pruning or other projections such as alias, etc.
// Aggregate(Project(Scan))
logicalAggregate(logicalProject(logicalOlapScan().when(this::shouldSelectIndex)))
logicalAggregate(logicalProject(logicalOlapScan().when(this::shouldSelectIndexWithAgg)))
.thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalOlapScan>> agg = ctx.root;
LogicalProject<LogicalOlapScan> project = agg.child();
Expand All @@ -207,8 +206,7 @@ public List<Rule> buildRules() {
collectRequireExprWithAggAndProject(agg.getExpressions(), project.getProjects())
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -242,7 +240,7 @@ public List<Rule> buildRules() {
// filter could push down and project.
// Aggregate(Project(Filter(Scan)))
logicalAggregate(logicalProject(logicalFilter(logicalOlapScan()
.when(this::shouldSelectIndex)))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg)))).thenApply(ctx -> {
LogicalAggregate<LogicalProject<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
Expand All @@ -265,8 +263,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -301,7 +298,7 @@ public List<Rule> buildRules() {
// filter can't push down
// Aggregate(Filter(Project(Scan)))
logicalAggregate(logicalFilter(logicalProject(logicalOlapScan()
.when(this::shouldSelectIndex)))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg)))).thenApply(ctx -> {
LogicalAggregate<LogicalFilter<LogicalProject<LogicalOlapScan>>> agg = ctx.root;
LogicalFilter<LogicalProject<LogicalOlapScan>> filter = agg.child();
LogicalProject<LogicalOlapScan> project = filter.child();
Expand All @@ -322,8 +319,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -357,48 +353,49 @@ public List<Rule> buildRules() {

// only agg above scan
// Aggregate(Repeat(Scan))
logicalAggregate(logicalRepeat(logicalOlapScan().when(this::shouldSelectIndex))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalOlapScan>> agg = ctx.root;
LogicalRepeat<LogicalOlapScan> repeat = agg.child();
LogicalOlapScan scan = repeat.child();
SelectResult result = select(
scan,
agg.getInputSlots(),
ImmutableSet.of(),
extractAggFunctionAndReplaceSlot(agg, Optional.empty()),
nonVirtualGroupByExprs(agg),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
replaceAggOutput(
agg, Optional.empty(), Optional.empty(), result.exprRewriteMap),
agg.isNormalized(),
agg.getSourceRepeat(),
logicalAggregate(
logicalRepeat(logicalOlapScan().when(this::shouldSelectIndexWithAgg))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalOlapScan>> agg = ctx.root;
LogicalRepeat<LogicalOlapScan> repeat = agg.child();
LogicalOlapScan scan = repeat.child();
SelectResult result = select(
scan,
agg.getInputSlots(),
ImmutableSet.of(),
extractAggFunctionAndReplaceSlot(agg, Optional.empty()),
nonVirtualGroupByExprs(agg),
new HashSet<>(agg.getExpressions()));

LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
agg.withChildren(
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
}
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_REPEAT_SCAN),
} else {
return new LogicalProject<>(
generateProjectsAlias(agg.getOutputs(), slotContext),
new ReplaceExpressions(slotContext).replace(
new LogicalAggregate<>(
agg.getGroupByExpressions(),
replaceAggOutput(
agg, Optional.empty(), Optional.empty(), result.exprRewriteMap),
agg.isNormalized(),
agg.getSourceRepeat(),
repeat.withAggOutputAndChild(
generateNewOutputsWithMvOutputs(mvPlan, repeat.getOutputs()), mvPlan)
), mvPlan));
}
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_REPEAT_SCAN),

// filter could push down scan.
// Aggregate(Repeat(Filter(Scan)))
logicalAggregate(logicalRepeat(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex))))
logicalAggregate(logicalRepeat(logicalFilter(logicalOlapScan().when(this::shouldSelectIndexWithAgg))))
.thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalFilter<LogicalOlapScan>>> agg = ctx.root;
LogicalRepeat<LogicalFilter<LogicalOlapScan>> repeat = agg.child();
Expand All @@ -422,8 +419,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -457,7 +453,7 @@ public List<Rule> buildRules() {

// column pruning or other projections such as alias, etc.
// Aggregate(Repeat(Project(Scan)))
logicalAggregate(logicalRepeat(logicalProject(logicalOlapScan().when(this::shouldSelectIndex))))
logicalAggregate(logicalRepeat(logicalProject(logicalOlapScan().when(this::shouldSelectIndexWithAgg))))
.thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalProject<LogicalOlapScan>>> agg = ctx.root;
LogicalRepeat<LogicalProject<LogicalOlapScan>> repeat = agg.child();
Expand All @@ -474,8 +470,7 @@ public List<Rule> buildRules() {
collectRequireExprWithAggAndProject(agg.getExpressions(), project.getProjects())
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -514,7 +509,7 @@ public List<Rule> buildRules() {
// filter could push down and project.
// Aggregate(Repeat(Project(Filter(Scan))))
logicalAggregate(logicalRepeat(logicalProject(logicalFilter(logicalOlapScan()
.when(this::shouldSelectIndex))))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg))))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalProject
<LogicalFilter<LogicalOlapScan>>>> agg = ctx.root;
LogicalRepeat<LogicalProject<LogicalFilter<LogicalOlapScan>>> repeat = agg.child();
Expand All @@ -539,8 +534,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -582,7 +576,7 @@ public List<Rule> buildRules() {
// filter can't push down
// Aggregate(Repeat(Filter(Project(Scan))))
logicalAggregate(logicalRepeat(logicalFilter(logicalProject(logicalOlapScan()
.when(this::shouldSelectIndex))))).thenApply(ctx -> {
.when(this::shouldSelectIndexWithAgg))))).thenApply(ctx -> {
LogicalAggregate<LogicalRepeat<LogicalFilter
<LogicalProject<LogicalOlapScan>>>> agg = ctx.root;
LogicalRepeat<LogicalFilter<LogicalProject<LogicalOlapScan>>> repeat = agg.child();
Expand All @@ -605,8 +599,7 @@ public List<Rule> buildRules() {
requiredExpr
);

LogicalOlapScan mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
LogicalOlapScan mvPlan = createLogicalOlapScan(scan, result);
SlotContext slotContext = generateBaseScanExprToMvExpr(mvPlan);

if (result.exprRewriteMap.isEmpty()) {
Expand Down Expand Up @@ -750,6 +743,19 @@ public SelectResult(PreAggStatus preAggStatus, long indexId, ExprRewriteMap expr
}
}

private static LogicalOlapScan createLogicalOlapScan(LogicalOlapScan scan, SelectResult result) {
LogicalOlapScan mvPlan;
if (result.preAggStatus.isOff()) {
// we only set preAggStatus and make index unselected to let SelectMaterializedIndexWithoutAggregate
// have a chance to run and select proper index
mvPlan = scan.withPreAggStatus(result.preAggStatus);
} else {
mvPlan =
scan.withMaterializedIndexSelected(result.preAggStatus, result.indexId);
}
return mvPlan;
}

/**
* Do aggregate function extraction and replace aggregate function's input slots by underlying project.
* <p>
Expand Down
Loading