Skip to content

Commit

Permalink
opt,sql: use paired-joins with non-covering indexes for left joins
Browse files Browse the repository at this point in the history
This is done when the left outer/semi/anti join can use a
lookup join. Prior to this, when the non-covering index
could not fully evaluate the filter for left join we could
not generate a lookup join.

With this change:
- Left outer join becomes a pair of two left outer joins.
- Left semi join is a pair of inner join followed by left
  semi join.
- Left anti join is a pair of left outer join followed by
  left anti join.

Informs #55452

Release note (performance improvement): The optimizer can now
generate lookup joins in certain cases for non-covering
indexes, when performing a left outer/semi/anti join.
  • Loading branch information
sumeerbhola authored and rytaft committed Jan 31, 2022
1 parent f3abce6 commit 99bd67a
Show file tree
Hide file tree
Showing 16 changed files with 461 additions and 255 deletions.
22 changes: 13 additions & 9 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,14 +2228,18 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
// TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
MaintainOrdering: len(n.reqOrdering) > 0 || n.isFirstJoinInPairedJoiner,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
Expand All @@ -2251,7 +2255,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
joinReaderSpec.LookupColumnsAreKey = n.eqColsAreKey

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table, false /* addContinuationCol */)
mappingHelperForLookupJoins(plan, n.input, n.table, n.isFirstJoinInPairedJoiner)

// Set the lookup condition.
var indexVarMap []int
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
remoteLookupExpr tree.TypedExpr,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
isFirstJoinInPairedJoiner bool,
isSecondJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func (jr *JoinReaderSpec) summary() (string, []string) {
if jr.LeftJoinWithPairedJoiner {
details = append(details, "second join in paired-join")
}
if jr.OutputGroupContinuationForLeftRow {
details = append(details, "first join in paired-join")
}
return "JoinReader", details
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ SELECT small.c, large.c FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left join with ON filter on non-covering index
## Left join with ON filter on non-covering index. Will execute as paired-joins.
query II rowsort
SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -393,6 +393,26 @@ SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left semi join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
6
12

## Left anti join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
3
9
15
18
21
24
27
30

###########################################################
# LOOKUP JOINS ON IMPLICIT INDEX KEY COLUMNS #
# https://github.com/cockroachdb/cockroach/issues/31777 #
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ type lookupJoinNode struct {
remoteLookupExpr tree.TypedExpr

// columns are the produced columns, namely the input columns and (unless the
// join type is semi or anti join) the columns in the table scanNode.
// join type is semi or anti join) the columns in the table scanNode. It
// includes an additional continuation column when IsFirstJoinInPairedJoin
// is true.
columns colinfo.ResultColumns

// onCond is any ON condition to be used in conjunction with the implicit
// equality condition on eqCols or the conditions in lookupExpr.
onCond tree.TypedExpr

// At most one of is{First,Second}JoinInPairedJoiner can be true.
isFirstJoinInPairedJoiner bool
isSecondJoinInPairedJoiner bool

reqOrdering ReqOrdering
Expand Down
30 changes: 26 additions & 4 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,19 +1752,40 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {

inputCols := join.Input.Relational().OutputCols
lookupCols := join.Cols.Difference(inputCols)
if join.IsFirstJoinInPairedJoiner {
lookupCols.Remove(join.ContinuationCol)
}

lookupOrdinals, lookupColMap := b.getColumns(lookupCols, join.Table)
allCols := joinOutputMap(input.outputCols, lookupColMap)

// allExprCols are the columns used in expressions evaluated by this join.
allExprCols := joinOutputMap(input.outputCols, lookupColMap)
allCols := allExprCols
if join.IsFirstJoinInPairedJoiner {
if join.JoinType != opt.InnerJoinOp && join.JoinType != opt.LeftJoinOp {
return execPlan{}, errors.AssertionFailedf(
"first join in paired joiner must be an inner or left join. found %s",
join.JoinType.String(),
)
}
// allCols needs to include the continuation column since it will be
// in the result output by this join.
allCols = allExprCols.Copy()
maxValue, ok := allCols.MaxValue()
if !ok {
return execPlan{}, errors.AssertionFailedf("allCols should not be empty")
}
// Assign the continuation column the next unused value in the map.
allCols.Set(int(join.ContinuationCol), maxValue+1)
}
res := execPlan{outputCols: allCols}
if join.JoinType == opt.SemiJoinOp || join.JoinType == opt.AntiJoinOp {
// For semi and anti join, only the left columns are output.
res.outputCols = input.outputCols
}

ctx := buildScalarCtx{
ivh: tree.MakeIndexedVarHelper(nil /* container */, allCols.Len()),
ivarMap: allCols,
ivh: tree.MakeIndexedVarHelper(nil /* container */, allExprCols.Len()),
ivarMap: allExprCols,
}
var lookupExpr, remoteLookupExpr tree.TypedExpr
if len(join.LookupExpr) > 0 {
Expand Down Expand Up @@ -1809,6 +1830,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
remoteLookupExpr,
lookupOrdinals,
onExpr,
join.IsFirstJoinInPairedJoiner,
join.IsSecondJoinInPairedJoiner,
res.reqOrdering(join),
locking,
Expand Down
99 changes: 79 additions & 20 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,6 @@ vectorized: true
spans: FULL SCAN

# Left join with ON filter on non-covering index
# TODO(radu): this doesn't use lookup join yet, the current rules don't cover
# left join with ON condition on columns that are not covered by the index.
query T
EXPLAIN (VERBOSE) SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -718,27 +716,88 @@ vectorized: true
│ columns: (c, d)
│ estimated row count: 336
└── • hash join (right outer)
│ columns: (b, d, c)
└── • project
│ columns: (c, b, d)
│ estimated row count: 336
│ equality: (b) = (c)
├── • filter
│ │ columns: (b, d)
│ │ estimated row count: 3,303
│ │ filter: d < 30
│ │
│ └── • scan
│ columns: (b, d)
│ estimated row count: 10,000 (100% of the table; stats collected <hidden> ago)
│ table: large@large_pkey
│ spans: FULL SCAN
└── • lookup join (left outer)
│ columns: (c, a, b, cont, d)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (left outer)
│ columns: (c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

# Left semi-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 100
└── • lookup join (semi)
│ columns: (c, a, b, cont)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN
└── • lookup join (inner)
│ columns: (c, a, b, cont)
│ estimated row count: 990
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

# Left anti-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 0
└── • lookup join (anti)
│ columns: (c, a, b, cont)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (left outer)
│ columns: (c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

###########################################################
# LOOKUP JOINS ON IMPLICIT INDEX KEY COLUMNS #
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/opt/exec/explain/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func getResultColumns(

case lookupJoinOp:
a := args.(*lookupJoinArgs)
return joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols)), nil
cols := joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols))
// The following matches the behavior of execFactory.ConstructLookupJoin.
if a.IsFirstJoinInPairedJoiner {
cols = append(cols, colinfo.ResultColumn{Name: "cont", Typ: types.Bool})
}
return cols, nil

case ordinalityOp:
return appendColumns(inputs[0], colinfo.ResultColumn{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ define LookupJoin {
RemoteLookupExpr tree.TypedExpr
LookupCols exec.TableColumnOrdinalSet
OnCond tree.TypedExpr
IsFirstJoinInPairedJoiner bool
IsSecondJoinInPairedJoiner bool
ReqOrdering exec.OutputOrdering
Locking *tree.LockingItem
Expand Down
21 changes: 13 additions & 8 deletions pkg/sql/opt/memo/check_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,19 @@ func (m *Memo) CheckExpr(e opt.Expr) {
panic(errors.AssertionFailedf("lookup join with columns that are not required"))
}
if t.IsSecondJoinInPairedJoiner {
ij, ok := t.Input.(*InvertedJoinExpr)
if !ok {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with %T instead of inverted join", t.Input))
}
if !ij.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
switch firstJoin := t.Input.(type) {
case *InvertedJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
}
case *LookupJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with lookup join that thinks it is unpaired"))
}
default:
panic(errors.AssertionFailedf("lookup paired-join is paired with %T", t.Input))
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,20 @@ define LookupJoinPrivate {
# table (and thus each left row matches with at most one table row).
LookupColsAreTableKey bool

# At most one of Is{First,Second}JoinInPairedJoiner can be true.
#
# IsFirstJoinInPairedJoiner is true if this is the first join of a
# paired-joiner used for left joins.
IsFirstJoinInPairedJoiner bool

# IsSecondJoinInPairedJoiner is true if this is the second join of a
# paired-joiner used for left joins.
IsSecondJoinInPairedJoiner bool

# ContinuationCol is the column ID of the continuation column when
# IsFirstJoinInPairedJoiner is true.
ContinuationCol ColumnID

# LocalityOptimized is true if this lookup join is part of a locality
# optimized search strategy. For semi, inner, and left joins, this means
# that RemoteLookupExpr will be non-nil. See comments above that field for
Expand Down
Loading

0 comments on commit 99bd67a

Please sign in to comment.