From 3c5d71c19d5d26d9ffc396608ff5339462625575 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 16 Dec 2024 14:14:23 -0800 Subject: [PATCH] sql: add some logging to physical planning This commit introduces some logging, hidden behind a verbosity level, to physical planning whenever we choose the `shouldDistribute` recommendation. This will show up in the trace and should be helpful to understand why we decided to distribute a particular query. Release note: None --- pkg/sql/distsql_physical_planner.go | 108 ++++++++++++++++------------ pkg/sql/exec_util.go | 2 +- 2 files changed, 65 insertions(+), 45 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 0ba47febfbe6..c1878420c596 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -396,7 +396,7 @@ const ( cannotDistribute distRecommendation = iota // canDistribute indicates that a plan can be distributed, but it's not - // clear whether it'll be benefit from that. + // clear whether it'll benefit from that. canDistribute // shouldDistribute indicates that a plan will likely benefit if distributed. @@ -512,7 +512,10 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool { // this plan couldn't be distributed. // TODO(radu): add tests for this. func checkSupportForPlanNode( - node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + node planNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { switch n := node.(type) { // Keep these cases alphabetized, please! 
@@ -523,19 +526,19 @@ func checkSupportForPlanNode( return shouldDistribute, nil case *distinctNode: - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *exportNode: - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *filterNode: if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *groupNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -544,12 +547,15 @@ func checkSupportForPlanNode( return cannotDistribute, newQueryNotSupportedErrorf("aggregate %q cannot be executed with distsql", agg.funcName) } } - // Distribute aggregations if possible. - aggRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - aggRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + aggRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "aggregation (no stats) recommends plan distribution") + aggRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeGroupByRowCountThreshold { + log.VEventf(ctx, 2, "large aggregation recommends plan distribution") + aggRec = shouldDistribute } return rec.compose(aggRec), nil @@ -563,13 +569,13 @@ func checkSupportForPlanNode( } // n.table doesn't have meaningful spans, but we need to check support (e.g. // for any filtering expression). 
- if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil { + if _, err := checkSupportForPlanNode(ctx, n.table, distSQLVisitor, sd); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.input, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) case *invertedFilterNode: - return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd) + return checkSupportForInvertedFilterNode(ctx, n, distSQLVisitor, sd) case *invertedJoinNode: if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -582,23 +588,24 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted join recommends plan distribution") return rec.compose(shouldDistribute), nil case *joinNode: if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -606,12 +613,10 @@ func checkSupportForPlanNode( if len(n.pred.leftEqualityIndices) > 0 { // We can partition both streams on the equality columns. if n.estimatedLeftRowCount == 0 && n.estimatedRightRowCount == 0 { - // In the absence of stats for both inputs, fall back to - // distributing. 
+ log.VEventf(ctx, 2, "join (no stats) recommends plan distribution") rec = rec.compose(shouldDistribute) } else if n.estimatedLeftRowCount+n.estimatedRightRowCount >= sd.DistributeJoinRowCountThreshold { - // If we have stats on at least one input, then distribute only - // if the join appears to be "large". + log.VEventf(ctx, 2, "large join recommends plan distribution") rec = rec.compose(shouldDistribute) } } @@ -621,7 +626,7 @@ func checkSupportForPlanNode( // Note that we don't need to check whether we support distribution of // n.countExpr or n.offsetExpr because those expressions are evaluated // locally, during the physical planning. - return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) case *lookupJoinNode: if n.remoteLookupExpr != nil || n.remoteOnlyLookups { @@ -645,7 +650,7 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -662,7 +667,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source, distSQLVisitor, sd) case *renderNode: for _, e := range n.render { @@ -670,7 +675,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) + return checkSupportForPlanNode(ctx, n.source.plan, distSQLVisitor, sd) case *scanNode: if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -689,39 +694,48 @@ func checkSupportForPlanNode( // here. scanRec := canDistribute if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold { - // This is a large scan, so we choose to distribute it. 
+ log.VEventf(ctx, 2, "large scan recommends plan distribution") scanRec = shouldDistribute } if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) { // In the absence of table stats, we default to always distributing // full scans. + log.VEventf(ctx, 2, "full scan recommends plan distribution") scanRec = shouldDistribute } return scanRec, nil case *sortNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - sortRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - sortRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. + sortRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "sort (no stats) recommends plan distribution") + sortRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large sort recommends plan distribution") + sortRec = shouldDistribute } return rec.compose(sortRec), nil case *topKNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - topKRec := shouldDistribute - if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { - // Don't force distribution if we expect to process small number of - // rows. - topKRec = canDistribute + // Don't force distribution if we expect to process small number of + // rows. 
+ topKRec := canDistribute + if n.estimatedInputRowCount == 0 { + log.VEventf(ctx, 2, "top k (no stats) recommends plan distribution") + topKRec = shouldDistribute + } else if n.estimatedInputRowCount >= sd.DistributeSortRowCountThreshold { + log.VEventf(ctx, 2, "large top k recommends plan distribution") + topKRec = shouldDistribute } return rec.compose(topKRec), nil @@ -729,11 +743,11 @@ func checkSupportForPlanNode( return canDistribute, nil case *unionNode: - recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd) + recLeft, err := checkSupportForPlanNode(ctx, n.left, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd) + recRight, err := checkSupportForPlanNode(ctx, n.right, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -757,7 +771,7 @@ func checkSupportForPlanNode( return canDistribute, nil case *windowNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -767,6 +781,7 @@ func checkSupportForPlanNode( // should distribute the execution. // TODO(yuzefovich): we might want to be smarter about this and // don't force distribution with small inputs. + log.VEventf(ctx, 2, "window function with PARTITION BY recommends plan distribution") return rec.compose(shouldDistribute), nil } } @@ -790,6 +805,7 @@ func checkSupportForPlanNode( } // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. 
+ log.VEventf(ctx, 2, "zigzag join recommends plan distribution") return shouldDistribute, nil case *cdcValuesNode: return cannotDistribute, nil @@ -800,9 +816,12 @@ func checkSupportForPlanNode( } func checkSupportForInvertedFilterNode( - n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, + ctx context.Context, + n *invertedFilterNode, + distSQLVisitor *distSQLExprCheckVisitor, + sd *sessiondata.SessionData, ) (distRecommendation, error) { - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -829,6 +848,7 @@ func checkSupportForInvertedFilterNode( if n.expression.Left == nil && n.expression.Right == nil { // TODO(yuzefovich): we might want to be smarter about this and don't // force distribution with small inputs. + log.VEventf(ctx, 2, "inverted filter (union of inverted spans) recommends plan distribution") filterRec = shouldDistribute } return rec.compose(filterRec), nil diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index f1c7da0c379c..47a4946fa4bc 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1973,7 +1973,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, nil } - rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd) + rec, err := checkSupportForPlanNode(ctx, plan.planNode, distSQLVisitor, sd) if err != nil { // Don't use distSQL for this request. log.VEventf(ctx, 1, "query not supported for distSQL: %s", err)