Skip to content

Commit

Permalink
sql: address a couple of old TODOs
Browse files Browse the repository at this point in the history
This commit addresses a couple of old TODOs in the
`connExecutor.dispatchToExecutionEngine` method.

First, it simply removes the now-stale TODO about needing to `Step()`
the txn before executing the query. The TODO mentioned things about the
old way FK checks were performed, but we now always run checks in
a deferred fashion, and perform the `Step`s correctly there for cascades
and checks, thus, the TODO can be removed. (The TODO itself explicitly
mentions that it can be removed once we do checks and cascades in the
deferred fashion.)

Second, it addresses a TODO about how some plan information is saved. Up
until 961e66f the cleanup of `planNode`
tree and `flow` infra was intertwined, so in 6ae4881
in order to "sample" the plan with correct info (some of which is only
available _after_ the execution is done) we delegated that sampling to
`planTop.close`. However, with `planNode` tree and `flow` cleanups
refactored and de-coupled, we no longer have to close the plan early on.
As a result, we now close the plan in a defer right after we create it
(now it's the only place we do so), and this commit makes it so that we
record the plan info explicitly right after returning from the execution
engine, thus, addressing the second TODO.

Epic: None

Release note: None
  • Loading branch information
yuzefovich committed Mar 17, 2023
1 parent 85c6e38 commit 4d835ec
Show file tree
Hide file tree
Showing 6 changed files with 13 additions and 42 deletions.
1 change: 1 addition & 0 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2763,6 +2763,7 @@ func (ex *connExecutor) execCopyIn(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon, ex.implicitTxn(),
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
defer p.curPlan.close(ctx)
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
Expand Down
29 changes: 3 additions & 26 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,19 +701,6 @@ func (ex *connExecutor) execStmtInOpenState(
// placed. There are also sequencing point after every stage of
// constraint checks and cascading actions at the _end_ of a
// statement's execution.
//
// TODO(knz): At the time of this writing CockroachDB performs
// cascading actions and the corresponding FK existence checks
// interleaved with mutations. This is incorrect; the correct
// behavior, as described in issue
// https://github.com/cockroachdb/cockroach/issues/33475, is to
// execute cascading actions no earlier than after all the "main
// effects" of the current statement (including all its CTEs) have
// completed. There should be a sequence point between the end of
// the main execution and the start of the cascading actions, as
// well as in-between very stage of cascading actions.
// This TODO can be removed when the cascading code is reorganized
// accordingly and the missing call to Step() is introduced.
if err := ex.state.mu.txn.Step(ctx); err != nil {
return makeErrEvent(err)
}
Expand Down Expand Up @@ -1259,8 +1246,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err := ex.makeExecPlan(ctx, planner)
// We'll be closing the plan manually below after execution; this
// defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

// include gist in error reports
Expand Down Expand Up @@ -1342,17 +1327,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
panic(err)
}

// We need to set the "exec done" flag early because
// curPlan.close(), which will need to observe it, may be closed
// during execution (PlanAndRun).
//
// TODO(knz): This is a mis-design. Andrei says "it's OK if
// execution closes the plan" but it transfers responsibility to
// run any "finalizers" on the plan (including plan sampling for
// stats) to the execution engine. That's a lot of responsibility
// to transfer! It would be better if this responsibility remained
// around here.
planner.curPlan.flags.Set(planFlagExecDone)
if !planner.ExecCfg().Codec.ForSystemTenant() {
planner.curPlan.flags.Set(planFlagTenant)
}
Expand Down Expand Up @@ -1736,6 +1710,8 @@ func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
// runs it.
// If an error is returned, the connection needs to stop processing queries.
// Query execution errors are written to res; they are not returned.
// NB: the plan (in planner.curPlan) is not closed, so it is the caller's
// responsibility to do so.
func (ex *connExecutor) execWithDistSQLEngine(
ctx context.Context,
planner *planner,
Expand All @@ -1744,6 +1720,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
distribute DistributionType,
progressAtomic *uint64,
) (topLevelQueryStats, error) {
defer planner.curPlan.savePlanInfo()
recv := MakeDistSQLReceiver(
ctx, res, stmtType,
ex.server.cfg.RangeDescriptorCache,
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,8 @@ func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow),
// PlanAndRunAll combines running the main query, subqueries and cascades/checks.
// If an error is returned, the connection needs to stop processing queries.
// Query execution errors stored in recv; they are not returned.
// NB: the plan (in planner.curPlan) is not closed, so it is the caller's
// responsibility to do so.
func (dsp *DistSQLPlanner) PlanAndRunAll(
ctx context.Context,
evalCtx *extendedEvalContext,
Expand All @@ -1549,7 +1551,6 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
recv *DistSQLReceiver,
evalCtxFactory func(usedConcurrently bool) *extendedEvalContext,
) error {
defer planner.curPlan.close(ctx)
if len(planner.curPlan.subqueryPlans) != 0 {
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/explain_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func TestPlanToTreeAndPlanToString(t *testing.T) {
if err := p.makeOptimizerPlan(ctx); err != nil {
t.Fatal(err)
}
p.curPlan.flags.Set(planFlagExecDone)
p.curPlan.close(ctx)
defer p.curPlan.close(ctx)
p.curPlan.savePlanInfo()
if d.Cmd == "plan-string" {
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
Expand Down
17 changes: 4 additions & 13 deletions pkg/sql/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,16 +472,10 @@ func (p *planTop) init(stmt *Statement, instrumentation *instrumentationHelper)
}
}

// close ensures that the plan's resources have been deallocated.
func (p *planTop) close(ctx context.Context) {
if p.flags.IsSet(planFlagExecDone) {
p.savePlanInfo(ctx)
}
p.planComponents.close(ctx)
}

// savePlanInfo uses p.explainPlan to populate the plan string and/or tree.
func (p *planTop) savePlanInfo(ctx context.Context) {
// savePlanInfo updates the instrumentationHelper with information about how the
// plan was executed.
// NB: should only be called _after_ the execution of the plan has completed.
func (p *planTop) savePlanInfo() {
vectorized := p.flags.IsSet(planFlagVectorized)
distribution := physicalplan.LocalPlan
if p.flags.IsSet(planFlagFullyDistributed) {
Expand Down Expand Up @@ -589,9 +583,6 @@ const (
// planFlagNotDistributed is set if the query execution is not distributed.
planFlagNotDistributed

// planFlagExecDone marks that execution has been completed.
planFlagExecDone

// planFlagImplicitTxn marks that the plan was run inside of an implicit
// transaction.
planFlagImplicitTxn
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (dsp *DistSQLPlanner) Exec(
func (dsp *DistSQLPlanner) ExecLocalAll(
ctx context.Context, execCfg ExecutorConfig, p *planner, res RestrictedCommandResult,
) error {
defer p.curPlan.close(ctx)
recv := MakeDistSQLReceiver(
ctx,
res,
Expand Down

0 comments on commit 4d835ec

Please sign in to comment.