Skip to content

Commit

Permalink
opt,sql: use paired-joins with non-covering indexes for left joins
Browse files Browse the repository at this point in the history
This is done when the left outer/semi/anti join can use a
lookup join. Prior to this, when the non-covering index
could not fully evaluate the filter for left join we could
not generate a lookup join.

With this change:
- Left outer join becomes a pair of two left outer joins.
- Left semi join is a pair of inner join followed by left
  semi join.
- Left anti join is a pair of left outer join followed by
  left anti join.

Informs #55452

Release note (performance improvement): The optimizer can now
generate lookup joins in certain cases for non-covering
indexes, when performing a left outer/semi/anti join.
  • Loading branch information
sumeerbhola committed Jan 5, 2021
1 parent d2740b1 commit e9cf610
Show file tree
Hide file tree
Showing 16 changed files with 519 additions and 298 deletions.
22 changes: 13 additions & 9 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2049,14 +2049,18 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
Visibility: n.table.colCfg.visibility,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
// TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
MaintainOrdering: len(n.reqOrdering) > 0 || n.isFirstJoinInPairedJoiner,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
Expand All @@ -2072,7 +2076,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
joinReaderSpec.LookupColumnsAreKey = n.eqColsAreKey

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table, false /* addContinuationCol */)
mappingHelperForLookupJoins(plan, n.input, n.table, n.isFirstJoinInPairedJoiner)

// Set the ON condition.
if n.onCond != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
eqColsAreKey bool,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
isFirstJoinInPairedJoiner bool,
isSecondJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (jr *JoinReaderSpec) summary() (string, []string) {
if jr.LeftJoinWithPairedJoiner {
details = append(details, "second join in paired-join")
}
if jr.OutputGroupContinuationForLeftRow {
details = append(details, "first join in paired-join")
}
return "JoinReader", details
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ SELECT small.c, large.c FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left join with ON filter on non-covering index
## Left join with ON filter on non-covering index. Will execute as paired-joins.
query II rowsort
SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -393,6 +393,26 @@ SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left semi join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
6
12

## Left anti join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
3
9
15
18
21
24
27
30

# Lookup joins against interleaved tables. Regression test for #28981.
# This is now tested more thoroughly by joinreader_test.go.

Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,20 @@ type lookupJoinNode struct {
eqColsAreKey bool

// columns are the produced columns, namely the input columns and (unless the
// join type is semi or anti join) the columns in the table scanNode.
// join type is semi or anti join) the columns in the table scanNode. It
// includes an additional continuation column when IsFirstJoinInPairedJoin
// is true.
columns colinfo.ResultColumns

// onCond is any ON condition to be used in conjunction with the implicit
// equality condition on eqCols.
onCond tree.TypedExpr

// At most one of is{First,Second}JoinInPairedJoiner can be true.
// IsFirstJoinInPairedJoiner can be true only if reqOrdering asks the join
// to preserve ordering (currently a non-empty reqOrdering is interpreted
// as a bool to preserve the row ordering of the input).
isFirstJoinInPairedJoiner bool
isSecondJoinInPairedJoiner bool

reqOrdering ReqOrdering
Expand Down
24 changes: 20 additions & 4 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1457,19 +1457,34 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {

inputCols := join.Input.Relational().OutputCols
lookupCols := join.Cols.Difference(inputCols)
if join.IsFirstJoinInPairedJoiner {
lookupCols.Remove(join.ContinuationCol)
}

lookupOrdinals, lookupColMap := b.getColumns(lookupCols, join.Table)
allCols := joinOutputMap(input.outputCols, lookupColMap)

// allExprCols are the columns used in expressions evaluated by this join.
allExprCols := joinOutputMap(input.outputCols, lookupColMap)
allCols := allExprCols
if join.IsFirstJoinInPairedJoiner {
// allCols needs to include the continuation column since it will be
// in the result output by this join.
allCols = allExprCols.Copy()
maxValue, ok := allCols.MaxValue()
if !ok {
return execPlan{}, errors.AssertionFailedf("allCols should not be empty")
}
// Assign the continuation column the next unused value in the map.
allCols.Set(int(join.ContinuationCol), maxValue+1)
}
res := execPlan{outputCols: allCols}
if join.JoinType == opt.SemiJoinOp || join.JoinType == opt.AntiJoinOp {
// For semi and anti join, only the left columns are output.
res.outputCols = input.outputCols
}

ctx := buildScalarCtx{
ivh: tree.MakeIndexedVarHelper(nil /* container */, allCols.Len()),
ivarMap: allCols,
ivh: tree.MakeIndexedVarHelper(nil /* container */, allExprCols.Len()),
ivarMap: allExprCols,
}
var onExpr tree.TypedExpr
if len(join.On) > 0 {
Expand Down Expand Up @@ -1497,6 +1512,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
join.LookupColsAreTableKey,
lookupOrdinals,
onExpr,
join.IsFirstJoinInPairedJoiner,
join.IsSecondJoinInPairedJoiner,
res.reqOrdering(join),
locking,
Expand Down
111 changes: 91 additions & 20 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,6 @@ vectorized: true
spans: FULL SCAN

# Left join with ON filter on non-covering index
# TODO(radu): this doesn't use lookup join yet, the current rules don't cover
# left join with ON condition on columns that are not covered by the index.
query T
EXPLAIN (VERBOSE) SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -712,27 +710,100 @@ vectorized: true
│ columns: (c, d)
│ estimated row count: 336
└── • hash join (right outer)
│ columns: (b, d, c)
└── • project
│ columns: (c, b, d)
│ estimated row count: 336
│ equality: (b) = (c)
├── • filter
│ │ columns: (b, d)
│ │ estimated row count: 3,303
│ │ filter: d < 30
│ │
│ └── • scan
│ columns: (b, d)
│ estimated row count: 10,000
│ table: large@primary
│ spans: FULL SCAN
└── • lookup join (left outer)
│ columns: (c, a, b, cont, d)
│ table: large@primary
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (left outer)
│ columns: (c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100
table: small@primary
spans: FULL SCAN

# Left semi-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 100
└── • project
│ columns: (c, b)
│ estimated row count: 336
└── • scan
columns: (c)
estimated row count: 100
table: small@primary
spans: FULL SCAN
└── • lookup join (semi)
│ columns: (c, a, b, cont)
│ table: large@primary
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (inner)
│ columns: (c, a, b, cont)
│ estimated row count: 990
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100
table: small@primary
spans: FULL SCAN

# Left anti-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < small.a)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 67
└── • project
│ columns: (a, c)
│ estimated row count: 67
└── • project
│ columns: (a, c, b)
│ estimated row count: 667
└── • lookup join (anti)
│ columns: (a, c, a, b, cont)
│ table: large@primary
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < a
└── • lookup join (left outer)
│ columns: (a, c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (a, c)
estimated row count: 100
table: small@primary
spans: FULL SCAN

###########################################################
# LOOKUP JOINS ON IMPLICIT INDEX KEY COLUMNS #
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/opt/exec/explain/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func getResultColumns(

case lookupJoinOp:
a := args.(*lookupJoinArgs)
return joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols)), nil
cols := joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols))
// The following matches the behavior of execFactory.ConstructLookupJoin.
if a.IsFirstJoinInPairedJoiner {
cols = append(cols, colinfo.ResultColumn{Name: "cont", Typ: types.Bool})
}
return cols, nil

case ordinalityOp:
return appendColumns(inputs[0], colinfo.ResultColumn{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ define LookupJoin {
EqColsAreKey bool
LookupCols exec.TableColumnOrdinalSet
OnCond tree.TypedExpr
IsFirstJoinInPairedJoiner bool
IsSecondJoinInPairedJoiner bool
ReqOrdering exec.OutputOrdering
Locking *tree.LockingItem
Expand Down
21 changes: 13 additions & 8 deletions pkg/sql/opt/memo/check_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,19 @@ func (m *Memo) CheckExpr(e opt.Expr) {
panic(errors.AssertionFailedf("lookup join with columns that are not required"))
}
if t.IsSecondJoinInPairedJoiner {
ij, ok := t.Input.(*InvertedJoinExpr)
if !ok {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with %T instead of inverted join", t.Input))
}
if !ij.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
switch firstJoin := t.Input.(type) {
case *InvertedJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
}
case *LookupJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with lookup join that thinks it is unpaired"))
}
default:
panic(errors.AssertionFailedf("lookup paired-join is paired with %T", t.Input))
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,20 @@ define LookupJoinPrivate {
# table (and thus each left row matches with at most one table row).
LookupColsAreTableKey bool

# At most one of Is{First,Second}JoinInPairedJoiner can be true.
#
# IsFirstJoinInPairedJoiner is true if this is the first join of a
# paired-joiner used for left joins.
IsFirstJoinInPairedJoiner bool

# IsSecondJoinInPairedJoiner is true if this is the second join of a
# paired-joiner used for left joins.
IsSecondJoinInPairedJoiner bool

# ContinuationCol is the column ID of the continuation column when
# IsFirstJoinInPairedJoiner is true.
ContinuationCol ColumnID

# ConstFilters contains the constant filters that are represented as equality
# conditions on the KeyCols. These filters are needed by the statistics code to
# correctly estimate selectivity.
Expand Down
Loading

0 comments on commit e9cf610

Please sign in to comment.