sql: adjust physical planning heuristics around small joins
Previously, whenever we had a hash or a merge join with at least one
equality column (i.e. not a cross join), we would force the distribution
of the plan. This can be suboptimal when we only need to process a small
number of rows. This commit makes the heuristic more configurable: we
will now choose to distribute only when performing a "large" join, where
"large" is defined as both inputs producing at least 1k rows combined
(configurable via the new `distribute_join_row_count_threshold` session
variable). If stats are unavailable for both inputs, we fall back to the
old behavior of distributing the plan (which seems like the safer
option).
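
To make the heuristic concrete, here is a minimal Go sketch of the
decision. This is an illustration only, not the planner code: the
function name and parameters are made up, and the real implementation
composes this result with the recommendations coming from the join's two
inputs.

```go
package main

import "fmt"

// decideJoinDistribution sketches the heuristic described above. A zero
// estimate means stats are unavailable for that input; threshold plays
// the role of the distribute_join_row_count_threshold session variable
// (default 1000).
func decideJoinDistribution(hasEqualityColumns bool, estLeftRows, estRightRows, threshold uint64) bool {
	if !hasEqualityColumns {
		// A cross join doesn't force distribution on its own.
		return false
	}
	if estLeftRows == 0 && estRightRows == 0 {
		// No stats for either input: fall back to distributing.
		return true
	}
	// Stats are available for at least one input: distribute only if the
	// combined estimate makes the join "large".
	return estLeftRows+estRightRows >= threshold
}

func main() {
	fmt.Println(decideJoinDistribution(true, 400, 300, 1000)) // false: small join stays local
	fmt.Println(decideJoinDistribution(true, 0, 0, 1000))     // true: no stats, distribute
	fmt.Println(decideJoinDistribution(true, 900, 500, 1000)) // true: large join, distribute
}
```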

Note that the following operations still force plan distribution
unconditionally:
- table statistics
- inverted filter (when we have a union of inverted spans)
- inverted join
- window functions with at least one window frame with PARTITION BY
clause
- zigzag join.

I think there are cases where we don't want to force distribution
because of these operations either, but they seem like less of a
concern, so I left a few TODOs. Ideally, the optimizer will eventually
make this decision.

Release note (sql change): DistSQL physical planning decisions under
the `distsql=auto` mode have been adjusted in the following manner: the
presence of a hash or merge join no longer forces the plan to be
distributed. Namely, the plan might not be distributed if both inputs
to the join are expected to produce a small number of rows (fewer than
1k combined by default, configurable via the
`distribute_join_row_count_threshold` session variable).
yuzefovich committed Dec 16, 2024
1 parent 2899e77 commit 9ab2996
Showing 13 changed files with 162 additions and 21 deletions.
22 changes: 18 additions & 4 deletions pkg/sql/distsql_physical_planner.go
@@ -586,6 +586,8 @@ func checkSupportForPlanNode(
if err != nil {
return cannotDistribute, err
}
// TODO(yuzefovich): we might want to be smarter about this and not
// force distribution with small inputs.
return rec.compose(shouldDistribute), nil

case *joinNode:
@@ -600,12 +602,18 @@
if err != nil {
return cannotDistribute, err
}
// If either the left or the right side can benefit from distribution, we
// should distribute.
rec := recLeft.compose(recRight)
// If we can do a hash join, we distribute if possible.
if len(n.pred.leftEqualityIndices) > 0 {
rec = rec.compose(shouldDistribute)
// We can partition both streams on the equality columns.
if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 {
// In the absence of stats for both inputs, fall back to
// distributing.
rec = rec.compose(shouldDistribute)
} else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold {
// If we have stats on at least one input, then distribute only
// if the join appears to be "large".
rec = rec.compose(shouldDistribute)
}
}
return rec, nil

@@ -757,6 +765,8 @@ func checkSupportForPlanNode(
if len(f.partitionIdxs) > 0 {
// If at least one function has PARTITION BY clause, then we
// should distribute the execution.
// TODO(yuzefovich): we might want to be smarter about this and
// not force distribution with small inputs.
return rec.compose(shouldDistribute), nil
}
}
@@ -778,6 +788,8 @@
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
// TODO(yuzefovich): we might want to be smarter about this and not
// force distribution with small inputs.
return shouldDistribute, nil
case *cdcValuesNode:
return cannotDistribute, nil
@@ -815,6 +827,8 @@ func checkSupportForInvertedFilterNode(
// related to #50659. Fix this in the distSQLSpecExecFactory.
filterRec := cannotDistribute
if n.expression.Left == nil && n.expression.Right == nil {
// TODO(yuzefovich): we might want to be smarter about this and not
// force distribution with small inputs.
filterRec = shouldDistribute
}
return rec.compose(filterRec), nil
22 changes: 19 additions & 3 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -425,11 +425,13 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin(
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
extraOnCond tree.TypedExpr,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
return e.constructHashOrMergeJoin(
joinType, left, right, extraOnCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey,
ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */
estimatedLeftRowCount, estimatedRightRowCount,
)
}

@@ -440,6 +442,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
leftOrdering, rightOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
leftEqColsAreKey, rightEqColsAreKey bool,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering)
if err != nil {
Expand All @@ -448,6 +451,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
return e.constructHashOrMergeJoin(
joinType, left, right, onCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering,
estimatedLeftRowCount, estimatedRightRowCount,
)
}

@@ -1238,6 +1242,7 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
leftEqColsAreKey, rightEqColsAreKey bool,
mergeJoinOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftPhysPlan, leftPlan := getPhysPlan(left)
rightPhysPlan, rightPlan := getPhysPlan(right)
@@ -1251,9 +1256,20 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
rightPlanToStreamColMap: rightMap,
}
post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns)
// We always try to distribute the join, but planJoiners() itself might
// decide not to.
planCtx := e.getPlanCtx(shouldDistribute)
rec := canDistribute
if len(leftEqCols) > 0 {
// We can partition both streams on the equality columns.
if estimatedLeftRowCount == 0 && estimatedRightRowCount == 0 {
// In the absence of stats for both inputs, fall back to
// distributing.
rec = shouldDistribute
} else if estimatedLeftRowCount+estimatedRightRowCount >= e.planner.SessionData().DistributeJoinRowCountThreshold {
// If we have stats on at least one input, then distribute only if
// the join appears to be "large".
rec = shouldDistribute
}
}
planCtx := e.getPlanCtx(rec)
onExpr, err := helper.remapOnExpr(e.ctx, planCtx, onCond)
if err != nil {
return nil, err
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
@@ -3343,6 +3343,10 @@ func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) {
m.data.AlwaysDistributeFullScans = val
}

func (m *sessionDataMutator) SetDistributeJoinRowCountThreshold(val uint64) {
m.data.DistributeJoinRowCountThreshold = val
}

func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
m.data.DisableVecUnionEagerCancellation = val
}
22 changes: 17 additions & 5 deletions pkg/sql/join.go
@@ -31,16 +31,28 @@ type joinNode struct {

// columns contains the metadata for the results of this node.
columns colinfo.ResultColumns

// estimatedLeftRowCount, when set, is the estimated number of rows that
// the left input will produce.
estimatedLeftRowCount uint64
// estimatedRightRowCount, when set, is the estimated number of rows that
// the right input will produce.
estimatedRightRowCount uint64
}

func (p *planner) makeJoinNode(
left planDataSource, right planDataSource, pred *joinPredicate,
left planDataSource,
right planDataSource,
pred *joinPredicate,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) *joinNode {
n := &joinNode{
left: left,
right: right,
pred: pred,
columns: pred.cols,
left: left,
right: right,
pred: pred,
columns: pred.cols,
estimatedLeftRowCount: estimatedLeftRowCount,
estimatedRightRowCount: estimatedRightRowCount,
}
return n
}
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/information_schema
@@ -3939,6 +3939,7 @@ disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distribute_group_by_row_count_threshold 1000
distribute_join_row_count_threshold 1000
distribute_scan_row_count_threshold 10000
distribute_sort_row_count_threshold 1000
distsql_plan_gateway_bias 2
3 changes: 3 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2939,6 +2939,7 @@ disable_plan_gists off N
disable_vec_union_eager_cancellation off NULL NULL NULL string
disallow_full_table_scans off NULL NULL NULL string
distribute_group_by_row_count_threshold 1000 NULL NULL NULL string
distribute_join_row_count_threshold 1000 NULL NULL NULL string
distribute_scan_row_count_threshold 10000 NULL NULL NULL string
distribute_sort_row_count_threshold 1000 NULL NULL NULL string
distsql off NULL NULL NULL string
@@ -3141,6 +3142,7 @@ disable_plan_gists off N
disable_vec_union_eager_cancellation off NULL user NULL off off
disallow_full_table_scans off NULL user NULL off off
distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000
distribute_join_row_count_threshold 1000 NULL user NULL 1000 1000
distribute_scan_row_count_threshold 10000 NULL user NULL 10000 10000
distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000
distsql off NULL user NULL off off
@@ -3339,6 +3341,7 @@ disable_plan_gists NULL NULL NULL
disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL
disallow_full_table_scans NULL NULL NULL NULL NULL
distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL
distribute_join_row_count_threshold NULL NULL NULL NULL NULL
distribute_scan_row_count_threshold NULL NULL NULL NULL NULL
distribute_sort_row_count_threshold NULL NULL NULL NULL NULL
distsql NULL NULL NULL NULL NULL
1 change: 1 addition & 0 deletions pkg/sql/logictest/testdata/logic_test/show_source
@@ -70,6 +70,7 @@ disable_plan_gists off
disable_vec_union_eager_cancellation off
disallow_full_table_scans off
distribute_group_by_row_count_threshold 1000
distribute_join_row_count_threshold 1000
distribute_scan_row_count_threshold 10000
distribute_sort_row_count_threshold 1000
distsql off
22 changes: 18 additions & 4 deletions pkg/sql/opt/exec/execbuilder/relational.go
@@ -1346,6 +1346,13 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr
leftExpr := join.Child(0).(memo.RelExpr)
rightExpr := join.Child(1).(memo.RelExpr)
filters := join.Child(2).(*memo.FiltersExpr)
var leftRowCount, rightRowCount uint64
if leftExpr.Relational().Statistics().Available {
leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount)
}
if rightExpr.Relational().Statistics().Available {
rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount)
}
if joinType == descpb.LeftSemiJoin || joinType == descpb.LeftAntiJoin {
// The execution engine always builds the hash table on the right side
// of the join, so it is beneficial for the smaller relation to be on
@@ -1356,15 +1363,14 @@
// to apply join hints to semi or anti joins. Join hints are only
// possible on explicit joins using the JOIN keyword, and semi and anti
// joins are only created from implicit joins without the JOIN keyword.
leftRowCount := leftExpr.Relational().Statistics().RowCount
rightRowCount := rightExpr.Relational().Statistics().RowCount
if leftRowCount < rightRowCount {
if joinType == descpb.LeftSemiJoin {
joinType = descpb.RightSemiJoin
} else {
joinType = descpb.RightAntiJoin
}
leftExpr, rightExpr = rightExpr, leftExpr
leftRowCount, rightRowCount = rightRowCount, leftRowCount
}
}

@@ -1444,6 +1450,7 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr
leftEqOrdinals, rightEqOrdinals,
leftEqColsAreKey, rightEqColsAreKey,
onExpr,
leftRowCount, rightRowCount,
)
if err != nil {
return execPlan{}, colOrdMap{}, err
@@ -1466,14 +1473,19 @@ func (b *Builder) buildMergeJoin(
leftExpr, rightExpr := join.Left, join.Right
leftEq, rightEq := join.LeftEq, join.RightEq

var leftRowCount, rightRowCount uint64
if leftExpr.Relational().Statistics().Available {
leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount)
}
if rightExpr.Relational().Statistics().Available {
rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount)
}
if joinType == descpb.LeftSemiJoin || joinType == descpb.LeftAntiJoin {
// We have a partial join, and we want to make sure that the relation
// with smaller cardinality is on the right side. Note that we assumed
// it during the costing.
// TODO(raduberinde): we might also need to look at memo.JoinFlags when
// choosing a side.
leftRowCount := leftExpr.Relational().Statistics().RowCount
rightRowCount := rightExpr.Relational().Statistics().RowCount
if leftRowCount < rightRowCount {
if joinType == descpb.LeftSemiJoin {
joinType = descpb.RightSemiJoin
@@ -1482,6 +1494,7 @@
}
leftExpr, rightExpr = rightExpr, leftExpr
leftEq, rightEq = rightEq, leftEq
leftRowCount, rightRowCount = rightRowCount, leftRowCount
}
}

@@ -1536,6 +1549,7 @@
onExpr,
leftOrd, rightOrd, reqOrd,
leftEqColsAreKey, rightEqColsAreKey,
leftRowCount, rightRowCount,
)
if err != nil {
return execPlan{}, colOrdMap{}, err
49 changes: 46 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
@@ -27,6 +27,24 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribut
----
distribution: local

# Sort (no stats) - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
----
distribution: full

# Hash join (no stats) - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: full

# Cross join (no stats) - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

statement ok
ALTER TABLE kv INJECT STATISTICS '[
{
@@ -186,20 +204,45 @@
----
distribution: full

# Hash join over large inputs - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: full

# Cross join over large inputs - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

# Now consider the inputs small.
statement ok
SET distribute_join_row_count_threshold = 100000;

# Hash join over small inputs - don't distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%'
----
distribution: local

statement ok
RESET distribute_join_row_count_threshold;

statement ok
CREATE TABLE kw (k INT PRIMARY KEY, w INT);
ALTER TABLE kw SPLIT AT SELECT i FROM generate_series(1,5) AS g(i);
ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i)

# Join - distribute.
# Large hash join - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw] WHERE info LIKE 'distribution%'
----
distribution: full

# Join with the data living on the remote node - distribute.
# Large hash join with the data living on the remote node - distribute.
query T
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2] WHERE info LIKE 'distribution%'
SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2 OR k>5] WHERE info LIKE 'distribution%'
----
distribution: full

4 changes: 4 additions & 0 deletions pkg/sql/opt/exec/factory.opt
@@ -100,6 +100,8 @@ define HashJoin {
LeftEqColsAreKey bool
RightEqColsAreKey bool
ExtraOnCond tree.TypedExpr
EstimatedLeftRowCount uint64
EstimatedRightRowCount uint64
}

# MergeJoin runs a merge join.
@@ -118,6 +120,8 @@
ReqOrdering exec.OutputOrdering
LeftEqColsAreKey bool
RightEqColsAreKey bool
EstimatedLeftRowCount uint64
EstimatedRightRowCount uint64
}

# GroupBy runs an aggregation. A set of aggregations is performed for each group
