sql: address a couple of old TODOs #98823

Merged 1 commit on Mar 21, 2023
pkg/sql/conn_executor.go (1 addition, 0 deletions)
@@ -2763,6 +2763,7 @@ func (ex *connExecutor) execCopyIn(
ctx, cmd.Conn, cmd.Stmt, &p, txnOpt, ex.state.mon, ex.implicitTxn(),
// execInsertPlan
func(ctx context.Context, p *planner, res RestrictedCommandResult) error {
+ defer p.curPlan.close(ctx)
_, err := ex.execWithDistSQLEngine(ctx, p, tree.RowsAffected, res, DistributionTypeNone, nil /* progressAtomic */)
return err
},
pkg/sql/conn_executor_exec.go (3 additions, 26 deletions)
@@ -701,19 +701,6 @@ func (ex *connExecutor) execStmtInOpenState(
// placed. There are also sequencing point after every stage of
// constraint checks and cascading actions at the _end_ of a
// statement's execution.
- //
- // TODO(knz): At the time of this writing CockroachDB performs
- // cascading actions and the corresponding FK existence checks
- // interleaved with mutations. This is incorrect; the correct
- // behavior, as described in issue
- // https://github.com/cockroachdb/cockroach/issues/33475, is to
- // execute cascading actions no earlier than after all the "main
- // effects" of the current statement (including all its CTEs) have
- // completed. There should be a sequence point between the end of
- // the main execution and the start of the cascading actions, as
- // well as in-between very stage of cascading actions.
- // This TODO can be removed when the cascading code is reorganized
- // accordingly and the missing call to Step() is introduced.
if err := ex.state.mu.txn.Step(ctx); err != nil {
return makeErrEvent(err)
}
@@ -1259,8 +1246,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
// Prepare the plan. Note, the error is processed below. Everything
// between here and there needs to happen even if there's an error.
err := ex.makeExecPlan(ctx, planner)
- // We'll be closing the plan manually below after execution; this
- // defer is a catch-all in case some other return path is taken.
defer planner.curPlan.close(ctx)

// include gist in error reports
@@ -1342,17 +1327,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
panic(err)
}

- // We need to set the "exec done" flag early because
- // curPlan.close(), which will need to observe it, may be closed
- // during execution (PlanAndRun).
- //
- // TODO(knz): This is a mis-design. Andrei says "it's OK if
- // execution closes the plan" but it transfers responsibility to
- // run any "finalizers" on the plan (including plan sampling for
- // stats) to the execution engine. That's a lot of responsibility
- // to transfer! It would be better if this responsibility remained
- // around here.
- planner.curPlan.flags.Set(planFlagExecDone)
if !planner.ExecCfg().Codec.ForSystemTenant() {
planner.curPlan.flags.Set(planFlagTenant)
}
@@ -1736,6 +1710,8 @@ func (s *topLevelQueryStats) add(other *topLevelQueryStats) {
// runs it.
// If an error is returned, the connection needs to stop processing queries.
// Query execution errors are written to res; they are not returned.
+ // NB: the plan (in planner.curPlan) is not closed, so it is the caller's
+ // responsibility to do so.
func (ex *connExecutor) execWithDistSQLEngine(
ctx context.Context,
planner *planner,
@@ -1744,6 +1720,7 @@ func (ex *connExecutor) execWithDistSQLEngine(
distribute DistributionType,
progressAtomic *uint64,
) (topLevelQueryStats, error) {
+ defer planner.curPlan.savePlanInfo()
recv := MakeDistSQLReceiver(
ctx, res, stmtType,
ex.server.cfg.RangeDescriptorCache,
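The net effect of the changes above is a simpler ownership contract: execWithDistSQLEngine (and, in the next file, PlanAndRunAll) records how the plan was executed via a deferred savePlanInfo call, while closing the plan is always the caller's responsibility. Below is a minimal, self-contained sketch of that contract; the types and helpers are illustrative stand-ins for this description, not the real CockroachDB code.

    package main

    import (
    	"context"
    	"fmt"
    )

    // planTop is a stand-in for the real plan container.
    type planTop struct{ closed bool }

    // savePlanInfo records how the plan was executed; as the new comment in
    // plan.go notes, it must only run after execution has completed.
    func (p *planTop) savePlanInfo() {
    	fmt.Println("plan info saved")
    }

    // close releases the plan's resources.
    func (p *planTop) close(ctx context.Context) {
    	if !p.closed {
    		p.closed = true
    		fmt.Println("plan closed")
    	}
    }

    // execWithEngine mirrors the shape of execWithDistSQLEngine after this
    // change: it defers savePlanInfo but deliberately does not close the plan.
    func execWithEngine(ctx context.Context, plan *planTop) error {
    	defer plan.savePlanInfo()
    	// ... run the plan ...
    	return nil
    }

    func main() {
    	ctx := context.Background()
    	plan := &planTop{}
    	// The caller owns the plan's lifetime, as dispatchToExecutionEngine and
    	// execCopyIn now do with `defer planner.curPlan.close(ctx)`.
    	defer plan.close(ctx)
    	if err := execWithEngine(ctx, plan); err != nil {
    		fmt.Println("execution error:", err)
    	}
    }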
pkg/sql/distsql_running.go (2 additions, 1 deletion)
@@ -1541,6 +1541,8 @@ func getFinishedSetupFn(planner *planner) (finishedSetupFn func(flowinfra.Flow),
// PlanAndRunAll combines running the main query, subqueries and cascades/checks.
// If an error is returned, the connection needs to stop processing queries.
// Query execution errors stored in recv; they are not returned.
+ // NB: the plan (in planner.curPlan) is not closed, so it is the caller's
+ // responsibility to do so.
func (dsp *DistSQLPlanner) PlanAndRunAll(
ctx context.Context,
evalCtx *extendedEvalContext,
@@ -1549,7 +1551,6 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
recv *DistSQLReceiver,
evalCtxFactory func(usedConcurrently bool) *extendedEvalContext,
) error {
- defer planner.curPlan.close(ctx)
if len(planner.curPlan.subqueryPlans) != 0 {
// Create a separate memory account for the results of the subqueries.
// Note that we intentionally defer the closure of the account until we
pkg/sql/explain_tree_test.go (2 additions, 2 deletions)
@@ -81,8 +81,8 @@ func TestPlanToTreeAndPlanToString(t *testing.T) {
if err := p.makeOptimizerPlan(ctx); err != nil {
t.Fatal(err)
}
- p.curPlan.flags.Set(planFlagExecDone)
- p.curPlan.close(ctx)
+ defer p.curPlan.close(ctx)
+ p.curPlan.savePlanInfo()
if d.Cmd == "plan-string" {
ob := ih.emitExplainAnalyzePlanToOutputBuilder(
explain.Flags{Verbose: true, ShowTypes: true},
pkg/sql/plan.go (4 additions, 13 deletions)
@@ -472,16 +472,10 @@ func (p *planTop) init(stmt *Statement, instrumentation *instrumentationHelper)
}
}

- // close ensures that the plan's resources have been deallocated.
- func (p *planTop) close(ctx context.Context) {
- if p.flags.IsSet(planFlagExecDone) {
- p.savePlanInfo(ctx)
- }
- p.planComponents.close(ctx)
- }
-
- // savePlanInfo uses p.explainPlan to populate the plan string and/or tree.
- func (p *planTop) savePlanInfo(ctx context.Context) {
+ // savePlanInfo updates the instrumentationHelper with information about how the
+ // plan was executed.
+ // NB: should only be called _after_ the execution of the plan has completed.
+ func (p *planTop) savePlanInfo() {
vectorized := p.flags.IsSet(planFlagVectorized)
distribution := physicalplan.LocalPlan
if p.flags.IsSet(planFlagFullyDistributed) {
@@ -589,9 +583,6 @@ const (
// planFlagNotDistributed is set if the query execution is not distributed.
planFlagNotDistributed

- // planFlagExecDone marks that execution has been completed.
- planFlagExecDone
-
// planFlagImplicitTxn marks that the plan was run inside of an implicit
// transaction.
planFlagImplicitTxn
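The plan.go change also shows why planFlagExecDone could be deleted: previously, close() had to consult a flag set by the execution path to know whether it was safe to save plan information, whereas now the execution path defers savePlanInfo itself and closing is purely about releasing resources. A tiny sketch of that before/after shape, using assumed names rather than the real code:

    package main

    import "fmt"

    // finalize stands in for savePlanInfo: it must only run once execution
    // has actually happened.
    func finalize() { fmt.Println("finalized") }

    // cleanupOld shows the old shape: cleanup consults a flag (the analogue
    // of planFlagExecDone) to decide whether to run the finalizer.
    func cleanupOld(execDone bool) {
    	if execDone {
    		finalize()
    	}
    	fmt.Println("closed")
    }

    // runNew shows the new shape: the execution path defers the finalizer
    // itself, so cleanup no longer needs the flag and it can be deleted.
    func runNew() {
    	defer finalize()
    	// ... execute ...
    }

    func main() {
    	// Old shape for comparison: without the flag set, the finalizer is
    	// (correctly) skipped, but every close site has to carry that logic.
    	cleanupOld(false)
    	// New shape: finalization is tied to execution itself.
    	runNew()
    	fmt.Println("closed") // closing is now independent of finalization
    }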
pkg/sql/testutils.go (1 addition, 0 deletions)
@@ -165,6 +165,7 @@ func (dsp *DistSQLPlanner) Exec(
func (dsp *DistSQLPlanner) ExecLocalAll(
ctx context.Context, execCfg ExecutorConfig, p *planner, res RestrictedCommandResult,
) error {
+ defer p.curPlan.close(ctx)
recv := MakeDistSQLReceiver(
ctx,
res,