sql: adjust physical planning heuristics around small joins #137562

Open · wants to merge 3 commits into master
133 changes: 84 additions & 49 deletions pkg/sql/distsql_physical_planner.go
@@ -396,7 +396,7 @@ const (
cannotDistribute distRecommendation = iota

// canDistribute indicates that a plan can be distributed, but it's not
// clear whether it'll be benefit from that.
// clear whether it'll benefit from that.
canDistribute

// shouldDistribute indicates that a plan will likely benefit if distributed.
@@ -512,7 +512,10 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
func checkSupportForPlanNode(
node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData,
ctx context.Context,
node planNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
@@ -523,19 +526,19 @@ func checkSupportForPlanNode(
return shouldDistribute, nil

case *distinctNode:
return checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)

case *exportNode:
return checkSupportForPlanNode(n.source, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd)

case *filterNode:
if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd)

case *groupNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
@@ -544,12 +547,15 @@ func checkSupportForPlanNode(
return cannotDistribute, newQueryNotSupportedErrorf("aggregate %q cannot be executed with distsql", agg.funcName)
}
}
// Distribute aggregations if possible.
aggRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
aggRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
aggRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "aggregation (no stats) recommends plan distribution")
aggRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeGroupByRowCountThreshold {
log.VEventf(ctx, 2, "large aggregation recommends plan distribution")
aggRec = shouldDistribute
}
return rec.compose(aggRec), nil

@@ -563,13 +569,13 @@ func checkSupportForPlanNode(
}
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil {
if _, err := checkSupportForPlanNode(ctx, n.table, distSQLVisitor, sd); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.input, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)

case *invertedFilterNode:
return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd)
return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)

case *invertedJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -582,38 +588,45 @@ func checkSupportForPlanNode(
if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "inverted join recommends plan distribution")
return rec.compose(shouldDistribute), nil

case *joinNode:
if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd)
recLeft, err := checkSupportForPlanNode(ctx, n.left.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd)
recRight, err := checkSupportForPlanNode(ctx, n.right.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
// If either the left or the right side can benefit from distribution, we
// should distribute.
rec := recLeft.compose(recRight)
// If we can do a hash join, we distribute if possible.
if len(n.pred.leftEqualityIndices) > 0 {
rec = rec.compose(shouldDistribute)
// We can partition both streams on the equality columns.
if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 {
log.VEventf(ctx, 2, "join (no stats) recommends plan distribution")
rec = rec.compose(shouldDistribute)
} else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold {
log.VEventf(ctx, 2, "large join recommends plan distribution")
rec = rec.compose(shouldDistribute)
}
}
return rec, nil

case *limitNode:
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
return checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)

case *lookupJoinNode:
if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
@@ -637,7 +650,7 @@ func checkSupportForPlanNode(
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
@@ -654,15 +667,15 @@ func checkSupportForPlanNode(
return cannotDistribute, err
}
}
return checkSupportForPlanNode(n.source, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd)

case *renderNode:
for _, e := range n.render {
if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
return cannotDistribute, err
}
}
return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd)

case *scanNode:
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -681,51 +694,60 @@ func checkSupportForPlanNode(
// here.
scanRec := canDistribute
if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold {
// This is a large scan, so we choose to distribute it.
log.VEventf(ctx, 2, "large scan recommends plan distribution")
scanRec = shouldDistribute
}
if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) {
// In the absence of table stats, we default to always distributing
// full scans.
log.VEventf(ctx, 2, "full scan recommends plan distribution")
scanRec = shouldDistribute
}
return scanRec, nil

case *sortNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
sortRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
sortRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
sortRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "sort (no stats) recommends plan distribution")
sortRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold {
log.VEventf(ctx, 2, "large sort recommends plan distribution")
sortRec = shouldDistribute
}
return rec.compose(sortRec), nil

case *topKNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
topKRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
topKRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
topKRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "top k (no stats) recommends plan distribution")
topKRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold {
log.VEventf(ctx, 2, "large top k recommends plan distribution")
topKRec = shouldDistribute
}
return rec.compose(topKRec), nil

case *unaryNode:
return canDistribute, nil

case *unionNode:
recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd)
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd)
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
@@ -749,14 +771,17 @@ func checkSupportForPlanNode(
return canDistribute, nil

case *windowNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
for _, f := range n.funcs {
if len(f.partitionIdxs) > 0 {
// If at least one function has PARTITION BY clause, then we
// should distribute the execution.
// TODO(yuzefovich): we might want to be smarter about this and
// don't force distribution with small inputs.
log.VEventf(ctx, 2, "window function with PARTITION BY recommends plan distribution")
return rec.compose(shouldDistribute), nil
}
}
@@ -778,6 +803,9 @@ func checkSupportForPlanNode(
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "zigzag join recommends plan distribution")
return shouldDistribute, nil
case *cdcValuesNode:
return cannotDistribute, nil
@@ -788,9 +816,12 @@ func checkSupportForPlanNode(
}

func checkSupportForInvertedFilterNode(
n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData,
ctx context.Context,
n *invertedFilterNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
) (distRecommendation, error) {
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
@@ -815,6 +846,9 @@ func checkSupportForInvertedFilterNode(
// related to #50659. Fix this in the distSQLSpecExecFactory.
filterRec := cannotDistribute
if n.expression.Left == nil && n.expression.Right == nil {
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "inverted filter (union of inverted spans) recommends plan distribution")
filterRec = shouldDistribute
}
return rec.compose(filterRec), nil
@@ -3837,12 +3871,13 @@ func (dsp *DistSQLPlanner) planJoiners(
// single processor.
sqlInstances = []base.SQLInstanceID{dsp.gatewaySQLInstanceID}

// If either side has a single stream, put the processor on that node. We
// prefer the left side because that is processed first by the hash joiner.
if len(leftRouters) == 1 {
sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID
} else if len(rightRouters) == 1 {
// If either side has a single stream, put the processor on that node.
// We prefer the right side because that is processed first by the hash
// joiner.
if len(rightRouters) == 1 {
sqlInstances[0] = p.Processors[rightRouters[0]].SQLInstanceID
} else if len(leftRouters) == 1 {
sqlInstances[0] = p.Processors[leftRouters[0]].SQLInstanceID
}
}
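
A note on the hunk above: when either input of the join already arrives as a single stream, the joiner is placed on the node producing that stream, and the preference now goes to the right input rather than the left because, per the updated comment, the hash joiner processes the right side first. The following condensed sketch of the placement rule is not code from the diff; the package and types are simplified placeholders.

package planningsketch

// pickJoinerInstance condenses the placement rule above: default to the
// gateway, but if either input is already a single stream, run the joiner
// where that stream is produced, preferring the right input because the
// hash joiner reads it first.
func pickJoinerInstance(gateway int, leftRouters, rightRouters []int) int {
	instance := gateway
	if len(rightRouters) == 1 {
		instance = rightRouters[0]
	} else if len(leftRouters) == 1 {
		instance = leftRouters[0]
	}
	return instance
}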

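To make the new joinNode heuristic earlier in this file easier to follow in isolation, here is a minimal, self-contained sketch of the recommendation that the join itself contributes before it is composed with the recommendations of its inputs. None of this is code from the diff: the type, the compose helper, and the function are simplified stand-ins (compose in particular is an assumption about behavior not shown here), and the threshold parameter stands in for the session setting sd.DistributeJoinRowCountThreshold.

package main

import "fmt"

// distRecommendation is a simplified stand-in for the planner's enum.
type distRecommendation int

const (
	cannotDistribute distRecommendation = iota
	canDistribute
	shouldDistribute
)

// compose keeps the strongest opinion: cannotDistribute dominates, then
// shouldDistribute, then canDistribute. This is an assumption consistent
// with how rec.compose is used in the diff; the real method is not shown.
func (a distRecommendation) compose(b distRecommendation) distRecommendation {
	if a == cannotDistribute || b == cannotDistribute {
		return cannotDistribute
	}
	if a == shouldDistribute || b == shouldDistribute {
		return shouldDistribute
	}
	return canDistribute
}

// joinContribution captures the new joinNode rule: a hash join (equality
// columns present) pushes for distribution only when there are no stats on
// either input or when the combined estimated input size reaches the
// threshold; otherwise the join itself stays neutral.
func joinContribution(leftRows, rightRows, threshold uint64, hasEqCols bool) distRecommendation {
	if !hasEqCols {
		return canDistribute
	}
	if leftRows == 0 && rightRows == 0 {
		return shouldDistribute // no stats: fall back to distributing
	}
	if leftRows+rightRows >= threshold {
		return shouldDistribute // "large" join: distribution should pay off
	}
	return canDistribute // small join with stats: don't force distribution
}

func main() {
	childRec := canDistribute.compose(canDistribute)
	fmt.Println(childRec.compose(joinContribution(100, 200, 1000, true))) // 1 = canDistribute
	fmt.Println(childRec.compose(joinContribution(800, 400, 1000, true))) // 2 = shouldDistribute
	fmt.Println(childRec.compose(joinContribution(0, 0, 1000, true)))     // 2 = shouldDistribute
}
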
22 changes: 19 additions & 3 deletions pkg/sql/distsql_spec_exec_factory.go
@@ -425,11 +425,13 @@ func (e *distSQLSpecExecFactory) ConstructHashJoin(
leftEqCols, rightEqCols []exec.NodeColumnOrdinal,
leftEqColsAreKey, rightEqColsAreKey bool,
extraOnCond tree.TypedExpr,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
return e.constructHashOrMergeJoin(
joinType, left, right, extraOnCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey,
ReqOrdering{} /* mergeJoinOrdering */, exec.OutputOrdering{}, /* reqOrdering */
estimatedLeftRowCount, estimatedRightRowCount,
)
}

@@ -440,6 +442,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
leftOrdering, rightOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
leftEqColsAreKey, rightEqColsAreKey bool,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftEqCols, rightEqCols, mergeJoinOrdering, err := getEqualityIndicesAndMergeJoinOrdering(leftOrdering, rightOrdering)
if err != nil {
@@ -448,6 +451,7 @@ func (e *distSQLSpecExecFactory) ConstructMergeJoin(
return e.constructHashOrMergeJoin(
joinType, left, right, onCond, leftEqCols, rightEqCols,
leftEqColsAreKey, rightEqColsAreKey, mergeJoinOrdering, reqOrdering,
estimatedLeftRowCount, estimatedRightRowCount,
)
}

@@ -1238,6 +1242,7 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
leftEqColsAreKey, rightEqColsAreKey bool,
mergeJoinOrdering colinfo.ColumnOrdering,
reqOrdering exec.OutputOrdering,
estimatedLeftRowCount, estimatedRightRowCount uint64,
) (exec.Node, error) {
leftPhysPlan, leftPlan := getPhysPlan(left)
rightPhysPlan, rightPlan := getPhysPlan(right)
@@ -1251,9 +1256,20 @@ func (e *distSQLSpecExecFactory) constructHashOrMergeJoin(
rightPlanToStreamColMap: rightMap,
}
post, joinToStreamColMap := helper.joinOutColumns(joinType, resultColumns)
// We always try to distribute the join, but planJoiners() itself might
// decide not to.
planCtx := e.getPlanCtx(shouldDistribute)
rec := canDistribute
if len(leftEqCols) > 0 {
// We can partition both streams on the equality columns.
if estimatedLeftRowCount == 0 && estimatedRightRowCount == 0 {
// In the absence of stats for both inputs, fall back to
// distributing.
rec = shouldDistribute
} else if estimatedLeftRowCount+estimatedRightRowCount >= e.planner.SessionData().DistributeGroupByRowCountThreshold {
// If we have stats on at least one input, then distribute only if
// the join appears to be "large".
rec = shouldDistribute
}
}
planCtx := e.getPlanCtx(rec)
onExpr, err := helper.remapOnExpr(e.ctx, planCtx, onCond)
if err != nil {
return nil, err
6 changes: 5 additions & 1 deletion pkg/sql/exec_util.go
@@ -1973,7 +1973,7 @@ func getPlanDistribution(
return physicalplan.LocalPlan, nil
}

rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd)
if err != nil {
// Don't use distSQL for this request.
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
@@ -3343,6 +3343,10 @@ func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) {
m.data.AlwaysDistributeFullScans = val
}

func (m *sessionDataMutator) SetDistributeJoinRowCountThreshold(val uint64) {
m.data.DistributeJoinRowCountThreshold = val
}

func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
m.data.DisableVecUnionEagerCancellation = val
}
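For context on why the canDistribute versus shouldDistribute distinction matters: getPlanDistribution (partially shown above) maps the recommendation to a local or distributed plan based on the session's distsql mode. The sketch below states that mapping as an assumption, since the helper that does this is not part of the diff; the relevant point is that under the default auto mode only a shouldDistribute recommendation produces a distributed plan, which is exactly what these heuristics adjust.

package planningsketch

type distRecommendation int

const (
	cannotDistribute distRecommendation = iota
	canDistribute
	shouldDistribute
)

// applyDistSQLMode sketches, as an assumption rather than code from this
// diff, how the recommendation is presumed to interact with the distsql
// session mode when deciding whether to distribute a plan.
func applyDistSQLMode(rec distRecommendation, mode string) bool {
	switch mode {
	case "on", "always":
		// Distribute whenever distribution is possible at all.
		return rec != cannotDistribute
	case "auto":
		// The default mode: distribute only when the plan is expected to
		// benefit from it.
		return rec == shouldDistribute
	}
	// "off" or anything unrecognized: plan locally.
	return false
}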