Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt,sql: use paired-joins with non-covering indexes for left joins #58261

Merged
merged 1 commit into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2228,14 +2228,18 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
}

joinReaderSpec := execinfrapb.JoinReaderSpec{
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
MaintainOrdering: len(n.reqOrdering) > 0,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
Table: *n.table.desc.TableDesc(),
Type: n.joinType,
LockingStrength: n.table.lockingStrength,
LockingWaitPolicy: n.table.lockingWaitPolicy,
// TODO(sumeer): specifying ordering here using isFirstJoinInPairedJoiner
// is late in the sense that the cost of this has not been taken into
// account. Make this decision earlier in CustomFuncs.GenerateLookupJoins.
MaintainOrdering: len(n.reqOrdering) > 0 || n.isFirstJoinInPairedJoiner,
HasSystemColumns: n.table.containsSystemColumns,
LeftJoinWithPairedJoiner: n.isSecondJoinInPairedJoiner,
OutputGroupContinuationForLeftRow: n.isFirstJoinInPairedJoiner,
LookupBatchBytesLimit: dsp.distSQLSrv.TestingKnobs.JoinReaderBatchBytesLimit,
}
joinReaderSpec.IndexIdx, err = getIndexIdx(n.table.index, n.table.desc)
if err != nil {
Expand All @@ -2251,7 +2255,7 @@ func (dsp *DistSQLPlanner) createPlanForLookupJoin(
joinReaderSpec.LookupColumnsAreKey = n.eqColsAreKey

numInputNodeCols, planToStreamColMap, post, types :=
mappingHelperForLookupJoins(plan, n.input, n.table, false /* addContinuationCol */)
mappingHelperForLookupJoins(plan, n.input, n.table, n.isFirstJoinInPairedJoiner)

// Set the lookup condition.
var indexVarMap []int
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_spec_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ func (e *distSQLSpecExecFactory) ConstructLookupJoin(
remoteLookupExpr tree.TypedExpr,
lookupCols exec.TableColumnOrdinalSet,
onCond tree.TypedExpr,
isFirstJoinInPairedJoiner bool,
isSecondJoinInPairedJoiner bool,
reqOrdering exec.OutputOrdering,
locking *tree.LockingItem,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/execinfrapb/flow_diagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func (jr *JoinReaderSpec) summary() (string, []string) {
if jr.LeftJoinWithPairedJoiner {
details = append(details, "second join in paired-join")
}
if jr.OutputGroupContinuationForLeftRow {
details = append(details, "first join in paired-join")
}
return "JoinReader", details
}

Expand Down
22 changes: 21 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ SELECT small.c, large.c FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left join with ON filter on non-covering index
## Left join with ON filter on non-covering index. Will execute as paired-joins.
query II rowsort
SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -393,6 +393,26 @@ SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND larg
27 NULL
30 NULL

## Left semi join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
6
12

## Left anti join with ON filter on non-covering index. Will execute as paired-joins.
query I rowsort
SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
3
9
15
18
21
24
27
30

###########################################################
# LOOKUP JOINS ON IMPLICIT INDEX KEY COLUMNS #
# https://github.com/cockroachdb/cockroach/issues/31777 #
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,17 @@ type lookupJoinNode struct {
remoteLookupExpr tree.TypedExpr

// columns are the produced columns, namely the input columns and (unless the
// join type is semi or anti join) the columns in the table scanNode.
// join type is semi or anti join) the columns in the table scanNode. It
// includes an additional continuation column when IsFirstJoinInPairedJoin
// is true.
columns colinfo.ResultColumns

// onCond is any ON condition to be used in conjunction with the implicit
// equality condition on eqCols or the conditions in lookupExpr.
onCond tree.TypedExpr

// At most one of is{First,Second}JoinInPairedJoiner can be true.
isFirstJoinInPairedJoiner bool
isSecondJoinInPairedJoiner bool

reqOrdering ReqOrdering
Expand Down
24 changes: 20 additions & 4 deletions pkg/sql/opt/exec/execbuilder/relational.go
Original file line number Diff line number Diff line change
Expand Up @@ -1752,19 +1752,34 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {

inputCols := join.Input.Relational().OutputCols
lookupCols := join.Cols.Difference(inputCols)
if join.IsFirstJoinInPairedJoiner {
lookupCols.Remove(join.ContinuationCol)
}

lookupOrdinals, lookupColMap := b.getColumns(lookupCols, join.Table)
allCols := joinOutputMap(input.outputCols, lookupColMap)

// allExprCols are the columns used in expressions evaluated by this join.
allExprCols := joinOutputMap(input.outputCols, lookupColMap)
allCols := allExprCols
if join.IsFirstJoinInPairedJoiner {
// allCols needs to include the continuation column since it will be
// in the result output by this join.
allCols = allExprCols.Copy()
maxValue, ok := allCols.MaxValue()
if !ok {
return execPlan{}, errors.AssertionFailedf("allCols should not be empty")
}
// Assign the continuation column the next unused value in the map.
allCols.Set(int(join.ContinuationCol), maxValue+1)
}
res := execPlan{outputCols: allCols}
if join.JoinType == opt.SemiJoinOp || join.JoinType == opt.AntiJoinOp {
// For semi and anti join, only the left columns are output.
res.outputCols = input.outputCols
}

ctx := buildScalarCtx{
ivh: tree.MakeIndexedVarHelper(nil /* container */, allCols.Len()),
ivarMap: allCols,
ivh: tree.MakeIndexedVarHelper(nil /* container */, allExprCols.Len()),
ivarMap: allExprCols,
}
var lookupExpr, remoteLookupExpr tree.TypedExpr
if len(join.LookupExpr) > 0 {
Expand Down Expand Up @@ -1809,6 +1824,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) {
remoteLookupExpr,
lookupOrdinals,
onExpr,
join.IsFirstJoinInPairedJoiner,
join.IsSecondJoinInPairedJoiner,
res.reqOrdering(join),
locking,
Expand Down
99 changes: 79 additions & 20 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join
Original file line number Diff line number Diff line change
Expand Up @@ -706,8 +706,6 @@ vectorized: true
spans: FULL SCAN

# Left join with ON filter on non-covering index
# TODO(radu): this doesn't use lookup join yet, the current rules don't cover
# left join with ON condition on columns that are not covered by the index.
query T
EXPLAIN (VERBOSE) SELECT small.c, large.d FROM small LEFT JOIN large ON small.c = large.b AND large.d < 30
----
Expand All @@ -718,27 +716,88 @@ vectorized: true
│ columns: (c, d)
│ estimated row count: 336
└── • hash join (right outer)
│ columns: (b, d, c)
└── • project
│ columns: (c, b, d)
│ estimated row count: 336
│ equality: (b) = (c)
├── • filter
│ │ columns: (b, d)
│ │ estimated row count: 3,303
│ │ filter: d < 30
│ │
│ └── • scan
│ columns: (b, d)
│ estimated row count: 10,000 (100% of the table; stats collected <hidden> ago)
│ table: large@large_pkey
│ spans: FULL SCAN
└── • lookup join (left outer)
│ columns: (c, a, b, cont, d)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (left outer)
│ columns: (c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

# Left semi-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 100
└── • lookup join (semi)
│ columns: (c, a, b, cont)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN
└── • lookup join (inner)
│ columns: (c, a, b, cont)
│ estimated row count: 990
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

# Left anti-join with ON filter on non-covering index
query T
EXPLAIN (VERBOSE) SELECT small.c FROM small WHERE NOT EXISTS(SELECT 1 FROM large WHERE small.c = large.b AND large.d < 30)
----
distribution: full
vectorized: true
·
• project
│ columns: (c)
│ estimated row count: 0
└── • lookup join (anti)
│ columns: (c, a, b, cont)
│ table: large@large_pkey
│ equality: (a, b) = (a,b)
│ equality cols are key
│ pred: d < 30
└── • lookup join (left outer)
│ columns: (c, a, b, cont)
│ estimated row count: 1,000
│ table: large@bc
│ equality: (c) = (b)
└── • scan
columns: (c)
estimated row count: 100 (100% of the table; stats collected <hidden> ago)
table: small@small_pkey
spans: FULL SCAN

###########################################################
# LOOKUP JOINS ON IMPLICIT INDEX KEY COLUMNS #
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/opt/exec/explain/result_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func getResultColumns(

case lookupJoinOp:
a := args.(*lookupJoinArgs)
return joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols)), nil
cols := joinColumns(a.JoinType, inputs[0], tableColumns(a.Table, a.LookupCols))
// The following matches the behavior of execFactory.ConstructLookupJoin.
if a.IsFirstJoinInPairedJoiner {
cols = append(cols, colinfo.ResultColumn{Name: "cont", Typ: types.Bool})
}
return cols, nil

case ordinalityOp:
return appendColumns(inputs[0], colinfo.ResultColumn{
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/exec/factory.opt
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ define LookupJoin {
RemoteLookupExpr tree.TypedExpr
LookupCols exec.TableColumnOrdinalSet
OnCond tree.TypedExpr
IsFirstJoinInPairedJoiner bool
IsSecondJoinInPairedJoiner bool
ReqOrdering exec.OutputOrdering
Locking *tree.LockingItem
Expand Down
32 changes: 25 additions & 7 deletions pkg/sql/opt/memo/check_expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,15 +234,33 @@ func (m *Memo) CheckExpr(e opt.Expr) {
if !t.Cols.SubsetOf(requiredCols) {
panic(errors.AssertionFailedf("lookup join with columns that are not required"))
}
if t.IsSecondJoinInPairedJoiner {
ij, ok := t.Input.(*InvertedJoinExpr)
if !ok {
if t.IsFirstJoinInPairedJoiner {
switch t.JoinType {
case opt.InnerJoinOp, opt.LeftJoinOp:
default:
panic(errors.AssertionFailedf(
"lookup paired-join is paired with %T instead of inverted join", t.Input))
"first join in paired joiner must be an inner or left join. found %s",
t.JoinType.String(),
))
}
if !ij.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
if t.ContinuationCol == 0 {
panic(errors.AssertionFailedf("first join in paired joiner must have a continuation column"))
}
}
if t.IsSecondJoinInPairedJoiner {
switch firstJoin := t.Input.(type) {
case *InvertedJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with inverted join that thinks it is unpaired"))
}
case *LookupJoinExpr:
if !firstJoin.IsFirstJoinInPairedJoiner {
panic(errors.AssertionFailedf(
"lookup paired-join is paired with lookup join that thinks it is unpaired"))
}
default:
panic(errors.AssertionFailedf("lookup paired-join is paired with %T", t.Input))
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/opt/memo/expr_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ func (f *ExprFmtCtx) formatRelational(e RelExpr, tp treeprinter.Node) {
if t.LookupColsAreTableKey {
tp.Childf("lookup columns are key")
}
if t.IsFirstJoinInPairedJoiner {
f.formatColList(e, tp, "first join in paired joiner; continuation column:", opt.ColList{t.ContinuationCol})
}
if t.IsSecondJoinInPairedJoiner {
tp.Childf("second join in paired joiner")
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,22 @@ define LookupJoinPrivate {
# table (and thus each left row matches with at most one table row).
LookupColsAreTableKey bool

# IsSecondJoinInPairedJoiner is true if this is the second join of a
# paired-joiner used for left joins.
# At most one of Is{First,Second}JoinInPairedJoiner can be true.
#
# IsFirstJoinInPairedJoiner is true if this is the first (i.e., lower in the
# plan tree) join of a paired-joiner used for left joins.
IsFirstJoinInPairedJoiner bool

# IsSecondJoinInPairedJoiner is true if this is the second (i.e., higher in
# the plan tree) join of a paired-joiner used for left joins.
IsSecondJoinInPairedJoiner bool

# ContinuationCol is the column ID of the continuation column when
# IsFirstJoinInPairedJoiner is true. The continuation column is a boolean
# column that indicates whether an output row is a continuation of a group
# corresponding to a single left input row.
ContinuationCol ColumnID

# LocalityOptimized is true if this lookup join is part of a locality
# optimized search strategy. For semi, inner, and left joins, this means
# that RemoteLookupExpr will be non-nil. See comments above that field for
Expand Down
Loading