Skip to content

Commit

Permalink
sql: fix InternalExecutor bug
Browse files Browse the repository at this point in the history
Any time we use WithSyntheticDescriptors, it has to be on an unshared internal
executor. The move in cockroachdb#71246 to not have an internal executor hanging around
for the current session hurts here.

Fixes cockroachdb#73788

Release note: None
  • Loading branch information
ajwerner authored and GustasValdavicius committed Jan 4, 2022
1 parent 1258c3f commit f1373bf
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 34 deletions.
15 changes: 11 additions & 4 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,8 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutor, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
); err != nil {
return err
}
Expand All @@ -822,8 +823,12 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx, params.p.LeaseMgr(), params.ExecCfg().InternalExecutor,
n.tableDesc, params.EvalContext().Txn, name, params.EvalContext().Codec,
params.ctx, params.p.LeaseMgr(),
params.ExecCfg().InternalExecutorFactory,
params.p.SessionData(),
n.tableDesc,
params.EvalContext().Txn,
name, params.EvalContext().Codec,
); err != nil {
return err
}
Expand All @@ -846,7 +851,9 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutor, n.tableDesc, params.EvalContext().Txn, name,
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.EvalContext().Txn, name,
); err != nil {
return err
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors)
return retryable(ctx, txn, &evalCtx)
})
}
Expand All @@ -159,9 +159,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).SchemaChangeInternalExecutor
ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -704,21 +702,21 @@ func (sc *SchemaChanger) validateConstraints(
defer func() { collection.ReleaseAll(ctx) }()
if c.IsCheck() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
return err
}
} else if c.IsForeignKey() {
if err := validateFkInTxn(ctx, sc.leaseMgr, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil {
if err := validateFkInTxn(ctx, sc.leaseMgr, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.GetName(), evalCtx.Codec); err != nil {
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName()); err != nil {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, c.GetName()); err != nil {
return err
}
} else if c.IsNotNull() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
// TODO (lucy): This should distinguish between constraint
// validation errors and other types of unexpected errors, and
Expand Down Expand Up @@ -1018,7 +1016,7 @@ func (sc *SchemaChanger) distIndexBackfill(
if err != nil {
return err
}
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
Expand Down Expand Up @@ -1292,7 +1290,7 @@ func (sc *SchemaChanger) distColumnBackfill(
return nil
}
cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn}
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
recv := MakeDistSQLReceiver(
ctx,
&cbw,
Expand Down Expand Up @@ -2117,7 +2115,8 @@ func runSchemaChangesInTxn(
check := &c.ConstraintToUpdateDesc().Check
if check.Validity == descpb.ConstraintValidity_Validating {
if err := validateCheckInTxn(
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutor, planner.SessionData(), tableDesc, planner.txn, check.Expr,
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory,
planner.SessionData(), tableDesc, planner.txn, check.Expr,
); err != nil {
return err
}
Expand Down Expand Up @@ -2214,7 +2213,7 @@ func runSchemaChangesInTxn(
func validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
Expand All @@ -2224,6 +2223,7 @@ func validateCheckInTxn(
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
ie := ief(ctx, sessionData)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
Expand All @@ -2244,7 +2244,8 @@ func validateCheckInTxn(
func validateFkInTxn(
ctx context.Context,
leaseMgr *lease.Manager,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sd *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
fkName string,
Expand All @@ -2266,7 +2267,7 @@ func validateFkInTxn(
if fk == nil {
return errors.AssertionFailedf("foreign key %s does not exist", fkName)
}

ie := ief(ctx, sd)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateForeignKey(ctx, tableDesc, fk, ie, txn, codec)
})
Expand All @@ -2286,7 +2287,7 @@ func validateFkInTxn(
// reuse an existing kv.Txn safely.
func validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
constraintName string,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func validateCheckExpr(
sessionData *sessiondata.SessionData,
exprStr string,
tableDesc *tabledesc.Mutable,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable)
Expand Down Expand Up @@ -237,7 +237,7 @@ func validateForeignKey(
ctx context.Context,
srcTable *tabledesc.Mutable,
fk *descpb.ForeignKeyConstraint,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
codec keys.SQLCodec,
) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (ib *IndexBackfillPlanner) plan(
if err := DescsTxn(ctx, ib.execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ type extendedEvalContext struct {
indexUsageStats *idxusage.LocalIndexUsageStats

SchemaChangerState *SchemaChangerState

SchemaChangeInternalExecutor *InternalExecutor
}

// copyFromExecCfg copies relevant fields from an ExecutorConfig.
Expand Down
14 changes: 4 additions & 10 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1938,24 +1938,18 @@ func (sc *SchemaChanger) txn(
// used in the surrounding SQL session, so session tracing is unable
// to capture schema change activity.
func createSchemaChangeEvalCtx(
ctx context.Context,
execCfg *ExecutorConfig,
ts hlc.Timestamp,
ieFactory sqlutil.SessionBoundInternalExecutorFactory,
descriptors *descs.Collection,
ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection,
) extendedEvalContext {

sd := NewFakeSessionData(execCfg.SV())
ie := ieFactory(ctx, sd)

evalCtx := extendedEvalContext{
// Make a session tracing object on-the-fly. This is OK
// because it sets "enabled: false" and thus none of the
// other fields are used.
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
SchemaChangeInternalExecutor: ie.(*InternalExecutor),
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
EvalContext: tree.EvalContext{
SessionDataStack: sessiondata.NewStack(sd),
// TODO(andrei): This is wrong (just like on the main code path on
Expand Down

0 comments on commit f1373bf

Please sign in to comment.