diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0d9d759165cf..c1878420c596 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -396,7 +396,7 @@ const ( cannotDistribute distRecommendation = iota // canDistribute indicates that a plan can be distributed, but it's not - // clear whether it'll be benefit from that. + // clear whether it'll benefit from that. canDistribute // shouldDistribute indicates that a plan will likely benefit if distributed. @@ -512,7 +512,10 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool { // this plan couldn't be distributed. // TODO(radu): add tests for this. func checkSupportForPlanNode( - node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + node planNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { switch n := node.(type) { // Keep these cases alphabetized, please! 
@@ -523,19 +526,19 @@ func checkSupportForPlanNode( return shouldDistribute, nil case *distinctNode: - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *exportNode: - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *filterNode: if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *groupNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -544,12 +547,15 @@ func checkSupportForPlanNode( return cannotDistribute, newQueryNotSupportedErrorf("aggregate %q cannot be executed with distsql", agg.funcName) } } - // Distribute aggregations if possible. - aggRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - aggRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + aggRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "aggregation (no stats) recommends plan distribution") + aggRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeGroupByRowCountThreshold { + log.VEventf(ctx, 2, "large aggregation recommends plan distribution") + aggRec = shouldDistribute } return rec.compose(aggRec), nil @@ -563,13 +569,13 @@ func checkSupportForPlanNode( } // n.table doesn't have meaningful spans, but we need to check support (e.g. // for any filtering expression). 
- if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil { + if _, err := checkSupportForPlanNode(ctx, n.table, distSQLVisitor, sd); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.input, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) case *invertedFilterNode: - return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd) + return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd) case *invertedJoinNode: if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -582,30 +588,37 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } + // TODO(yuzefovich): we might want to be smarter about this and not + // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted join recommends plan distribution") return rec.compose(shouldDistribute), nil case *joinNode: if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - // If either the left or the right side can benefit from distribution, we - // should distribute. rec := recLeft.compose(recRight) - // If we can do a hash join, we distribute if possible. 
if len(n.pred.leftEqualityIndices) > 0 { - rec = rec.compose(shouldDistribute) + // We can partition both streams on the equality columns. + if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 { + log.VEventf(ctx, 2, "join (no stats) recommends plan distribution") + rec = rec.compose(shouldDistribute) + } else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold { + log.VEventf(ctx, 2, "large join recommends plan distribution") + rec = rec.compose(shouldDistribute) + } } return rec, nil @@ -613,7 +626,7 @@ func checkSupportForPlanNode( // Note that we don't need to check whether we support distribution of // n.countExpr or n.offsetExpr because those expressions are evaluated // locally, during the physical planning. - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *lookupJoinNode: if n.remoteLookupExpr != nil || n.remoteOnlyLookups { @@ -637,7 +650,7 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -654,7 +667,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *renderNode: for _, e := range n.render { @@ -662,7 +675,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *scanNode: if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -681,39 +694,48 @@ func checkSupportForPlanNode( // here. 
scanRec := canDistribute if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold { - // This is a large scan, so we choose to distribute it. + log.VEventf(ctx, 2, "large scan recommends plan distribution") scanRec = shouldDistribute } if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) { // In the absence of table stats, we default to always distributing // full scans. + log.VEventf(ctx, 2, "full scan recommends plan distribution") scanRec = shouldDistribute } return scanRec, nil case *sortNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - sortRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - sortRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + sortRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "sort (no stats) recommends plan distribution") + sortRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large sort recommends plan distribution") + sortRec = shouldDistribute } return rec.compose(sortRec), nil case *topKNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - topKRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - topKRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. 
+ topKRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "top k (no stats) recommends plan distribution") + topKRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large top k recommends plan distribution") + topKRec = shouldDistribute } return rec.compose(topKRec), nil @@ -721,11 +743,11 @@ func checkSupportForPlanNode( return canDistribute, nil case *unionNode: - recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -749,7 +771,7 @@ func checkSupportForPlanNode( return canDistribute, nil case *windowNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -757,6 +779,9 @@ func checkSupportForPlanNode( if len(f.partitionIdxs) > 0 { // If at least one function has PARTITION BY clause, then we // should distribute the execution. + // TODO(yuzefovich): we might want to be smarter about this and + // not force distribution with small inputs. + log.VEventf(ctx, 2, "window function with PARTITION BY recommends plan distribution") return rec.compose(shouldDistribute), nil } } @@ -778,6 +803,9 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } + // TODO(yuzefovich): we might want to be smarter about this and not + // force distribution with small inputs. 
+ log.VEventf(ctx, 2, "zigzag join recommends plan distribution") return shouldDistribute, nil case *cdcValuesNode: return cannotDistribute, nil @@ -788,9 +816,12 @@ func checkSupportForPlanNode( } func checkSupportForInvertedFilterNode( - n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + n *invertedFilterNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -815,6 +846,9 @@ func checkSupportForInvertedFilterNode( // related to #50659. Fix this in the distSQLSpecExecFactory. filterRec := cannotDistribute if n.expression.Left == nil && n.expression.Right == nil { + // TODO(yuzefovich): we might want to be smarter about this and not + // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted filter (union of inverted spans) recommends plan distribution") filterRec = shouldDistribute } return rec.compose(filterRec), nil @@ -3837,12 +3871,13 @@ func (dsp *DistSQLPlanner) planJoiners( // single processor. sqlInstances = []base.SQLInstanceID{dsp.gatewaySQLInstanceID} - // If either side has a single stream, put the processor on that node. We - // prefer the left side because that is processed first by the hash joiner. - if len(leftRouters) == 1 { - sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID - } else if len(rightRouters) == 1 { + // If either side has a single stream, put the processor on that node. + // We prefer the right side because that is processed first by the hash + // joiner. 
+ if len(rightRouters) == 1 { sqlInstances[0] = p.Processors[rightRouters[0]].SQLInstanceID + } else if len(leftRouters) == 1 { + sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID } } diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index d658ffee7c44..791d3cdbad46 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -425,11 +425,13 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin( leftEqCols, rightEqCols []exec.NodeColumnOrdinal, leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { return e.constructHashOrMergeJoin( joinType, left, right, extraOnCond, leftEqCols, rightEqCols, leftEqColsAreKey, rightEqColsAreKey, ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */ + estimatedLeftRowCount, estimatedRightRowCount, ) } @@ -440,6 +442,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( leftOrdering, rightOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering) if err != nil { @@ -448,6 +451,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin( return e.constructHashOrMergeJoin( joinType, left, right, onCond, leftEqCols, rightEqCols, leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering, + estimatedLeftRowCount, estimatedRightRowCount, ) } @@ -1238,6 +1242,7 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( leftEqColsAreKey, rightEqColsAreKey bool, mergeJoinOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { leftPhysPlan, leftPlan := getPhysPlan(left) rightPhysPlan, rightPlan := 
getPhysPlan(right) @@ -1251,9 +1256,20 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin( rightPlanToStreamColMap: rightMap, } post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns) - // We always try to distribute the join, but planJoiners() itself might - // decide not to. - planCtx := e.getPlanCtx(shouldDistribute) + rec := canDistribute + if len(leftEqCols) > 0 { + // We can partition both streams on the equality columns. + if estimatedLeftRowCount == 0 && estimatedRightRowCount == 0 { + // In the absence of stats for both inputs, fall back to + // distributing. + rec = shouldDistribute + } else if estimatedLeftRowCount+estimatedRightRowCount >= e.planner.SessionData().DistributeJoinRowCountThreshold { + // If we have stats on at least one input, then distribute only if + // the join appears to be "large". + rec = shouldDistribute + } + } + planCtx := e.getPlanCtx(rec) onExpr, err := helper.remapOnExpr(e.ctx, planCtx, onCond) if err != nil { return nil, err diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 94a9e6446c97..47a4946fa4bc 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1973,7 +1973,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, nil } - rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd) if err != nil { // Don't use distSQL for this request. 
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err) @@ -3343,6 +3343,10 @@ func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) { m.data.AlwaysDistributeFullScans = val } +func (m *sessionDataMutator) SetDistributeJoinRowCountThreshold(val uint64) { + m.data.DistributeJoinRowCountThreshold = val +} + func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) { m.data.DisableVecUnionEagerCancellation = val } diff --git a/pkg/sql/join.go b/pkg/sql/join.go index d430918ee9d5..7ae39e6544c1 100644 --- a/pkg/sql/join.go +++ b/pkg/sql/join.go @@ -31,16 +31,28 @@ type joinNode struct { // columns contains the metadata for the results of this node. columns colinfo.ResultColumns + + // estimatedLeftRowCount, when set, is the estimated number of rows that + // the left input will produce. + estimatedLeftRowCount uint64 + // estimatedRightRowCount, when set, is the estimated number of rows that + // the right input will produce. + estimatedRightRowCount uint64 } func (p *planner) makeJoinNode( - left planDataSource, right planDataSource, pred *joinPredicate, + left planDataSource, + right planDataSource, + pred *joinPredicate, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) *joinNode { n := &joinNode{ - left: left, - right: right, - pred: pred, - columns: pred.cols, + left: left, + right: right, + pred: pred, + columns: pred.cols, + estimatedLeftRowCount: estimatedLeftRowCount, + estimatedRightRowCount: estimatedRightRowCount, } return n } diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 74885d557dfe..90448a5fe555 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -3939,6 +3939,7 @@ disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off distribute_group_by_row_count_threshold 1000 +distribute_join_row_count_threshold 
1000 distribute_scan_row_count_threshold 10000 distribute_sort_row_count_threshold 1000 distsql_plan_gateway_bias 2 diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 73a846d18e1d..4e8b78060568 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2939,6 +2939,7 @@ disable_plan_gists off N disable_vec_union_eager_cancellation off NULL NULL NULL string disallow_full_table_scans off NULL NULL NULL string distribute_group_by_row_count_threshold 1000 NULL NULL NULL string +distribute_join_row_count_threshold 1000 NULL NULL NULL string distribute_scan_row_count_threshold 10000 NULL NULL NULL string distribute_sort_row_count_threshold 1000 NULL NULL NULL string distsql off NULL NULL NULL string @@ -3141,6 +3142,7 @@ disable_plan_gists off N disable_vec_union_eager_cancellation off NULL user NULL off off disallow_full_table_scans off NULL user NULL off off distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000 +distribute_join_row_count_threshold 1000 NULL user NULL 1000 1000 distribute_scan_row_count_threshold 10000 NULL user NULL 10000 10000 distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000 distsql off NULL user NULL off off @@ -3339,6 +3341,7 @@ disable_plan_gists NULL NULL NULL disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL disallow_full_table_scans NULL NULL NULL NULL NULL distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL +distribute_join_row_count_threshold NULL NULL NULL NULL NULL distribute_scan_row_count_threshold NULL NULL NULL NULL NULL distribute_sort_row_count_threshold NULL NULL NULL NULL NULL distsql NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source index c644b325084c..5511dc0ddb39 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ 
b/pkg/sql/logictest/testdata/logic_test/show_source @@ -70,6 +70,7 @@ disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off distribute_group_by_row_count_threshold 1000 +distribute_join_row_count_threshold 1000 distribute_scan_row_count_threshold 10000 distribute_sort_row_count_threshold 1000 distsql off diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 8b4963fc8e79..aa188c2f4a51 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -1356,6 +1356,10 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr // to apply join hints to semi or anti joins. Join hints are only // possible on explicit joins using the JOIN keyword, and semi and anti // joins are only created from implicit joins without the JOIN keyword. + // + // Note that we use the row count estimates even when stats are + // unavailable so that the input that is more likely to be smaller ends + // up on the right side. 
leftRowCount := leftExpr.Relational().Statistics().RowCount rightRowCount := rightExpr.Relational().Statistics().RowCount if leftRowCount < rightRowCount { @@ -1430,6 +1434,13 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet()) rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ToSet()) + var leftRowCount, rightRowCount uint64 + if leftExpr.Relational().Statistics().Available { + leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount) + } + if rightExpr.Relational().Statistics().Available { + rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount) + } b.recordJoinType(joinType) if isCrossJoin { @@ -1444,6 +1455,7 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (_ execPlan, outputCols colOr leftEqOrdinals, rightEqOrdinals, leftEqColsAreKey, rightEqColsAreKey, onExpr, + leftRowCount, rightRowCount, ) if err != nil { return execPlan{}, colOrdMap{}, err @@ -1470,6 +1482,10 @@ func (b *Builder) buildMergeJoin( // We have a partial join, and we want to make sure that the relation // with smaller cardinality is on the right side. Note that we assumed // it during the costing. + // + // Note that we use the row count estimates even when stats are + // unavailable so that the input that is more likely to be smaller ends + // up on the right side. // TODO(raduberinde): we might also need to look at memo.JoinFlags when // choosing a side. 
leftRowCount := leftExpr.Relational().Statistics().RowCount @@ -1527,6 +1543,13 @@ func (b *Builder) buildMergeJoin( } leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet()) rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet()) + var leftRowCount, rightRowCount uint64 + if leftExpr.Relational().Statistics().Available { + leftRowCount = uint64(leftExpr.Relational().Statistics().RowCount) + } + if rightExpr.Relational().Statistics().Available { + rightRowCount = uint64(rightExpr.Relational().Statistics().RowCount) + } b.recordJoinType(joinType) b.recordJoinAlgorithm(exec.MergeJoin) var ep execPlan @@ -1536,6 +1559,7 @@ func (b *Builder) buildMergeJoin( onExpr, leftOrd, rightOrd, reqOrd, leftEqColsAreKey, rightEqColsAreKey, + leftRowCount, rightRowCount, ) if err != nil { return execPlan{}, colOrdMap{}, err diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode index 262b19ce8eb6..27e55ab19aa4 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode @@ -27,6 +27,24 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribut ---- distribution: local +# Sort (no stats) - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Hash join (no stats) - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Cross join (no stats) - don't distribute. 
+query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + statement ok ALTER TABLE kv INJECT STATISTICS '[ { @@ -186,20 +204,45 @@ SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k LIMIT 1] ---- distribution: full +# Hash join over large inputs - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Cross join over large inputs - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +# Now consider the inputs small. +statement ok +SET distribute_join_row_count_threshold = 100000; + +# Hash join over small inputs - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = t2.k+1 AND t1.k>1 AND t2.k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +statement ok +RESET distribute_join_row_count_threshold; + statement ok CREATE TABLE kw (k INT PRIMARY KEY, w INT); ALTER TABLE kw SPLIT AT SELECT i FROM generate_series(1,5) AS g(i); ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i) -# Join - distribute. +# Large hash join - distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw] WHERE info LIKE 'distribution%' ---- distribution: full -# Join with the data living on the remote node - distribute. +# Large hash join with the data living on the remote node - distribute. 
query T -SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2] WHERE info LIKE 'distribution%' +SELECT info FROM [EXPLAIN SELECT * FROM kv NATURAL JOIN kw WHERE k=2 OR k>5] WHERE info LIKE 'distribution%' ---- distribution: full diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join index 0852a05cc06b..0423b05bf73e 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_join +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_join @@ -282,3 +282,32 @@ EXPLAIN (VEC) ) AND c_custkey = o_custkey; RESET vectorize + +statement ok +CREATE TABLE kv (k INT PRIMARY KEY, v INT); +ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1,5) AS g(i); +ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i); + +statement ok +ALTER TABLE kv INJECT STATISTICS '[ + { + "columns": ["k"], + "created_at": "2024-01-01 1:00:00.00000+00:00", + "row_count": 1000, + "distinct_count": 1000 + } +]'; + +# Force the distribution of the next plan. +statement ok +SET distribute_scan_row_count_threshold = 1; + +# Verify that when merging plans where each has a single result router (n3 and +# n4), we plan the joiner on the same node as the right input (n4). 
+query T +SELECT * FROM [EXPLAIN (DISTSQL) SELECT * FROM kv AS t1, kv AS t2 WHERE t1.k = 3 AND t2.k = 4] WHERE info LIKE 'Diagram%'; +---- +Diagram: https://cockroachdb.github.io/distsqlplan/decode.html#eJycklFv2jAUhd_3K6wrTYXJjDjJk6VJoJKp2Sh0BGmTKlR55EKthDizHboK8d-nhLKSqKTd_BDl2sffPcf2DsyvFDhEwTi4nJMP5PNsek1ugx8342E4IZ1RGM2jb-MuqQuSLRlGxDJ6_HPJ96tgFpCOZR8T8ol4XTKcjEjHulXpdxdPApmtFBmHXwNyMZJircXm_QVQyFSME7FBA_wWGFDwgIIPCwq5Vks0RulyaVcJw_g3cIeCzPLCltMLCkulEfgOrLQpAoeJ6qm8X1JitEKmlWxPQRX2eZOxYo3A_ZMu4Qi4t6cnjVh7o7n4meIMRYy679TaQbIdJNu7PMFHoHCp0mKTGU4SSrZAIcpFWfW9vgPnjLGGMadmzH27MfYfxvwWY27DGDtr7NlPkSkdo8a4eSevS15IdyXM_RclM9R9tx5uWlhOBowOXDrw6MA_G8JrhHBrIV55XzM0ucoMvumBOY1OPVZGwniNhyMyqtBLvNFqWWkP5bQCVRMxGntY9Q9FmB2XjNUoNn8fxymJtZK8Gom1ktx_ILmnJNYkea0k53w6tzyxVaoe7mQMHJyn0XvhcxxQbhBrU15bdK8eKuz8MS8PfSVSgxSuRYIjtKg3MpPGyiVwqwvc79_9CQAA__-phZ3F + +statement ok +RESET distribute_scan_row_count_threshold; diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 0c99ee15df37..8b8eecec3c86 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -100,6 +100,8 @@ define HashJoin { LeftEqColsAreKey bool RightEqColsAreKey bool ExtraOnCond tree.TypedExpr + EstimatedLeftRowCount uint64 + EstimatedRightRowCount uint64 } # MergeJoin runs a merge join. @@ -118,6 +120,8 @@ define MergeJoin { ReqOrdering exec.OutputOrdering LeftEqColsAreKey bool RightEqColsAreKey bool + EstimatedLeftRowCount uint64 + EstimatedRightRowCount uint64 } # GroupBy runs an aggregation. 
A set of aggregations is performed for each group diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 7f06c2ef273a..a8dbcd473afd 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -415,6 +415,7 @@ func (ef *execFactory) ConstructHashJoin( leftEqCols, rightEqCols []exec.NodeColumnOrdinal, leftEqColsAreKey, rightEqColsAreKey bool, extraOnCond tree.TypedExpr, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { p := ef.planner leftSrc := asDataSource(left) @@ -435,7 +436,7 @@ func (ef *execFactory) ConstructHashJoin( pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey - return p.makeJoinNode(leftSrc, rightSrc, pred), nil + return p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount), nil } // ConstructApplyJoin is part of the exec.Factory interface. @@ -459,13 +460,14 @@ func (ef *execFactory) ConstructMergeJoin( leftOrdering, rightOrdering colinfo.ColumnOrdering, reqOrdering exec.OutputOrdering, leftEqColsAreKey, rightEqColsAreKey bool, + estimatedLeftRowCount, estimatedRightRowCount uint64, ) (exec.Node, error) { var err error p := ef.planner leftSrc := asDataSource(left) rightSrc := asDataSource(right) pred := makePredicate(joinType, leftSrc.columns, rightSrc.columns, onCond) - node := p.makeJoinNode(leftSrc, rightSrc, pred) + node := p.makeJoinNode(leftSrc, rightSrc, pred, estimatedLeftRowCount, estimatedRightRowCount) pred.leftEqKey = leftEqColsAreKey pred.rightEqKey = rightEqColsAreKey diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 25be303a9b69..77f6e1e72667 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -578,6 +578,10 @@ message LocalOnlySessionData { // AlwaysDistributeFullScans determines whether full table scans always force // the plan to be distributed, regardless of the 
estimated row count. bool always_distribute_full_scans = 148; + // DistributeJoinRowCountThreshold is the minimum number of rows estimated to + // be processed from both inputs by the hash or merge join so that we choose + // to distribute the plan because of this joiner stage of DistSQL processors. + uint64 distribute_join_row_count_threshold = 149; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index acf9f226c4d9..66bfc19f8183 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -718,6 +718,29 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `distribute_join_row_count_threshold`: { + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatUint(evalCtx.SessionData().DistributeJoinRowCountThreshold, 10), nil + }, + GetStringVal: makeIntGetStringValFn(`distribute_join_row_count_threshold`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if i < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set distribute_join_row_count_threshold to a negative value: %d", i) + } + m.SetDistributeJoinRowCountThreshold(uint64(i)) + return nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatUint(1000, 10) + }, + }, + // CockroachDB extension. `disable_vec_union_eager_cancellation`: { GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`),