Skip to content

Commit

Permalink
opt: factor limit hints into scan and lookup join costs
Browse files Browse the repository at this point in the history
Fixes cockroachdb#34811; the example query in this issue now chooses a lookup join
as desired. The coster now takes limit hints into account when costing
scans and lookup joins, and propagates limit hints through lookup joins.

Release note (sql change): The optimizer now considers the likely number
of rows an operator will need to provide, and might choose query plans
based on this. In particular, the optimizer might prefer lookup joins
over alternatives in some situations where not all rows of the join are
likely to be needed.
  • Loading branch information
savoie authored and rytaft committed Mar 2, 2020
1 parent 860c137 commit 9ac5dbc
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 120 deletions.
23 changes: 13 additions & 10 deletions pkg/sql/opt/exec/execbuilder/testdata/aggregate
Original file line number Diff line number Diff line change
Expand Up @@ -751,16 +751,19 @@ group · · (min int) ·
query TTTTT
EXPLAIN (TYPES) SELECT min(v) FROM opt_test WHERE k <> 4
----
· distributed false · ·
· vectorized true · ·
group · · (min int) ·
│ aggregate 0 min(v) · ·
│ scalar · · ·
└── render · · (v int) ·
│ render 0 (v)[int] · ·
└── scan · · (k int, v int) ·
· table opt_test@primary · ·
· spans -/3/# /5- · ·
· distributed false · ·
· vectorized true · ·
group · · (min int) ·
│ aggregate 0 any_not_null(v) · ·
│ scalar · · ·
└── render · · (v int) ·
│ render 0 (v)[int] · ·
└── limit · · (k int, v int) +v
│ count (1)[int] · ·
└── scan · · (k int, v int) +v
· table opt_test@v · ·
· spans /!NULL- · ·
· filter ((k)[int] != (4)[int])[bool] · ·

# Check that the optimization doesn't work when the argument is non-trivial (we
# can't in general guarantee an ordering on a synthesized column).
Expand Down
23 changes: 11 additions & 12 deletions pkg/sql/opt/exec/execbuilder/testdata/limit
Original file line number Diff line number Diff line change
Expand Up @@ -148,18 +148,17 @@ filter · · (k) ·
query TTTTT
EXPLAIN (VERBOSE) SELECT k, w FROM t WHERE v >= 1 AND v <= 100 LIMIT 10
----
· distributed false · ·
· vectorized true · ·
render · · (k, w) ·
│ render 0 k · ·
│ render 1 w · ·
└── index-join · · (k, v, w) ·
│ table t@primary · ·
│ key columns k · ·
└── scan · · (k, v) ·
· table t@t_v_idx · ·
· spans /1-/101 · ·
· limit 10 · ·
· distributed false · ·
· vectorized true · ·
render · · (k, w) ·
│ render 0 k · ·
│ render 1 w · ·
└── limit · · (k, v, w) ·
│ count 10 · ·
└── scan · · (k, v, w) ·
· table t@primary · ·
· spans ALL · ·
· filter (v >= 1) AND (v <= 100) · ·

query TTTTT
EXPLAIN (VERBOSE) SELECT k, w FROM t WHERE v >= 1 AND v <= 100 ORDER BY v LIMIT 10
Expand Down
58 changes: 30 additions & 28 deletions pkg/sql/opt/exec/execbuilder/testdata/subquery
Original file line number Diff line number Diff line change
Expand Up @@ -45,34 +45,36 @@ root · ·
query TTTTT
EXPLAIN (VERBOSE) SELECT * FROM abc WHERE a = (SELECT max(a) FROM abc WHERE EXISTS(SELECT * FROM abc WHERE c=a+3))
----
· distributed false · ·
· vectorized false · ·
root · · (a, b, c) ·
├── scan · · (a, b, c) ·
│ table abc@primary · ·
│ spans ALL · ·
│ filter a = @S2 · ·
├── subquery · · (a, b, c) ·
│ │ id @S1 · ·
│ │ original sql EXISTS (SELECT * FROM abc WHERE c = (a + 3)) · ·
│ │ exec mode exists · ·
│ └── limit · · (a, b, c) ·
│ │ count 1 · ·
│ └── scan · · (a, b, c) ·
│ table abc@primary · ·
│ spans ALL · ·
│ filter c = (a + 3) · ·
└── subquery · · (a, b, c) ·
│ id @S2 · ·
│ original sql (SELECT max(a) FROM abc WHERE EXISTS (SELECT * FROM abc WHERE c = (a + 3))) · ·
│ exec mode one row · ·
└── group · · (max) ·
│ aggregate 0 max(a) · ·
│ scalar · · ·
└── scan · · (a) ·
· table abc@primary · ·
· spans ALL · ·
· filter @S1 · ·
· distributed false · ·
· vectorized false · ·
root · · (a, b, c) ·
├── scan · · (a, b, c) ·
│ table abc@primary · ·
│ spans ALL · ·
│ filter a = @S2 · ·
├── subquery · · (a, b, c) ·
│ │ id @S1 · ·
│ │ original sql EXISTS (SELECT * FROM abc WHERE c = (a + 3)) · ·
│ │ exec mode exists · ·
│ └── limit · · (a, b, c) ·
│ │ count 1 · ·
│ └── scan · · (a, b, c) ·
│ table abc@primary · ·
│ spans ALL · ·
│ filter c = (a + 3) · ·
└── subquery · · (a, b, c) ·
│ id @S2 · ·
│ original sql (SELECT max(a) FROM abc WHERE EXISTS (SELECT * FROM abc WHERE c = (a + 3))) · ·
│ exec mode one row · ·
└── group · · (any_not_null) ·
│ aggregate 0 any_not_null(a) · ·
│ scalar · · ·
└── limit · · (a) -a
│ count 1 · ·
└── revscan · · (a) -a
· table abc@primary · ·
· spans ALL · ·
· filter @S1 · ·

# IN expression transformed into semi-join.
query TTTTT
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/opt/memo/testdata/memo
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ memo (optimized, ~18KB, required=[presentation: y:2,x:3,c:6] [ordering: +2])
├── G5: (const 10)
├── G6: (plus G11 G12)
├── G7: (project G13 G14 y)
│ ├── [ordering: +2]
│ ├── [ordering: +2] [limit hint: 10.00]
│ │ ├── best: (sort G7)
│ │ └── cost: 1119.26
│ ├── [ordering: +5]
Expand All @@ -169,7 +169,7 @@ memo (optimized, ~18KB, required=[presentation: y:2,x:3,c:6] [ordering: +2])
├── G11: (variable y)
├── G12: (const 1)
├── G13: (select G16 G17)
│ ├── [ordering: +2]
│ ├── [ordering: +2] [limit hint: 10.00]
│ │ ├── best: (sort G13)
│ │ └── cost: 1112.58
│ └── []
Expand All @@ -178,7 +178,7 @@ memo (optimized, ~18KB, required=[presentation: y:2,x:3,c:6] [ordering: +2])
├── G14: (projections G18)
├── G15: (eq G19 G20)
├── G16: (scan a)
│ ├── [ordering: +2]
│ ├── [ordering: +2] [limit hint: 30.00]
│ │ ├── best: (sort G16)
│ │ └── cost: 1259.35
│ └── []
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/optgen/exprgen/testdata/limit
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ limit
├── internal-ordering: +1
├── cardinality: [0 - 10]
├── stats: [rows=10]
├── cost: 1050.13
├── cost: 21.13
├── prune: (2)
├── interesting orderings: (+1,+2)
├── scan t.public.abc@ab
│ ├── columns: t.public.abc.a:1(int) t.public.abc.b:2(int)
│ ├── stats: [rows=1000]
│ ├── cost: 1050.02
│ ├── cost: 21.02
│ ├── ordering: +1
│ ├── limit hint: 10.00
│ ├── prune: (1,2)
Expand Down
34 changes: 30 additions & 4 deletions pkg/sql/opt/xform/coster.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ const (
// justification for this constant.
lookupJoinRetrieveRowCost = 2 * seqIOCostFactor

// Input rows to a join are processed in batches of this size.
// See joinreader.go.
joinReaderBatchSize = 100.0

// In the case of a limit hint, a scan will read this multiple of the expected
// number of rows. See scanNode.limitHint.
scanSoftLimitMultiplier = 2.0

// latencyCostFactor represents the throughput impact of doing scans on an
// index that may be remotely located in a different locality. If latencies
// are higher, then overall cluster throughput will suffer somewhat, as there
Expand Down Expand Up @@ -174,7 +182,7 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required
cost = c.computeIndexJoinCost(candidate.(*memo.IndexJoinExpr))

case opt.LookupJoinOp:
cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr))
cost = c.computeLookupJoinCost(candidate.(*memo.LookupJoinExpr), required)

case opt.ZigzagJoinOp:
cost = c.computeZigzagJoinCost(candidate.(*memo.ZigzagJoinExpr))
Expand Down Expand Up @@ -281,6 +289,10 @@ func (c *coster) computeScanCost(scan *memo.ScanExpr, required *physical.Require
rowCount := scan.Relational().Stats.RowCount
perRowCost := c.rowScanCost(scan.Table, scan.Index, scan.Cols.Len())

if required.LimitHint != 0 {
rowCount = math.Min(rowCount, required.LimitHint*scanSoftLimitMultiplier)
}

if ordering.ScanIsReverse(scan, &required.Ordering) {
if rowCount > 1 {
// Need to do binary search to seek to the previous row.
Expand Down Expand Up @@ -394,8 +406,22 @@ func (c *coster) computeIndexJoinCost(join *memo.IndexJoinExpr) memo.Cost {
return memo.Cost(leftRowCount) * perRowCost
}

func (c *coster) computeLookupJoinCost(join *memo.LookupJoinExpr) memo.Cost {
leftRowCount := join.Input.Relational().Stats.RowCount
func (c *coster) computeLookupJoinCost(
join *memo.LookupJoinExpr, required *physical.Required,
) memo.Cost {
lookupCount := join.Input.Relational().Stats.RowCount

// Lookup joins can return early if enough rows have been found. An otherwise
// expensive lookup join might have a lower cost if its limit hint estimates
// that most rows will not be needed.
if required.LimitHint != 0 {
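// Estimate the number of lookups needed to output LimitHint rows.
// NOTE(review): the join's estimated row count is used as a divisor without
// a zero check; if it and lookupCount can both be zero, the result is NaN,
// which math.Ceil and math.Min propagate into the cost. Confirm whether a
// zero-row estimate is possible here.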
expectedLookupCount := required.LimitHint * lookupCount / join.Relational().Stats.RowCount

// Round up to the nearest multiple of a batch.
expectedLookupCount = math.Ceil(expectedLookupCount/joinReaderBatchSize) * joinReaderBatchSize
lookupCount = math.Min(lookupCount, expectedLookupCount)
}

// The rows in the (left) input are used to probe into the (right) table.
// Since the matching rows in the table may not all be in the same range, this
Expand All @@ -409,7 +435,7 @@ func (c *coster) computeLookupJoinCost(join *memo.LookupJoinExpr) memo.Cost {
// slower.
perLookupCost *= 5
}
cost := memo.Cost(leftRowCount) * perLookupCost
cost := memo.Cost(lookupCount) * perLookupCost

// Each lookup might retrieve many rows; add the IO cost of retrieving the
// rows (relevant when we expect many resulting rows per lookup) and the CPU
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/opt/xform/physical_props.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func BuildChildPhysicalProps(
}

case opt.IndexJoinOp:
// For an index join, every input row results in exactly one output row.
childProps.LimitHint = parentProps.LimitHint

case opt.ExceptOp, opt.ExceptAllOp, opt.IntersectOp, opt.IntersectAllOp,
Expand All @@ -113,15 +114,21 @@ func BuildChildPhysicalProps(
childProps.LimitHint = distinctOnLimitHint(distinctCount, parentProps.LimitHint)
}

case opt.SelectOp:
case opt.SelectOp, opt.LookupJoinOp:
// These operations are assumed to produce a constant number of output rows
// for each input row, independent of already-processed rows.
outputRows := parent.(memo.RelExpr).Relational().Stats.RowCount
if outputRows == 0 || outputRows < parentProps.LimitHint {
break
}
if input, ok := parent.Child(nth).(memo.RelExpr); ok {
inputRows := input.Relational().Stats.RowCount
// outputRows / inputRows is roughly the number of output rows produced
// for each input row. Reduce the number of required input rows so that
// the expected number of output rows is equal to the parent limit hint.
childProps.LimitHint = parentProps.LimitHint * inputRows / outputRows
}

case opt.OrdinalityOp, opt.ProjectOp, opt.ProjectSetOp:
childProps.LimitHint = parentProps.LimitHint
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/opt/xform/testdata/rules/groupby
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ scalar-group-by
# This is because we know nothing about the ordering of y
# on the index xy after a scan on xy with x>7.
opt
SELECT min(y) FROM xyz WHERE x>7
SELECT min(y) FROM xyz@xy WHERE x>7
----
scalar-group-by
├── columns: min:4
Expand All @@ -161,6 +161,7 @@ scalar-group-by
├── scan xyz@xy
│ ├── columns: x:1!null y:2
│ ├── constraint: /1/2: [/8 - ]
│ ├── flags: force-index=xy
│ ├── key: (1)
│ └── fd: (1)-->(2)
└── aggregations
Expand All @@ -171,7 +172,7 @@ scalar-group-by
# This is because we know nothing about the ordering of y
# on the index xy after a scan on xy with x>7
opt
SELECT max(y) FROM xyz WHERE x>7
SELECT max(y) FROM xyz@xy WHERE x>7
----
scalar-group-by
├── columns: max:4
Expand All @@ -181,6 +182,7 @@ scalar-group-by
├── scan xyz@xy
│ ├── columns: x:1!null y:2
│ ├── constraint: /1/2: [/8 - ]
│ ├── flags: force-index=xy
│ ├── key: (1)
│ └── fd: (1)-->(2)
└── aggregations
Expand Down Expand Up @@ -501,7 +503,7 @@ memo (optimized, ~5KB, required=[presentation: min:5])
├── G2: (scan abc,cols=(1))
│ ├── [ordering: +1] [limit hint: 1.00]
│ │ ├── best: (scan abc,cols=(1))
│ │ └── cost: 1050.02
│ │ └── cost: 2.12
│ └── []
│ ├── best: (scan abc,cols=(1))
│ └── cost: 1050.02
Expand Down Expand Up @@ -563,7 +565,7 @@ memo (optimized, ~5KB, required=[presentation: max:5])
├── G2: (scan abc,cols=(1))
│ ├── [ordering: -1] [limit hint: 1.00]
│ │ ├── best: (scan abc,rev,cols=(1))
│ │ └── cost: 1149.68
│ │ └── cost: 2.14
│ └── []
│ ├── best: (scan abc,cols=(1))
│ └── cost: 1050.02
Expand Down
Loading

0 comments on commit 9ac5dbc

Please sign in to comment.