From c0b1cc3b9394063e6f44cb8e8b6dfe430bbfd078 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 18 Jun 2019 10:28:54 -0700 Subject: [PATCH] sql: add support for executing postqueries This commit introduces infrastructure for running "deferred subqueries" that are to be executed after the execution of the main query tree which is needed for (among other things) for foreign key checks. Release note: None --- pkg/sql/conn_executor_exec.go | 18 ++++- pkg/sql/distsql_physical_planner.go | 7 +- pkg/sql/distsql_running.go | 113 ++++++++++++++++++++++++++++ pkg/sql/plan.go | 10 +++ 4 files changed, 143 insertions(+), 5 deletions(-) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index a9954314d5f2..2502598f6e25 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -875,15 +875,19 @@ func (ex *connExecutor) execWithDistSQLEngine( planCtx.planner = planner planCtx.stmtType = recv.stmtType - if len(planner.curPlan.subqueryPlans) != 0 { + var evalCtxFactory func() *extendedEvalContext + if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 { var evalCtx extendedEvalContext ex.initEvalCtx(ctx, &evalCtx, planner) - evalCtxFactory := func() *extendedEvalContext { + evalCtxFactory = func() *extendedEvalContext { ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) evalCtx.Placeholders = &planner.semaCtx.Placeholders evalCtx.Annotations = &planner.semaCtx.Annotations return &evalCtx } + } + + if len(planner.curPlan.subqueryPlans) != 0 { if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries( ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, distribute, ) { @@ -895,6 +899,16 @@ func (ex *connExecutor) execWithDistSQLEngine( // the planner whether or not to plan remote table readers. ex.server.cfg.DistSQLPlanner.PlanAndRun( ctx, evalCtx, planCtx, planner.txn, planner.curPlan.plan, recv) + if recv.commErr != nil { + return recv.bytesRead, recv.rowsRead, recv.commErr + } + + if len(planner.curPlan.postqueryPlans) != 0 { + ex.server.cfg.DistSQLPlanner.PlanAndRunPostqueries( + ctx, planner, evalCtxFactory, planner.curPlan.postqueryPlans, recv, distribute, + ) + } + return recv.bytesRead, recv.rowsRead, recv.commErr } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 4056dcc06e2d..29f09e6511e7 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -514,9 +514,10 @@ type PlanningCtx struct { isLocal bool planner *planner // ignoreClose, when set to true, will prevent the closing of the planner's - // current plan. The top-level query needs to close it, but everything else - // (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid - // double closes of the planNode tree. + // current plan. Only the top-level query needs to close it, but everything + // else (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid + // double closes of the planNode tree. Postqueries also need to set it to + // true, and they are responsible for closing their own plan. ignoreClose bool stmtType tree.StatementType // planDepth is set to the current depth of the planNode tree. It's used to diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 9e144301f706..03a80746cffb 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -850,3 +850,116 @@ func (dsp *DistSQLPlanner) PlanAndRun( dsp.FinalizePlan(planCtx, &physPlan) dsp.Run(planCtx, txn, &physPlan, recv, evalCtx, nil /* finishedSetupFn */) } + +// PlanAndRunPostqueries returns false if an error was encountered and sets +// that error in the provided receiver. +func (dsp *DistSQLPlanner) PlanAndRunPostqueries( + ctx context.Context, + planner *planner, + evalCtxFactory func() *extendedEvalContext, + postqueryPlans []postquery, + recv *DistSQLReceiver, + maybeDistribute bool, +) bool { + for _, postqueryPlan := range postqueryPlans { + if err := dsp.planAndRunPostquery( + ctx, + postqueryPlan, + planner, + evalCtxFactory(), + recv, + maybeDistribute, + ); err != nil { + recv.SetError(err) + return false + } + } + + return true +} + +func (dsp *DistSQLPlanner) planAndRunPostquery( + ctx context.Context, + postqueryPlan postquery, + planner *planner, + evalCtx *extendedEvalContext, + recv *DistSQLReceiver, + maybeDistribute bool, +) error { + postqueryMonitor := mon.MakeMonitor( + "postquery", + mon.MemoryResource, + dsp.distSQLSrv.Metrics.CurBytesCount, + dsp.distSQLSrv.Metrics.MaxBytesHist, + -1, /* use default block size */ + noteworthyMemoryUsageBytes, + dsp.distSQLSrv.Settings, + ) + postqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + defer postqueryMonitor.Stop(ctx) + + postqueryMemAccount := postqueryMonitor.MakeBoundAccount() + defer postqueryMemAccount.Close(ctx) + + var postqueryPlanCtx *PlanningCtx + var distributePostquery bool + if maybeDistribute { + distributePostquery = shouldDistributePlan( + ctx, planner.SessionData().DistSQLMode, dsp, postqueryPlan.plan) + } + if distributePostquery { + postqueryPlanCtx = dsp.NewPlanningCtx(ctx, evalCtx, planner.txn) + } else { + postqueryPlanCtx = dsp.newLocalPlanningCtx(ctx, evalCtx) + } + + postqueryPlanCtx.isLocal = !distributePostquery + postqueryPlanCtx.planner = planner + postqueryPlanCtx.stmtType = tree.Rows + // We cannot close the postqueries' plans through the plan top since + // postqueries haven't been executed yet, so we manually close each postquery + // plan right after the execution. + postqueryPlanCtx.ignoreClose = true + defer postqueryPlan.plan.Close(ctx) + + postqueryPhysPlan, err := dsp.createPlanForNode(postqueryPlanCtx, postqueryPlan.plan) + if err != nil { + return err + } + dsp.FinalizePlan(postqueryPlanCtx, &postqueryPhysPlan) + + postqueryRecv := recv.clone() + var postqueryRowReceiver postqueryRowResultWriter + postqueryRecv.resultWriter = &postqueryRowReceiver + dsp.Run(postqueryPlanCtx, planner.txn, &postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */) + if postqueryRecv.commErr != nil { + return postqueryRecv.commErr + } + return postqueryRowReceiver.Err() +} + +// postqueryRowResultWriter is a lightweight version of RowResultWriter that +// can only write errors. It is used only for executing postqueries and is +// sufficient for that case since those can only return errors. +type postqueryRowResultWriter struct { + err error +} + +var _ rowResultWriter = &postqueryRowResultWriter{} + +func (r *postqueryRowResultWriter) AddRow(ctx context.Context, row tree.Datums) error { + return errors.Errorf("unexpectedly AddRow is called on postqueryRowResultWriter") +} + +func (r *postqueryRowResultWriter) IncrementRowsAffected(n int) { + // TODO(yuzefovich): this probably will need to change when we support + // cascades. +} + +func (r *postqueryRowResultWriter) SetError(err error) { + r.err = err +} + +func (r *postqueryRowResultWriter) Err() error { + return r.err +} diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index b0e7a88e41cc..bb14e6f7847d 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -298,6 +298,10 @@ type planTop struct { // subqueryPlans contains all the sub-query plans. subqueryPlans []subquery + // postqueryPlans contains all the plans for subqueries that are to be + // executed after the main query (for example, foreign key checks). + postqueryPlans []postquery + // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. auditEvents []auditEvent @@ -321,6 +325,12 @@ type planTop struct { avoidBuffering bool } +// postquery is a query tree that is executed after the main one. It can only +// return an error (for example, foreign key violation). +type postquery struct { + plan planNode +} + // makePlan implements the planMaker interface. It populates the // planner's curPlan field. //