diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 0d8b183a6900a..51fb32086a3aa 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -1868,7 +1868,8 @@ func (p *LogicalJoin) tryToGetMppHashJoin(prop *property.PhysicalProperty, useBC EqualConditions: p.EqualConditions, storeTp: kv.TiFlash, mppShuffleJoin: !useBCJ, - }.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), p.blockOffset, childrenProps...) + // Mpp Join has quite heavy cost. Even limit might not suspend it in time, so we dont scale the count. + }.Init(p.ctx, p.stats, p.blockOffset, childrenProps...) return []PhysicalPlan{join} } @@ -2520,6 +2521,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) ([] if !p.limitHints.preferLimitToCop { allTaskTypes = append(allTaskTypes, property.RootTaskType) } + if p.canPushToCop(kv.TiFlash) && p.ctx.GetSessionVars().IsMPPAllowed() { + allTaskTypes = append(allTaskTypes, property.MppTaskType) + } ret := make([]PhysicalPlan, 0, len(allTaskTypes)) for _, tp := range allTaskTypes { resultProp := &property.PhysicalProperty{TaskTp: tp, ExpectedCnt: float64(p.Count + p.Offset)} diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index b2861ccc6ec60..700b9fd8d9c23 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -3925,8 +3925,8 @@ func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) { // read from tiflash, mpp with large cost tk.MustQuery("explain format='verbose' select /*+ read_from_storage(tiflash[t]) */ count(*) from t where a=1").Check(testkit.Rows( - "HashAgg_21 1.00 11910.68 root funcs:count(Column#5)->Column#3", - "└─TableReader_23 1.00 11877.08 root data:ExchangeSender_22", + "HashAgg_21 1.00 11910.73 root funcs:count(Column#5)->Column#3", + "└─TableReader_23 1.00 11877.13 root data:ExchangeSender_22", " └─ExchangeSender_22 1.00 285050.00 batchCop[tiflash] ExchangeType: PassThrough", " └─HashAgg_9 1.00 285050.00 batchCop[tiflash] funcs:count(1)->Column#5", " └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)", diff --git a/planner/core/task.go b/planner/core/task.go index c1b925451ca1b..8559afa8ab6a3 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1106,6 +1106,15 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task { } t = cop.convertToRootTask(p.ctx) sunk = p.sinkIntoIndexLookUp(t) + } else if mpp, ok := t.(*mppTask); ok { + newCount := p.Offset + p.Count + childProfile := mpp.plan().statsInfo() + stats := deriveLimitStats(childProfile, float64(newCount)) + pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset) + mpp = attachPlan2Task(pushedDownLimit, mpp).(*mppTask) + pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema()) + pushedDownLimit.cost = mpp.cost() + t = mpp.convertToRootTask(p.ctx) } p.cost = t.cost() if sunk { @@ -2067,7 +2076,8 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask { }.Init(ctx, t.p.SelectBlockOffset()) p.stats = t.p.statsInfo() - p.cost = t.cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor + cst := t.cst + t.count()*ctx.GetSessionVars().GetNetworkFactor(nil) + p.cost = cst / p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor if p.ctx.GetSessionVars().IsMPPEnforced() { p.cost = 0 } diff --git a/planner/core/testdata/integration_serial_suite_in.json b/planner/core/testdata/integration_serial_suite_in.json index 0cc21a8c36e4b..57d2ffa9068f1 100644 --- a/planner/core/testdata/integration_serial_suite_in.json +++ b/planner/core/testdata/integration_serial_suite_in.json @@ -301,7 +301,10 @@ "desc format = 'brief' select * from (select id from t group by id) C join (select sum(b),id from (select t.id, t1.id as b from t join (select id, count(*) as c from t group by id) t1 on t.id=t1.id)A group by id)B on C.id=b.id", "desc format = 'brief' select * from t join t t1 on t.id = t1.id order by t.value limit 1", "desc format = 'brief' select * from t join t t1 on t.id = t1.id order by t.value % 100 limit 1", - "desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id order by t.value limit 20) v group by v.v1" + "desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id order by t.value limit 20) v group by v.v1", + "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1", + "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1", + "desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id limit 20) v group by v.v1" ] }, { diff --git a/planner/core/testdata/integration_serial_suite_out.json b/planner/core/testdata/integration_serial_suite_out.json index a2bb282dc3b4a..373a09470a69c 100644 --- a/planner/core/testdata/integration_serial_suite_out.json +++ b/planner/core/testdata/integration_serial_suite_out.json @@ -102,7 +102,7 @@ { "SQL": "explain format = 'verbose' select count(*) from t2 group by a", "Plan": [ - "TableReader_24 3.00 3.21 root data:ExchangeSender_23", + "TableReader_24 3.00 3.33 root data:ExchangeSender_23", "└─ExchangeSender_23 3.00 77.00 batchCop[tiflash] ExchangeType: PassThrough", " └─Projection_22 3.00 0.00 batchCop[tiflash] Column#4", " └─HashAgg_8 3.00 77.00 batchCop[tiflash] group by:test.t2.a, funcs:count(1)->Column#4", @@ -152,8 +152,8 @@ { "SQL": "explain format = 'verbose' select count(*) from t1 join t2 on t1.a = t2.a", "Plan": [ - "StreamAgg_12 1.00 18.81 root funcs:count(1)->Column#7", - "└─TableReader_44 3.00 9.81 root data:ExchangeSender_43", + "StreamAgg_12 1.00 18.93 root funcs:count(1)->Column#7", + "└─TableReader_44 3.00 9.93 root data:ExchangeSender_43", " └─ExchangeSender_43 3.00 235.38 cop[tiflash] ExchangeType: PassThrough", " └─HashJoin_40 3.00 235.38 cop[tiflash] inner join, equal:[eq(test.t1.a, test.t2.a)]", " ├─ExchangeReceiver_19(Build) 3.00 77.00 cop[tiflash] ", @@ -167,11 +167,11 @@ { "SQL": "explain format = 'verbose' select count(*) from t1 join t2 on t1.a = t2.a join t3 on t1.b = t3.b", "Plan": [ - "StreamAgg_15 1.00 60.48 root funcs:count(1)->Column#10", - "└─HashJoin_65 3.00 51.48 root inner join, equal:[eq(test.t1.b, test.t3.b)]", + "StreamAgg_15 1.00 60.60 root funcs:count(1)->Column#10", + "└─HashJoin_65 3.00 51.60 root inner join, equal:[eq(test.t1.b, test.t3.b)]", " ├─IndexReader_53(Build) 3.00 11.66 root index:IndexFullScan_52", " │ └─IndexFullScan_52 3.00 150.50 cop[tikv] table:t3, index:c(b) keep order:false", - " └─TableReader_39(Probe) 3.00 11.02 root data:ExchangeSender_38", + " └─TableReader_39(Probe) 3.00 11.14 root data:ExchangeSender_38", " └─ExchangeSender_38 3.00 264.38 cop[tiflash] ExchangeType: PassThrough", " └─HashJoin_29 3.00 264.38 cop[tiflash] inner join, equal:[eq(test.t1.a, test.t2.a)]", " ├─ExchangeReceiver_35(Build) 3.00 106.00 cop[tiflash] ", @@ -185,21 +185,22 @@ { "SQL": "explain format = 'verbose' select (2) in (select count(*) from t1) from (select t.b < (select t.b from t2 limit 1 ) from t3 t) t", "Plan": [ - "HashJoin_19 3.00 133.41 root CARTESIAN left outer semi join", - "├─Selection_39(Build) 0.80 11.18 root eq(2, Column#18)", - "│ └─StreamAgg_60 1.00 69.50 root funcs:count(Column#32)->Column#18", - "│ └─TableReader_61 1.00 5.17 root data:StreamAgg_44", - "│ └─StreamAgg_44 1.00 8.18 batchCop[tiflash] funcs:count(1)->Column#32", - "│ └─TableFullScan_59 3.00 60.50 batchCop[tiflash] table:t1 keep order:false", - "└─Projection_20(Probe) 3.00 101.83 root 1->Column#28", - " └─Apply_22 3.00 82.03 root CARTESIAN left outer join", + "HashJoin_19 3.00 127.40 root CARTESIAN left outer semi join", + "├─Selection_44(Build) 0.80 11.18 root eq(2, Column#18)", + "│ └─StreamAgg_65 1.00 69.50 root funcs:count(Column#32)->Column#18", + "│ └─TableReader_66 1.00 5.17 root data:StreamAgg_49", + "│ └─StreamAgg_49 1.00 8.18 batchCop[tiflash] funcs:count(1)->Column#32", + "│ └─TableFullScan_64 3.00 60.50 batchCop[tiflash] table:t1 keep order:false", + "└─Projection_20(Probe) 3.00 95.82 root 1->Column#28", + " └─Apply_22 3.00 76.02 root CARTESIAN left outer join", " ├─TableReader_24(Build) 3.00 10.16 root data:TableFullScan_23", " │ └─TableFullScan_23 3.00 128.00 cop[tikv] table:t keep order:false", - " └─Projection_27(Probe) 1.00 23.96 root 1->Column#26", - " └─Limit_28 1.00 5.36 root offset:0, count:1", - " └─TableReader_34 1.00 5.36 root data:Limit_33", - " └─Limit_33 1.00 56.00 cop[tikv] offset:0, count:1", - " └─TableFullScan_31 1.00 56.00 cop[tikv] table:t2 keep order:false" + " └─Projection_27(Probe) 1.00 21.95 root 1->Column#26", + " └─Limit_31 1.00 3.35 root offset:0, count:1", + " └─TableReader_43 1.00 3.35 root data:ExchangeSender_42", + " └─ExchangeSender_42 1.00 79.50 cop[tiflash] ExchangeType: PassThrough", + " └─Limit_41 1.00 79.50 cop[tiflash] offset:0, count:1", + " └─TableFullScan_40 1.00 79.50 cop[tiflash] table:t2 keep order:false" ] }, { @@ -2669,6 +2670,55 @@ " └─Selection(Probe) 9990.00 batchCop[tiflash] not(isnull(test.t.id))", " └─TableFullScan 10000.00 batchCop[tiflash] table:t1 keep order:false, stats:pseudo" ] + }, + { + "SQL": "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1", + "Plan": [ + "Limit 1.00 root offset:0, count:1", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough", + " └─Limit 1.00 cop[tiflash] offset:0, count:1", + " └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]", + " ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ", + " │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo", + " └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))", + " └─TableFullScan 0.80 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select * from t join t t1 on t.id = t1.id limit 1", + "Plan": [ + "Limit 1.00 root offset:0, count:1", + "└─TableReader 1.00 root data:ExchangeSender", + " └─ExchangeSender 1.00 cop[tiflash] ExchangeType: PassThrough", + " └─Limit 1.00 cop[tiflash] offset:0, count:1", + " └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]", + " ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ", + " │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo", + " └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))", + " └─TableFullScan 0.80 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ] + }, + { + "SQL": "desc format = 'brief' select count(*) from (select t.id, t.value v1 from t join t t1 on t.id = t1.id limit 20) v group by v.v1", + "Plan": [ + "HashAgg 20.00 root group by:test.t.value, funcs:count(1)->Column#7", + "└─Limit 20.00 root offset:0, count:20", + " └─TableReader 20.00 root data:ExchangeSender", + " └─ExchangeSender 20.00 cop[tiflash] ExchangeType: PassThrough", + " └─Limit 20.00 cop[tiflash] offset:0, count:20", + " └─HashJoin 12487.50 cop[tiflash] inner join, equal:[eq(test.t.id, test.t.id)]", + " ├─ExchangeReceiver(Build) 9990.00 cop[tiflash] ", + " │ └─ExchangeSender 9990.00 cop[tiflash] ExchangeType: Broadcast", + " │ └─Selection 9990.00 cop[tiflash] not(isnull(test.t.id))", + " │ └─TableFullScan 10000.00 cop[tiflash] table:t keep order:false, stats:pseudo", + " └─Selection(Probe) 9990.00 cop[tiflash] not(isnull(test.t.id))", + " └─TableFullScan 16.02 cop[tiflash] table:t1 keep order:false, stats:pseudo" + ] } ] },