Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner/core: support limit push down #24757

Merged
merged 16 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1854,7 +1854,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC
EqualConditions: p.EqualConditions,
storeTp: kv.TiFlash,
mppShuffleJoin: !useBCJ,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, childrenProps...)
// Mpp Join has quite heavy cost. Even limit might not suspend it in time, so we dont scale the count.
}.Init(p.ctx, p.stats, p.blockOffset, childrenProps...)
return []PhysicalPlan{join}
}

Expand Down Expand Up @@ -2505,6 +2506,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([]
if !p.limitHints.preferLimitToCop {
allTaskTypes = append(allTaskTypes, property.RootTaskType)
}
if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().IsMPPAllowed() {
allTaskTypes = append(allTaskTypes, property.MppTaskType)
}
ret := make([]PhysicalPlan, 0, len(allTaskTypes))
for _, tp := range allTaskTypes {
resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3867,8 +3867,8 @@ func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {

// read from tiflash, mpp with large cost
tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows(
"HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22",
"HashAgg_21 1.00 11910.73 root funcs:count(Column#5)->Column#3",
"└─TableReader_23 1.00 11877.13 root data:ExchangeSender_22",
" └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5",
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
Expand Down
12 changes: 11 additions & 1 deletion planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,15 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
}
t = cop.convertToRootTask(p.ctx)
sunk = p.sinkIntoIndexLookUp(t)
} else if mpp, ok := t.(*mppTask); ok {
newCount := p.Offset + p.Count
childProfile := mpp.plan().statsInfo()
stats := deriveLimitStats(childProfile, float64(newCount))
pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
mpp = attachPlan2Task(pushedDownLimit, mpp).(*mppTask)
pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
pushedDownLimit.cost = mpp.cost()
t = mpp.convertToRootTask(p.ctx)
}
p.cost = t.cost()
if sunk {
Expand Down Expand Up @@ -2057,7 +2066,8 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
}.Init(ctx, t.p.SelectBlockOffset())
p.stats = t.p.statsInfo()

p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil)
p.cost = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
if p.ctx.GetSessionVars().IsMPPEnforced() {
p.cost = 0
}
Expand Down
5 changes: 4 additions & 1 deletion planner/core/testdata/integration_serial_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@
"desc format = 'brief' select * from (select id from t group by id) C join (select sum(b),id from (select t.id, t1.id as b from t join (select id, count(*) as c from t group by id) t1 on t.id=t1.id)A group by id)B on C.id=b.id",
"desc format = 'brief' select * from t join t t1 on t.id = t1.id order by t.value limit 1",
"desc format = 'brief' select * from t join t t1 on t.id = t1.id order by t.value % 100 limit 1",
"desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id order by t.value limit 20) v group by v.v1"
"desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id order by t.value limit 20) v group by v.v1",
"desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1",
"desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1",
"desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id limit 20) v group by v.v1"
]
},
{
Expand Down
88 changes: 69 additions & 19 deletions planner/core/testdata/integration_serial_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
{
"SQL": "explain format = 'verbose' select count(*) from t2 group by a",
"Plan": [
"TableReader_24 3.00 3.21 root data:ExchangeSender_23",
"TableReader_24 3.00 3.33 root data:ExchangeSender_23",
"└─ExchangeSender_23 3.00 77.00 batchCop[tiflash] ExchangeType: PassThrough",
" └─Projection_22 3.00 0.00 batchCop[tiflash] Column#4",
" └─HashAgg_8 3.00 77.00 batchCop[tiflash] group by:test.t2.a, funcs:count(1)->Column#4",
Expand Down Expand Up @@ -152,8 +152,8 @@
{
"SQL": "explain format = 'verbose' select count(*) from t1 join t2 on t1.a = t2.a",
"Plan": [
"StreamAgg_12 1.00 18.81 root funcs:count(1)->Column#7",
"└─TableReader_44 3.00 9.81 root data:ExchangeSender_43",
"StreamAgg_12 1.00 18.93 root funcs:count(1)->Column#7",
"└─TableReader_44 3.00 9.93 root data:ExchangeSender_43",
" └─ExchangeSender_43 3.00 235.38 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin_40 3.00 235.38 cop[tiflash] inner join, equal:[eq(test.t1.a, test.t2.a)]",
" ├─ExchangeReceiver_19(Build) 3.00 77.00 cop[tiflash] ",
Expand All @@ -167,11 +167,11 @@
{
"SQL": "explain format = 'verbose' select count(*) from t1 join t2 on t1.a = t2.a join t3 on t1.b = t3.b",
"Plan": [
"StreamAgg_15 1.00 60.48 root funcs:count(1)->Column#10",
"└─HashJoin_65 3.00 51.48 root inner join, equal:[eq(test.t1.b, test.t3.b)]",
"StreamAgg_15 1.00 60.60 root funcs:count(1)->Column#10",
"└─HashJoin_65 3.00 51.60 root inner join, equal:[eq(test.t1.b, test.t3.b)]",
" ├─IndexReader_53(Build) 3.00 11.66 root index:IndexFullScan_52",
" │ └─IndexFullScan_52 3.00 150.50 cop[tikv] table:t3, index:c(b) keep order:false",
" └─TableReader_39(Probe) 3.00 11.02 root data:ExchangeSender_38",
" └─TableReader_39(Probe) 3.00 11.14 root data:ExchangeSender_38",
" └─ExchangeSender_38 3.00 264.38 cop[tiflash] ExchangeType: PassThrough",
" └─HashJoin_29 3.00 264.38 cop[tiflash] inner join, equal:[eq(test.t1.a, test.t2.a)]",
" ├─ExchangeReceiver_35(Build) 3.00 106.00 cop[tiflash] ",
Expand All @@ -185,21 +185,22 @@
{
"SQL": "explain format = 'verbose' select (2) in (select count(*) from t1) from (select t.b < (select t.b from t2 limit 1 ) from t3 t) t",
"Plan": [
"HashJoin_19 3.00 133.41 root CARTESIAN left outer semi join",
"├─Selection_39(Build) 0.80 11.18 root eq(2, Column#18)",
"│ └─StreamAgg_60 1.00 69.50 root funcs:count(Column#32)->Column#18",
"│ └─TableReader_61 1.00 5.17 root data:StreamAgg_44",
"│ └─StreamAgg_44 1.00 8.18 batchCop[tiflash] funcs:count(1)->Column#32",
"│ └─TableFullScan_59 3.00 60.50 batchCop[tiflash] table:t1 keep order:false",
"└─Projection_20(Probe) 3.00 101.83 root 1->Column#28",
" └─Apply_22 3.00 82.03 root CARTESIAN left outer join",
"HashJoin_19 3.00 127.40 root CARTESIAN left outer semi join",
"├─Selection_44(Build) 0.80 11.18 root eq(2, Column#18)",
"│ └─StreamAgg_65 1.00 69.50 root funcs:count(Column#32)->Column#18",
"│ └─TableReader_66 1.00 5.17 root data:StreamAgg_49",
"│ └─StreamAgg_49 1.00 8.18 batchCop[tiflash] funcs:count(1)->Column#32",
"│ └─TableFullScan_64 3.00 60.50 batchCop[tiflash] table:t1 keep order:false",
"└─Projection_20(Probe) 3.00 95.82 root 1->Column#28",
" └─Apply_22 3.00 76.02 root CARTESIAN left outer join",
" ├─TableReader_24(Build) 3.00 10.16 root data:TableFullScan_23",
" │ └─TableFullScan_23 3.00 128.00 cop[tikv] table:t keep order:false",
" └─Projection_27(Probe) 1.00 23.96 root 1->Column#26",
" └─Limit_28 1.00 5.36 root offset:0, count:1",
" └─TableReader_34 1.00 5.36 root data:Limit_33",
" └─Limit_33 1.00 56.00 cop[tikv] offset:0, count:1",
" └─TableFullScan_31 1.00 56.00 cop[tikv] table:t2 keep order:false"
" └─Projection_27(Probe) 1.00 21.95 root 1->Column#26",
" └─Limit_31 1.00 3.35 root offset:0, count:1",
" └─TableReader_43 1.00 3.35 root data:ExchangeSender_42",
" └─ExchangeSender_42 1.00 79.50 cop[tiflash] ExchangeType: PassThrough",
" └─Limit_41 1.00 79.50 cop[tiflash] offset:0, count:1",
" └─TableFullScan_40 1.00 79.50 cop[tiflash] table:t2 keep order:false"
]
},
{
Expand Down Expand Up @@ -2562,6 +2563,55 @@
" └─Selection(Probe) 9990.00 batchCop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 10000.00 batchCop[tiflash] table:t1 keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1",
"Plan": [
"Limit 1.00 root offset:0, count:1",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough",
" └─Limit 1.00 cop[tiflash] offset:0, count:1",
" └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]",
" ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ",
" │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))",
" │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo",
" └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 0.80 cop[tiflash] table:t1 keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1",
"Plan": [
"Limit 1.00 root offset:0, count:1",
"└─TableReader 1.00 root data:ExchangeSender",
" └─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough",
" └─Limit 1.00 cop[tiflash] offset:0, count:1",
" └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]",
" ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ",
" │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))",
" │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo",
" └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 0.80 cop[tiflash] table:t1 keep order:false, stats:pseudo"
]
},
{
"SQL": "desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id limit 20) v group by v.v1",
"Plan": [
"HashAgg 20.00 root group by:test.t.value, funcs:count(1)->Column#7",
"└─Limit 20.00 root offset:0, count:20",
" └─TableReader 20.00 root data:ExchangeSender",
" └─ExchangeSender 20.00 cop[tiflash] ExchangeType: PassThrough",
" └─Limit 20.00 cop[tiflash] offset:0, count:20",
" └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]",
" ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ",
" │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast",
" │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))",
" │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo",
" └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))",
" └─TableFullScan 16.02 cop[tiflash] table:t1 keep order:false, stats:pseudo"
]
}
]
},
Expand Down