From 8d26e46bb216cbded790b2273cb4045efda9f0f2 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Dec 2024 12:58:50 -0800 Subject: [PATCH 1/3] sql: fix physical planner heuristic in edge case with cross joins Long time ago in a74611e45e01ebcef42b38ecfdcfd57f3e9d2331 we introduced a change so that when we merge two plans when performing a cross join and both plans have a single result router (i.e. a single processor), we would plan the cross join on the same node as the _left_ input. This was done under the assumption that the left input is processed first by the joiner, but this assumption has changed since then (long time ago too), so we should be choosing the _right_ input. The impact seems very minor though, just something I noticed while working around this area. Release note: None --- pkg/sql/distsql_physical_planner.go | 11 +++---- .../exec/execbuilder/testdata/distsql_join | 29 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0d9d759165cf..e23bbe785f12 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -3837,12 +3837,13 @@ func (dsp *DistSQLPlanner) planJoiners( // single processor. sqlInstances = []base.SQLInstanceID{dsp.gatewaySQLInstanceID} - // If either side has a single stream, put the processor on that node. We - // prefer the left side because that is processed first by the hash joiner. - if len(leftRouters) == 1 { - sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID - } else if len(rightRouters) == 1 { + // If either side has a single stream, put the processor on that node. + // We prefer the right side because that is processed first by the hash + // joiner. + if len(rightRouters) == 1 { sqlInstances[0] = p.Processors[rightRouters[0]].SQLInstanceID + } else if len(leftRouters) == 1 { + sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID } } diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join index 0852a05cc06b..0423b05bf73e 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join @@ -282,3 +282,32 @@ EXPLAIN (VEC) ) AND c_custkey = o_custkey; RESET vectorize + +statement ok +CREATE TABLE kv (k INT PRIMARY KEY, v INT); +ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1,5) AS g(i); +ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i); + +statement ok +ALTER TABLE kv INJECT STATISTICS '[ + { + "columns": ["k"], + "created_at": "2024-01-01 1:00:00.00000+00:00", + "row_count": 1000, + "distinct_count": 1000 + } +]'; + +# Force the distribution of the next plan. +statement ok +SET distribute_scan_row_count_threshold = 1; + +# Verify that when merging plans where each has a single result router (n3 and +# n4), we plan the joiner on the same node as the right input (n4). +query T +SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = 3 AND t2.k = 4] WHERE info LIKE 'Diagram%'; +---- +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJycklFv2jAUhd_3K6wrTYXJjDjJk6VJoJKp2Sh0BGmTKlR55EKthDizHboK8d-nhLKSqKTd_BDl2sffPcf2DsyvFDhEwTi4nJMP5PNsek1ugx8342E4IZ1RGM2jb-MuqQuSLRlGxDJ6_HPJ96tgFpCOZR8T8ol4XTKcjEjHulXpdxdPApmtFBmHXwNyMZJircXm_QVQyFSME7FBA_wWGFDwgIIPCwq5Vks0RulyaVcJw_g3cIeCzPLCltMLCkulEfgOrLQpAoeJ6qm8X1JitEKmlWxPQRX2eZOxYo3A_ZMu4Qi4t6cnjVh7o7n4meIMRYy679TaQbIdJNu7PMFHoHCp0mKTGU4SSrZAIcpFWfW9vgPnjLGGMadmzH27MfYfxvwWY27DGDtr7NlPkSkdo8a4eSevS15IdyXM_RclM9R9tx5uWlhOBowOXDrw6MA_G8JrhHBrIV55XzM0ucoMvumBOY1OPVZGwniNhyMyqtBLvNFqWWkP5bQCVRMxGntY9Q9FmB2XjNUoNn8fxymJtZK8Gom1ktx_ILmnJNYkea0k53w6tzyxVaoe7mQMHJyn0XvhcxxQbhBrU15bdK8eKuz8MS8PfSVSgxSuRYIjtKg3MpPGyiVwqwvc79_9CQAA__-phZ3F + +statement ok +RESET distribute_scan_row_count_threshold; From a2db3126b8ff10e351c6d5748555d72cf712f1ce Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Dec 2024 13:09:50 -0800 Subject: [PATCH 2/3] sql: adjust physical planning heuristics around small joins Previously, whenever we had a hash or a merge join with at least one equality column (i.e. not a cross join), we would force the distribution of the plan. This can be suboptimal when we need to process a small number of rows. This commit makes the heuristic a bit more configurable: we now will choose to distribute only when performing a "large" join, where "large" is defined as having both inputs produce at least 1k rows (configured via new `distribute_join_row_count_threshold` session variable). If both inputs don't have stats available, we fall back to the old behavior of distributing the plan (which seems like a safer option). Note the following operations still force the plan distribution unconditionally: - table statistics - inverted filter (when we have a union of inverted spans) - inverted join - window functions with at least one window frame with PARTITION BY clause - zigzag join. I think there are cases where we don't want to force the distribution because of these operations, but they seem less of a concern, so I left a few TODOs. Ideally, eventually we'll have the optimizer make the decision. Release note (sql change): DistSQL physical planning decisions under `distsql=auto` mode have been adjusted in the following manner: presence of the hash or merge join no longer forces the plan to be distributed. Namely, we might not choose to distribute the plan if both inputs to the join are expected produce small number of rows (less than 1k combined by default, configurable via `distribute_join_row_count_threshold` session variable). --- pkg/sql/distsql_physical_planner.go | 22 +++++++-- pkg/sql/distsql_spec_exec_factory.go | 22 +++++++-- pkg/sql/exec_util.go | 4 ++ pkg/sql/join.go | 22 +++++++-- .../testdata/logic_test/information_schema | 1 + .../logictest/testdata/logic_test/pg_catalog | 3 ++ .../logictest/testdata/logic_test/show_source | 1 + pkg/sql/opt/exec/execbuilder/relational.go | 24 +++++++++ .../execbuilder/testdata/distsql_auto_mode | 49 +++++++++++++++++-- pkg/sql/opt/exec/factory.opt | 4 ++ pkg/sql/opt_exec_factory.go | 6 ++- .../local_only_session_data.proto | 4 ++ pkg/sql/vars.go | 23 +++++++++ 13 files changed, 168 insertions(+), 17 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index e23bbe785f12..0ba47febfbe6 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -586,6 +586,8 @@ func checkSupportForPlanNode( if err != nil { return cannotDistribute, err } + // TODO(yuzefovich): we might want to be smarter about this and don't + // force distribution with small inputs. return rec.compose(shouldDistribute), nil case *joinNode: @@ -600,12 +602,18 @@ func checkSupportForPlanNode( if err != nil { return cannotDistribute, err } - // If either the left or the right side can benefit from distribution, we - // should distribute. rec := recLeft.compose(recRight) - // If we can do a hash join, we distribute if possible. if len(n.pred.leftEqualityIndices) > 0 { - rec = rec.compose(shouldDistribute) + // We can partition both streams on the equality columns. + if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 { + // In the absence of stats for both inputs, fall back to + // distributing. + rec = rec.compose(shouldDistribute) + } else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold { + // If we have stats on at least one input, then distribute only + // if the join appears to be "large". + rec = rec.compose(shouldDistribute) + } } return rec, nil @@ -757,6 +765,8 @@ func checkSupportForPlanNode( if len(f.partitionIdxs) > 0 { // If at least one function has PARTITION BY clause, then we // should distribute the execution. + // TODO(yuzefovich): we might want to be smarter about this and + // don't force distribution with small inputs. return rec.compose(shouldDistribute), nil } } @@ -778,6 +788,8 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } + // TODO(yuzefovich): we might want to be smarter about this and don't + // force distribution with small inputs. return shouldDistribute, nil case *cdcValuesNode: return cannotDistribute, nil @@ -815,6 +827,8 @@ func checkSupportForInvertedFilterNode( // related to #50659. Fix this in the distSQLSpecExecFactory. filterRec := cannotDistribute if n.expression.Left == nil && n.expression.Right == nil { + // TODO(yuzefovich): we might want to be smarter about this and don't + // force distribution with small inputs. filterRec = shouldDistribute } return rec.compose(filterRec), nil diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index d658ffee7c44..791d3cdbad46 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -425,11 +425,13 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin( leftEqCols, rightEqCols []exec.NodeColumnOrdinal, leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { return e.constructHashOrMergeJoin( joinType, left, right, extraOnCond, leftEqCols, rightEqCols, leftEqColsAreKey, rightEqColsAreKey, ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */ + estimatedLeftRowCount, estimatedRightRowCount, ) } @@ -440,6 +442,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( leftOrdering, rightOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) if err != nil { @@ -448,6 +451,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( return e.constructHashOrMergeJoin( joinType, left, right, onCond, leftEqCols, rightEqCols, leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering, + estimatedLeftRowCount, estimatedRightRowCount, ) } @@ -1238,6 +1242,7 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( leftEqColsAreKey, rightEqColsAreKey bool, mergeJoinOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { leftPhysPlan, leftPlan := getPhysPlan(left) rightPhysPlan, rightPlan := getPhysPlan(right) @@ -1251,9 +1256,20 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( rightPlanToStreamColMap: rightMap, } post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns) - // We always try to distribute the join, but planJoiners() itself might - // decide not to. - planCtx := e.getPlanCtx(shouldDistribute) + rec := canDistribute + if len(leftEqCols) > 0 { + // We can partition both streams on the equality columns. + if estimatedLeftRowCount == 0 && estimatedRightRowCount == 0 { + // In the absence of stats for both inputs, fall back to + // distributing. + rec = shouldDistribute + } else if estimatedLeftRowCount+estimatedRightRowCount >= e.planner.SessionData().DistributeGroupByRowCountThreshold { + // If we have stats on at least one input, then distribute only if + // the join appears to be "large". + rec = shouldDistribute + } + } + planCtx := e.getPlanCtx(rec) onExpr, err := helper.remapOnExpr(e.ctx, planCtx, onCond) if err != nil { return nil, err diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 94a9e6446c97..f1c7da0c379c 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -3343,6 +3343,10 @@ func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) { m.data.AlwaysDistributeFullScans = val } +func (m *sessionDataMutator) SetDistributeJoinRowCountThreshold(val uint64) { + m.data.DistributeJoinRowCountThreshold = val +} + func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) { m.data.DisableVecUnionEagerCancellation = val } diff --git a/pkg/sql/join.go b/pkg/sql/join.go index d430918ee9d5..7ae39e6544c1 100644 --- a/pkg/sql/join.go +++ b/pkg/sql/join.go @@ -31,16 +31,28 @@ type joinNode struct { // columns contains the metadata for the results of this node. columns colinfo.ResultColumns + + // estimatedLeftRowCount, when set, is the estimated number of rows that + // the left input will produce. + estimatedLeftRowCount uint64 + // estimatedRightRowCount, when set, is the estimated number of rows that + // the right input will produce. + estimatedRightRowCount uint64 } func (p *planner) makeJoinNode( - left planDataSource, right planDataSource, pred *joinPredicate, + left planDataSource, + right planDataSource, + pred *joinPredicate, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) *joinNode { n := &joinNode{ - left: left, - right: right, - pred: pred, - columns: pred.cols, + left: left, + right: right, + pred: pred, + columns: pred.cols, + estimatedLeftRowCount: estimatedLeftRowCount, + estimatedRightRowCount: estimatedRightRowCount, } return n } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 74885d557dfe..90448a5fe555 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -3939,6 +3939,7 @@ disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off distribute_group_by_row_count_threshold 1000 +distribute_join_row_count_threshold 1000 distribute_scan_row_count_threshold 10000 distribute_sort_row_count_threshold 1000 distsql_plan_gateway_bias 2 diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 73a846d18e1d..4e8b78060568 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2939,6 +2939,7 @@ disable_plan_gists off N disable_vec_union_eager_cancellation off NULL NULL NULL string disallow_full_table_scans off NULL NULL NULL string distribute_group_by_row_count_threshold 1000 NULL NULL NULL string +distribute_join_row_count_threshold 1000 NULL NULL NULL string distribute_scan_row_count_threshold 10000 NULL NULL NULL string distribute_sort_row_count_threshold 1000 NULL NULL NULL string distsql off NULL NULL NULL string @@ -3141,6 +3142,7 @@ disable_plan_gists off N disable_vec_union_eager_cancellation off NULL user NULL off off disallow_full_table_scans off NULL user NULL off off distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000 +distribute_join_row_count_threshold 1000 NULL user NULL 1000 1000 distribute_scan_row_count_threshold 10000 NULL user NULL 10000 10000 distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000 distsql off NULL user NULL off off @@ -3339,6 +3341,7 @@ disable_plan_gists NULL NULL NULL disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL disallow_full_table_scans NULL NULL NULL NULL NULL distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL +distribute_join_row_count_threshold NULL NULL NULL NULL NULL distribute_scan_row_count_threshold NULL NULL NULL NULL NULL distribute_sort_row_count_threshold NULL NULL NULL NULL NULL distsql NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index c644b325084c..5511dc0ddb39 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -70,6 +70,7 @@ disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off distribute_group_by_row_count_threshold 1000 +distribute_join_row_count_threshold 1000 distribute_scan_row_count_threshold 10000 distribute_sort_row_count_threshold 1000 distsql off diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 8b4963fc8e79..aa188c2f4a51 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -1356,6 +1356,10 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr // to apply join hints to semi or anti joins. Join hints are only // possible on explicit joins using the JOIN keyword, and semi and anti // joins are only created from implicit joins without the JOIN keyword. + // + // Note that we use the row count estimates even when stats are + // unavailable so that the input that is more likely to be smaller ended + // up on the right side. leftRowCount := leftExpr.Relational().Statistics().RowCount rightRowCount := rightExpr.Relational().Statistics().RowCount if leftRowCount < rightRowCount { @@ -1430,6 +1434,13 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet()) rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ToSet()) + var leftRowCount, rightRowCount uint64 + if leftExpr.Relational().Statistics().Available { + leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount) + } + if rightExpr.Relational().Statistics().Available { + rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount) + } b.recordJoinType(joinType) if isCrossJoin { @@ -1444,6 +1455,7 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr leftEqOrdinals, rightEqOrdinals, leftEqColsAreKey, rightEqColsAreKey, onExpr, + leftRowCount, rightRowCount, ) if err != nil { return execPlan{}, colOrdMap{}, err @@ -1470,6 +1482,10 @@ func (b *Builder) buildMergeJoin( // We have a partial join, and we want to make sure that the relation // with smaller cardinality is on the right side. Note that we assumed // it during the costing. + // + // Note that we use the row count estimates even when stats are + // unavailable so that the input that is more likely to be smaller ended + // up on the right side. // TODO(raduberinde): we might also need to look at memo.JoinFlags when // choosing a side. leftRowCount := leftExpr.Relational().Statistics().RowCount @@ -1527,6 +1543,13 @@ func (b *Builder) buildMergeJoin( } leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet()) rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet()) + var leftRowCount, rightRowCount uint64 + if leftExpr.Relational().Statistics().Available { + leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount) + } + if rightExpr.Relational().Statistics().Available { + rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount) + } b.recordJoinType(joinType) b.recordJoinAlgorithm(exec.MergeJoin) var ep execPlan @@ -1536,6 +1559,7 @@ func (b *Builder) buildMergeJoin( onExpr, leftOrd, rightOrd, reqOrd, leftEqColsAreKey, rightEqColsAreKey, + leftRowCount, rightRowCount, ) if err != nil { return execPlan{}, colOrdMap{}, err diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode index 262b19ce8eb6..27e55ab19aa4 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode @@ -27,6 +27,24 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribut ---- distribution: local +# Sort (no stats) - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Hash join (no stats) - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Cross join (no stats) - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + statement ok ALTER TABLE kv INJECT STATISTICS '[ { @@ -186,20 +204,45 @@ SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k LIMIT 1] ---- distribution: full +# Hash join over large inputs - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Cross join over large inputs - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +# Now consider the inputs small. +statement ok +SET distribute_join_row_count_threshold = 100000; + +# Hash join over small inputs - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +statement ok +RESET distribute_join_row_count_threshold; + statement ok CREATE TABLE kw (k INT PRIMARY KEY, w INT); ALTER TABLE kw SPLIT AT SELECT i FROM generate_series(1,5) AS g(i); ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i) -# Join - distribute. +# Large hash join - distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw] WHERE info LIKE 'distribution%' ---- distribution: full -# Join with the data living on the remote node - distribute. +# Large hash join with the data living on the remote node - distribute. query T -SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2] WHERE info LIKE 'distribution%' +SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2 OR k>5] WHERE info LIKE 'distribution%' ---- distribution: full diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 0c99ee15df37..8b8eecec3c86 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -100,6 +100,8 @@ define HashJoin { LeftEqColsAreKey bool RightEqColsAreKey bool ExtraOnCond tree.TypedExpr + EstimatedLeftRowCount uint64 + EstimatedRightRowCount uint64 } # MergeJoin runs a merge join. @@ -118,6 +120,8 @@ define MergeJoin { ReqOrdering exec.OutputOrdering LeftEqColsAreKey bool RightEqColsAreKey bool + EstimatedLeftRowCount uint64 + EstimatedRightRowCount uint64 } # GroupBy runs an aggregation. A set of aggregations is performed for each group diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 7f06c2ef273a..a8dbcd473afd 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -415,6 +415,7 @@ func (ef *execFactory) ConstructHashJoin( leftEqCols, rightEqCols []exec.NodeColumnOrdinal, leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { p := ef.planner leftSrc := asDataSource(left) @@ -435,7 +436,7 @@ func (ef *execFactory) ConstructHashJoin( pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey - return p.makeJoinNode(leftSrc, rightSrc, pred), nil + return p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount), nil } // ConstructApplyJoin is part of the exec.Factory interface. @@ -459,13 +460,14 @@ func (ef *execFactory) ConstructMergeJoin( leftOrdering, rightOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { var err error p := ef.planner leftSrc := asDataSource(left) rightSrc := asDataSource(right) pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns, onCond) - node := p.makeJoinNode(leftSrc, rightSrc, pred) + node := p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount) pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 25be303a9b69..77f6e1e72667 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -578,6 +578,10 @@ message LocalOnlySessionData { // AlwaysDistributeFullScans determines whether full table scans always force // the plan to be distributed, regardless of the estimated row count. bool always_distribute_full_scans = 148; + // DistributeJoinRowCountThreshold is the minimum number of rows estimated to + // be processed from both inputs by the hash or merge join so that we choose + // to distribute the plan because of this joiner stage of DistSQL processors. + uint64 distribute_join_row_count_threshold = 149; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index acf9f226c4d9..66bfc19f8183 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -718,6 +718,29 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `distribute_join_row_count_threshold`: { + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatUint(evalCtx.SessionData().DistributeJoinRowCountThreshold, 10), nil + }, + GetStringVal: makeIntGetStringValFn(`distribute_join_row_count_threshold`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if i < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set distribute_join_row_count_threshold to a negative value: %d", i) + } + m.SetDistributeJoinRowCountThreshold(uint64(i)) + return nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatUint(1000, 10) + }, + }, + // CockroachDB extension. `disable_vec_union_eager_cancellation`: { GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`), From 936d9abeebd8e67941a472cf5c47f094db1f67c5 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Dec 2024 14:14:23 -0800 Subject: [PATCH 3/3] sql: add some logging to physical planning This commit introduces some logging, hidden behind a verbosity level, to the physical planning whenever we choose `shouldDistribute` recommendation. This will show up in the trace and should be helpful to understand why we decided to distribute a particular query. Release note: None --- pkg/sql/distsql_physical_planner.go | 108 ++++++++++++++++------------ pkg/sql/exec_util.go | 2 +- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0ba47febfbe6..c1878420c596 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -396,7 +396,7 @@ const ( cannotDistribute distRecommendation = iota // canDistribute indicates that a plan can be distributed, but it's not - // clear whether it'll be benefit from that. + // clear whether it'll benefit from that. canDistribute // shouldDistribute indicates that a plan will likely benefit if distributed. @@ -512,7 +512,10 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool { // this plan couldn't be distributed. // TODO(radu): add tests for this. func checkSupportForPlanNode( - node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + node planNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { switch n := node.(type) { // Keep these cases alphabetized, please! @@ -523,19 +526,19 @@ func checkSupportForPlanNode( return shouldDistribute, nil case *distinctNode: - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *exportNode: - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *filterNode: if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *groupNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -544,12 +547,15 @@ func checkSupportForPlanNode( return cannotDistribute, newQueryNotSupportedErrorf("aggregate %q cannot be executed with distsql", agg.funcName) } } - // Distribute aggregations if possible. - aggRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - aggRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + aggRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "aggregation (no stats) recommends plan distribution") + aggRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeGroupByRowCountThreshold { + log.VEventf(ctx, 2, "large aggregation recommends plan distribution") + aggRec = shouldDistribute } return rec.compose(aggRec), nil @@ -563,13 +569,13 @@ func checkSupportForPlanNode( } // n.table doesn't have meaningful spans, but we need to check support (e.g. // for any filtering expression). - if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil { + if _, err := checkSupportForPlanNode(ctx, n.table, distSQLVisitor, sd); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.input, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) case *invertedFilterNode: - return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd) + return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd) case *invertedJoinNode: if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -582,23 +588,24 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted join recommends plan distribution") return rec.compose(shouldDistribute), nil case *joinNode: if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -606,12 +613,10 @@ func checkSupportForPlanNode( if len(n.pred.leftEqualityIndices) > 0 { // We can partition both streams on the equality columns. if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 { - // In the absence of stats for both inputs, fall back to - // distributing. + log.VEventf(ctx, 2, "join (no stats) recommends plan distribution") rec = rec.compose(shouldDistribute) } else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold { - // If we have stats on at least one input, then distribute only - // if the join appears to be "large". + log.VEventf(ctx, 2, "large join recommends plan distribution") rec = rec.compose(shouldDistribute) } } @@ -621,7 +626,7 @@ func checkSupportForPlanNode( // Note that we don't need to check whether we support distribution of // n.countExpr or n.offsetExpr because those expressions are evaluated // locally, during the physical planning. - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *lookupJoinNode: if n.remoteLookupExpr != nil || n.remoteOnlyLookups { @@ -645,7 +650,7 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -662,7 +667,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *renderNode: for _, e := range n.render { @@ -670,7 +675,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *scanNode: if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -689,39 +694,48 @@ func checkSupportForPlanNode( // here. scanRec := canDistribute if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold { - // This is a large scan, so we choose to distribute it. + log.VEventf(ctx, 2, "large scan recommends plan distribution") scanRec = shouldDistribute } if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) { // In the absence of table stats, we default to always distributing // full scans. + log.VEventf(ctx, 2, "full scan recommends plan distribution") scanRec = shouldDistribute } return scanRec, nil case *sortNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - sortRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - sortRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + sortRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "sort (no stats) recommends plan distribution") + sortRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large sort recommends plan distribution") + sortRec = shouldDistribute } return rec.compose(sortRec), nil case *topKNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - topKRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - topKRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + topKRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "top k (no stats) recommends plan distribution") + topKRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large top k recommends plan distribution") + topKRec = shouldDistribute } return rec.compose(topKRec), nil @@ -729,11 +743,11 @@ func checkSupportForPlanNode( return canDistribute, nil case *unionNode: - recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -757,7 +771,7 @@ func checkSupportForPlanNode( return canDistribute, nil case *windowNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -767,6 +781,7 @@ func checkSupportForPlanNode( // should distribute the execution. // TODO(yuzefovich): we might want to be smarter about this and // don't force distribution with small inputs. + log.VEventf(ctx, 2, "window function with PARTITION BY recommends plan distribution") return rec.compose(shouldDistribute), nil } } @@ -790,6 +805,7 @@ func checkSupportForPlanNode( } // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. + log.VEventf(ctx, 2, "zigzag join recommends plan distribution") return shouldDistribute, nil case *cdcValuesNode: return cannotDistribute, nil @@ -800,9 +816,12 @@ func checkSupportForPlanNode( } func checkSupportForInvertedFilterNode( - n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + n *invertedFilterNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -829,6 +848,7 @@ func checkSupportForInvertedFilterNode( if n.expression.Left == nil && n.expression.Right == nil { // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted filter (union of inverted spans) recommends plan distribution") filterRec = shouldDistribute } return rec.compose(filterRec), nil diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index f1c7da0c379c..47a4946fa4bc 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1973,7 +1973,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, nil } - rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd) if err != nil { // Don't use distSQL for this request. log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)