diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a9954314d5f2..2502598f6e25 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -875,15 +875,19 @@ func (ex *connExecutor) execWithDistSQLEngine( planCtx.planner = planner planCtx.stmtType = recv.stmtType - if len(planner.curPlan.subqueryPlans) != 0 { + var evalCtxFactory func() *extendedEvalContext + if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 { var evalCtx extendedEvalContext ex.initEvalCtx(ctx, &evalCtx, planner) - evalCtxFactory := func() *extendedEvalContext { + evalCtxFactory = func() *extendedEvalContext { ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) evalCtx.Placeholders = &planner.semaCtx.Placeholders evalCtx.Annotations = &planner.semaCtx.Annotations return &evalCtx } + } + + if len(planner.curPlan.subqueryPlans) != 0 { if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries( ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, distribute, ) { @@ -895,6 +899,16 @@ func (ex *connExecutor) execWithDistSQLEngine( // the planner whether or not to plan remote table readers. ex.server.cfg.DistSQLPlanner.PlanAndRun( ctx, evalCtx, planCtx, planner.txn, planner.curPlan.plan, recv) + if recv.commErr != nil { + return recv.bytesRead, recv.rowsRead, recv.commErr + } + + if len(planner.curPlan.postqueryPlans) != 0 { + ex.server.cfg.DistSQLPlanner.PlanAndRunPostqueries( + ctx, planner, evalCtxFactory, planner.curPlan.postqueryPlans, recv, distribute, + ) + } + return recv.bytesRead, recv.rowsRead, recv.commErr } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 4056dcc06e2d..29f09e6511e7 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -514,9 +514,10 @@ type PlanningCtx struct { isLocal bool planner *planner // ignoreClose, when set to true, will prevent the closing of the planner's - // current plan. The top-level query needs to close it, but everything else - // (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid - // double closes of the planNode tree. + // current plan. Only the top-level query needs to close it, but everything + // else (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid + // double closes of the planNode tree. Postqueries also need to set it to + // true, and they are responsible for closing their own plan. ignoreClose bool stmtType tree.StatementType // planDepth is set to the current depth of the planNode tree. It's used to diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 9e144301f706..03a80746cffb 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -850,3 +850,116 @@ func (dsp *DistSQLPlanner) PlanAndRun( dsp.FinalizePlan(planCtx, &physPlan) dsp.Run(planCtx, txn, &physPlan, recv, evalCtx, nil /* finishedSetupFn */) } + +// PlanAndRunPostqueries returns false if an error was encountered and sets +// that error in the provided receiver. +func (dsp *DistSQLPlanner) PlanAndRunPostqueries( + ctx context.Context, + planner *planner, + evalCtxFactory func() *extendedEvalContext, + postqueryPlans []postquery, + recv *DistSQLReceiver, + maybeDistribute bool, +) bool { + for _, postqueryPlan := range postqueryPlans { + if err := dsp.planAndRunPostquery( + ctx, + postqueryPlan, + planner, + evalCtxFactory(), + recv, + maybeDistribute, + ); err != nil { + recv.SetError(err) + return false + } + } + + return true +} + +func (dsp *DistSQLPlanner) planAndRunPostquery( + ctx context.Context, + postqueryPlan postquery, + planner *planner, + evalCtx *extendedEvalContext, + recv *DistSQLReceiver, + maybeDistribute bool, +) error { + postqueryMonitor := mon.MakeMonitor( + "postquery", + mon.MemoryResource, + dsp.distSQLSrv.Metrics.CurBytesCount, + dsp.distSQLSrv.Metrics.MaxBytesHist, + -1, /* use default block size */ + noteworthyMemoryUsageBytes, + dsp.distSQLSrv.Settings, + ) + postqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + defer postqueryMonitor.Stop(ctx) + + postqueryMemAccount := postqueryMonitor.MakeBoundAccount() + defer postqueryMemAccount.Close(ctx) + + var postqueryPlanCtx *PlanningCtx + var distributePostquery bool + if maybeDistribute { + distributePostquery = shouldDistributePlan( + ctx, planner.SessionData().DistSQLMode, dsp, postqueryPlan.plan) + } + if distributePostquery { + postqueryPlanCtx = dsp.NewPlanningCtx(ctx, evalCtx, planner.txn) + } else { + postqueryPlanCtx = dsp.newLocalPlanningCtx(ctx, evalCtx) + } + + postqueryPlanCtx.isLocal = !distributePostquery + postqueryPlanCtx.planner = planner + postqueryPlanCtx.stmtType = tree.Rows + // We cannot close the postqueries' plans through the plan top since + // postqueries haven't been executed yet, so we manually close each postquery + // plan right after the execution. + postqueryPlanCtx.ignoreClose = true + defer postqueryPlan.plan.Close(ctx) + + postqueryPhysPlan, err := dsp.createPlanForNode(postqueryPlanCtx, postqueryPlan.plan) + if err != nil { + return err + } + dsp.FinalizePlan(postqueryPlanCtx, &postqueryPhysPlan) + + postqueryRecv := recv.clone() + var postqueryRowReceiver postqueryRowResultWriter + postqueryRecv.resultWriter = &postqueryRowReceiver + dsp.Run(postqueryPlanCtx, planner.txn, &postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */) + if postqueryRecv.commErr != nil { + return postqueryRecv.commErr + } + return postqueryRowReceiver.Err() +} + +// postqueryRowResultWriter is a lightweight version of RowResultWriter that +// can only write errors. It is used only for executing postqueries and is +// sufficient for that case since those can only return errors. +type postqueryRowResultWriter struct { + err error +} + +var _ rowResultWriter = &postqueryRowResultWriter{} + +func (r *postqueryRowResultWriter) AddRow(ctx context.Context, row tree.Datums) error { + return errors.Errorf("unexpectedly AddRow is called on postqueryRowResultWriter") +} + +func (r *postqueryRowResultWriter) IncrementRowsAffected(n int) { + // TODO(yuzefovich): this probably will need to change when we support + // cascades. +} + +func (r *postqueryRowResultWriter) SetError(err error) { + r.err = err +} + +func (r *postqueryRowResultWriter) Err() error { + return r.err +} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index b0e7a88e41cc..bb14e6f7847d 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -298,6 +298,10 @@ type planTop struct { // subqueryPlans contains all the sub-query plans. subqueryPlans []subquery + // postqueryPlans contains all the plans for subqueries that are to be + // executed after the main query (for example, foreign key checks). + postqueryPlans []postquery + // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. auditEvents []auditEvent @@ -321,6 +325,12 @@ type planTop struct { avoidBuffering bool } +// postquery is a query tree that is executed after the main one. It can only +// return an error (for example, foreign key violation). +type postquery struct { + plan planNode +} + // makePlan implements the planMaker interface. It populates the // planner's curPlan field. //