Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add support for executing postqueries #38281

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,15 +875,19 @@ func (ex *connExecutor) execWithDistSQLEngine(
planCtx.planner = planner
planCtx.stmtType = recv.stmtType

if len(planner.curPlan.subqueryPlans) != 0 {
var evalCtxFactory func() *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 {
var evalCtx extendedEvalContext
ex.initEvalCtx(ctx, &evalCtx, planner)
evalCtxFactory := func() *extendedEvalContext {
evalCtxFactory = func() *extendedEvalContext {
ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
evalCtx.Placeholders = &planner.semaCtx.Placeholders
evalCtx.Annotations = &planner.semaCtx.Annotations
return &evalCtx
}
}

if len(planner.curPlan.subqueryPlans) != 0 {
if !ex.server.cfg.DistSQLPlanner.PlanAndRunSubqueries(
ctx, planner, evalCtxFactory, planner.curPlan.subqueryPlans, recv, distribute,
) {
Expand All @@ -895,6 +899,16 @@ func (ex *connExecutor) execWithDistSQLEngine(
// the planner whether or not to plan remote table readers.
ex.server.cfg.DistSQLPlanner.PlanAndRun(
ctx, evalCtx, planCtx, planner.txn, planner.curPlan.plan, recv)
if recv.commErr != nil {
return recv.bytesRead, recv.rowsRead, recv.commErr
}

if len(planner.curPlan.postqueryPlans) != 0 {
ex.server.cfg.DistSQLPlanner.PlanAndRunPostqueries(
ctx, planner, evalCtxFactory, planner.curPlan.postqueryPlans, recv, distribute,
)
}

return recv.bytesRead, recv.rowsRead, recv.commErr
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,10 @@ type PlanningCtx struct {
isLocal bool
planner *planner
// ignoreClose, when set to true, will prevent the closing of the planner's
// current plan. The top-level query needs to close it, but everything else
// (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid
// double closes of the planNode tree.
// current plan. Only the top-level query needs to close it, but everything
// else (like subqueries or EXPLAIN ANALYZE) should set this to true to avoid
// double closes of the planNode tree. Postqueries also need to set it to
// true, and they are responsible for closing their own plan.
ignoreClose bool
stmtType tree.StatementType
// planDepth is set to the current depth of the planNode tree. It's used to
Expand Down
113 changes: 113 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,3 +850,116 @@ func (dsp *DistSQLPlanner) PlanAndRun(
dsp.FinalizePlan(planCtx, &physPlan)
dsp.Run(planCtx, txn, &physPlan, recv, evalCtx, nil /* finishedSetupFn */)
}

// PlanAndRunPostqueries returns false if an error was encountered and sets
// that error in the provided receiver.
func (dsp *DistSQLPlanner) PlanAndRunPostqueries(
ctx context.Context,
planner *planner,
evalCtxFactory func() *extendedEvalContext,
postqueryPlans []postquery,
recv *DistSQLReceiver,
maybeDistribute bool,
) bool {
for _, postqueryPlan := range postqueryPlans {
if err := dsp.planAndRunPostquery(
ctx,
postqueryPlan,
planner,
evalCtxFactory(),
recv,
maybeDistribute,
); err != nil {
recv.SetError(err)
return false
}
}

return true
}

func (dsp *DistSQLPlanner) planAndRunPostquery(
ctx context.Context,
postqueryPlan postquery,
planner *planner,
evalCtx *extendedEvalContext,
recv *DistSQLReceiver,
maybeDistribute bool,
) error {
postqueryMonitor := mon.MakeMonitor(
"postquery",
mon.MemoryResource,
dsp.distSQLSrv.Metrics.CurBytesCount,
dsp.distSQLSrv.Metrics.MaxBytesHist,
-1, /* use default block size */
noteworthyMemoryUsageBytes,
dsp.distSQLSrv.Settings,
)
postqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{})
defer postqueryMonitor.Stop(ctx)

postqueryMemAccount := postqueryMonitor.MakeBoundAccount()
defer postqueryMemAccount.Close(ctx)

var postqueryPlanCtx *PlanningCtx
var distributePostquery bool
if maybeDistribute {
distributePostquery = shouldDistributePlan(
ctx, planner.SessionData().DistSQLMode, dsp, postqueryPlan.plan)
}
if distributePostquery {
postqueryPlanCtx = dsp.NewPlanningCtx(ctx, evalCtx, planner.txn)
} else {
postqueryPlanCtx = dsp.newLocalPlanningCtx(ctx, evalCtx)
}

postqueryPlanCtx.isLocal = !distributePostquery
postqueryPlanCtx.planner = planner
postqueryPlanCtx.stmtType = tree.Rows
// We cannot close the postqueries' plans through the plan top since
// postqueries haven't been executed yet, so we manually close each postquery
// plan right after the execution.
postqueryPlanCtx.ignoreClose = true
defer postqueryPlan.plan.Close(ctx)

postqueryPhysPlan, err := dsp.createPlanForNode(postqueryPlanCtx, postqueryPlan.plan)
if err != nil {
return err
}
dsp.FinalizePlan(postqueryPlanCtx, &postqueryPhysPlan)

postqueryRecv := recv.clone()
var postqueryRowReceiver postqueryRowResultWriter
postqueryRecv.resultWriter = &postqueryRowReceiver
dsp.Run(postqueryPlanCtx, planner.txn, &postqueryPhysPlan, postqueryRecv, evalCtx, nil /* finishedSetupFn */)
if postqueryRecv.commErr != nil {
return postqueryRecv.commErr
}
return postqueryRowReceiver.Err()
}

// postqueryRowResultWriter is a lightweight version of RowResultWriter that
// can only write errors. It is used only for executing postqueries and is
// sufficient for that case since those can only return errors.
type postqueryRowResultWriter struct {
err error
}

var _ rowResultWriter = &postqueryRowResultWriter{}

func (r *postqueryRowResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
return errors.Errorf("unexpectedly AddRow is called on postqueryRowResultWriter")
}

func (r *postqueryRowResultWriter) IncrementRowsAffected(n int) {
// TODO(yuzefovich): this probably will need to change when we support
// cascades.
}

func (r *postqueryRowResultWriter) SetError(err error) {
r.err = err
}

func (r *postqueryRowResultWriter) Err() error {
return r.err
}
10 changes: 10 additions & 0 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ type planTop struct {
// subqueryPlans contains all the sub-query plans.
subqueryPlans []subquery

// postqueryPlans contains all the plans for subqueries that are to be
// executed after the main query (for example, foreign key checks).
postqueryPlans []postquery

// auditEvents becomes non-nil if any of the descriptors used by
// current statement is causing an auditing event. See exec_log.go.
auditEvents []auditEvent
Expand All @@ -321,6 +325,12 @@ type planTop struct {
avoidBuffering bool
}

// postquery is a query tree that is executed after the main one. It can only
// return an error (for example, foreign key violation).
type postquery struct {
plan planNode
}

// makePlan implements the planMaker interface. It populates the
// planner's curPlan field.
//
Expand Down