Skip to content

Commit

Permalink
sql: add some logging to physical planning
Browse files Browse the repository at this point in the history
This commit introduces some logging, hidden behind a verbosity level, to
the physical planning whenever we choose `shouldDistribute`
recommendation. This will show up in the trace and should be helpful to
understand why we decided to distribute a particular query.

Release note: None
  • Loading branch information
yuzefovich committed Dec 16, 2024
1 parent 355fb0b commit 320e6c9
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 45 deletions.
108 changes: 64 additions & 44 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ const (
cannotDistribute distRecommendation = iota

// canDistribute indicates that a plan can be distributed, but it's not
// clear whether it'll be benefit from that.
// clear whether it'll benefit from that.
canDistribute

// shouldDistribute indicates that a plan will likely benefit if distributed.
Expand Down Expand Up @@ -512,7 +512,10 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
// this plan couldn't be distributed.
// TODO(radu): add tests for this.
func checkSupportForPlanNode(
node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData,
ctx context.Context,
node planNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
) (distRecommendation, error) {
switch n := node.(type) {
// Keep these cases alphabetized, please!
Expand All @@ -523,19 +526,19 @@ func checkSupportForPlanNode(
return shouldDistribute, nil

case *distinctNode:
return checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)

case *exportNode:
return checkSupportForPlanNode(n.source, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd)

case *filterNode:
if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd)

case *groupNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -544,12 +547,15 @@ func checkSupportForPlanNode(
return cannotDistribute, newQueryNotSupportedErrorf("aggregate %q cannot be executed with distsql", agg.funcName)
}
}
// Distribute aggregations if possible.
aggRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
aggRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
aggRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "aggregation (no stats) recommends plan distribution")
aggRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeGroupByRowCountThreshold {
log.VEventf(ctx, 2, "large aggregation recommends plan distribution")
aggRec = shouldDistribute
}
return rec.compose(aggRec), nil

Expand All @@ -563,13 +569,13 @@ func checkSupportForPlanNode(
}
// n.table doesn't have meaningful spans, but we need to check support (e.g.
// for any filtering expression).
if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil {
if _, err := checkSupportForPlanNode(ctx, n.table, distSQLVisitor, sd); err != nil {
return cannotDistribute, err
}
return checkSupportForPlanNode(n.input, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)

case *invertedFilterNode:
return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd)
return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd)

case *invertedJoinNode:
if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
Expand All @@ -582,36 +588,35 @@ func checkSupportForPlanNode(
if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "inverted join recommends plan distribution")
return rec.compose(shouldDistribute), nil

case *joinNode:
if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd)
recLeft, err := checkSupportForPlanNode(ctx, n.left.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd)
recRight, err := checkSupportForPlanNode(ctx, n.right.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
rec := recLeft.compose(recRight)
if len(n.pred.leftEqualityIndices) > 0 {
// We can partition both streams on the equality columns.
if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 {
// In the absence of stats for both inputs, fall back to
// distributing.
log.VEventf(ctx, 2, "join (no stats) recommends plan distribution")
rec = rec.compose(shouldDistribute)
} else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold {
// If we have stats on at least one input, then distribute only
// if the join appears to be "large".
log.VEventf(ctx, 2, "large join recommends plan distribution")
rec = rec.compose(shouldDistribute)
}
}
Expand All @@ -621,7 +626,7 @@ func checkSupportForPlanNode(
// Note that we don't need to check whether we support distribution of
// n.countExpr or n.offsetExpr because those expressions are evaluated
// locally, during the physical planning.
return checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)

case *lookupJoinNode:
if n.remoteLookupExpr != nil || n.remoteOnlyLookups {
Expand All @@ -645,7 +650,7 @@ func checkSupportForPlanNode(
if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil {
return cannotDistribute, err
}
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -662,15 +667,15 @@ func checkSupportForPlanNode(
return cannotDistribute, err
}
}
return checkSupportForPlanNode(n.source, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd)

case *renderNode:
for _, e := range n.render {
if err := checkExprForDistSQL(e, distSQLVisitor); err != nil {
return cannotDistribute, err
}
}
return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd)
return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd)

case *scanNode:
if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
Expand All @@ -689,51 +694,60 @@ func checkSupportForPlanNode(
// here.
scanRec := canDistribute
if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold {
// This is a large scan, so we choose to distribute it.
log.VEventf(ctx, 2, "large scan recommends plan distribution")
scanRec = shouldDistribute
}
if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) {
// In the absence of table stats, we default to always distributing
// full scans.
log.VEventf(ctx, 2, "full scan recommends plan distribution")
scanRec = shouldDistribute
}
return scanRec, nil

case *sortNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
sortRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
sortRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
sortRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "sort (no stats) recommends plan distribution")
sortRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold {
log.VEventf(ctx, 2, "large sort recommends plan distribution")
sortRec = shouldDistribute
}
return rec.compose(sortRec), nil

case *topKNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
topKRec := shouldDistribute
if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount {
// Don't force distribution if we expect to process small number of
// rows.
topKRec = canDistribute
// Don't force distribution if we expect to process small number of
// rows.
topKRec := canDistribute
if n.estimatedInputRowCount == 0 {
log.VEventf(ctx, 2, "top k (no stats) recommends plan distribution")
topKRec = shouldDistribute
} else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold {
log.VEventf(ctx, 2, "large top k recommends plan distribution")
topKRec = shouldDistribute
}
return rec.compose(topKRec), nil

case *unaryNode:
return canDistribute, nil

case *unionNode:
recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd)
recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd)
recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -757,7 +771,7 @@ func checkSupportForPlanNode(
return canDistribute, nil

case *windowNode:
rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -767,6 +781,7 @@ func checkSupportForPlanNode(
// should distribute the execution.
// TODO(yuzefovich): we might want to be smarter about this and
// don't force distribution with small inputs.
log.VEventf(ctx, 2, "window function with PARTITION BY recommends plan distribution")
return rec.compose(shouldDistribute), nil
}
}
Expand All @@ -790,6 +805,7 @@ func checkSupportForPlanNode(
}
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "zigzag join recommends plan distribution")
return shouldDistribute, nil
case *cdcValuesNode:
return cannotDistribute, nil
Expand All @@ -800,9 +816,12 @@ func checkSupportForPlanNode(
}

func checkSupportForInvertedFilterNode(
n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData,
ctx context.Context,
n *invertedFilterNode,
distSQLVisitor *distSQLExprCheckVisitor,
sd *sessiondata.SessionData,
) (distRecommendation, error) {
rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd)
if err != nil {
return cannotDistribute, err
}
Expand All @@ -829,6 +848,7 @@ func checkSupportForInvertedFilterNode(
if n.expression.Left == nil && n.expression.Right == nil {
// TODO(yuzefovich): we might want to be smarter about this and don't
// force distribution with small inputs.
log.VEventf(ctx, 2, "inverted filter (union of inverted spans) recommends plan distribution")
filterRec = shouldDistribute
}
return rec.compose(filterRec), nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,7 +1973,7 @@ func getPlanDistribution(
return physicalplan.LocalPlan, nil
}

rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd)
rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd)
if err != nil {
// Don't use distSQL for this request.
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)
Expand Down

0 comments on commit 320e6c9

Please sign in to comment.