Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: fix InternalExecutor bug #74188

Merged
merged 1 commit into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,8 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutor, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
); err != nil {
return err
}
Expand All @@ -831,8 +832,12 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx, params.p.LeaseMgr(), params.ExecCfg().InternalExecutor,
n.tableDesc, params.EvalContext().Txn, name, params.EvalContext().Codec,
params.ctx, params.p.LeaseMgr(),
params.ExecCfg().InternalExecutorFactory,
params.p.SessionData(),
n.tableDesc,
params.EvalContext().Txn,
name, params.EvalContext().Codec,
); err != nil {
return err
}
Expand All @@ -855,7 +860,9 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutor, n.tableDesc, params.EvalContext().Txn, name,
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.EvalContext().Txn, name,
); err != nil {
return err
}
Expand Down
31 changes: 16 additions & 15 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, descriptors)
return retryable(ctx, txn, &evalCtx)
})
}
Expand All @@ -168,9 +168,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).SchemaChangeInternalExecutor
ie := sc.ieFactory(ctx, NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -713,21 +711,21 @@ func (sc *SchemaChanger) validateConstraints(
defer func() { collection.ReleaseAll(ctx) }()
if c.IsCheck() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
return err
}
} else if c.IsForeignKey() {
if err := validateFkInTxn(ctx, sc.leaseMgr, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil {
if err := validateFkInTxn(ctx, sc.leaseMgr, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.GetName(), evalCtx.Codec); err != nil {
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName()); err != nil {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, sc.ieFactory(ctx, evalCtx.SessionData()), desc, txn, c.GetName()); err != nil {
return err
}
} else if c.IsNotNull() {
if err := validateCheckInTxn(
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
ctx, &semaCtx, sc.ieFactory, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
// TODO (lucy): This should distinguish between constraint
// validation errors and other types of unexpected errors, and
Expand Down Expand Up @@ -991,7 +989,7 @@ func (sc *SchemaChanger) distIndexBackfill(
if err != nil {
return err
}
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
Expand Down Expand Up @@ -1265,7 +1263,7 @@ func (sc *SchemaChanger) distColumnBackfill(
return nil
}
cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn}
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), descriptors)
recv := MakeDistSQLReceiver(
ctx,
&cbw,
Expand Down Expand Up @@ -2090,7 +2088,8 @@ func runSchemaChangesInTxn(
check := &c.ConstraintToUpdateDesc().Check
if check.Validity == descpb.ConstraintValidity_Validating {
if err := validateCheckInTxn(
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutor, planner.SessionData(), tableDesc, planner.txn, check.Expr,
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutorFactory,
planner.SessionData(), tableDesc, planner.txn, check.Expr,
); err != nil {
return err
}
Expand Down Expand Up @@ -2187,7 +2186,7 @@ func runSchemaChangesInTxn(
func validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
Expand All @@ -2197,6 +2196,7 @@ func validateCheckInTxn(
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
ie := ief(ctx, sessionData)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
Expand All @@ -2217,7 +2217,8 @@ func validateCheckInTxn(
func validateFkInTxn(
ctx context.Context,
leaseMgr *lease.Manager,
ie *InternalExecutor,
ief sqlutil.SessionBoundInternalExecutorFactory,
sd *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
fkName string,
Expand All @@ -2239,7 +2240,7 @@ func validateFkInTxn(
if fk == nil {
return errors.AssertionFailedf("foreign key %s does not exist", fkName)
}

ie := ief(ctx, sd)
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateForeignKey(ctx, tableDesc, fk, ie, txn, codec)
})
Expand All @@ -2259,7 +2260,7 @@ func validateFkInTxn(
// reuse an existing kv.Txn safely.
func validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
constraintName string,
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func validateCheckExpr(
sessionData *sessiondata.SessionData,
exprStr string,
tableDesc *tabledesc.Mutable,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
expr, err := schemaexpr.FormatExprForDisplay(ctx, tableDesc, exprStr, semaCtx, sessionData, tree.FmtParsable)
Expand Down Expand Up @@ -237,7 +237,7 @@ func validateForeignKey(
ctx context.Context,
srcTable *tabledesc.Mutable,
fk *descpb.ForeignKeyConstraint,
ie *InternalExecutor,
ie sqlutil.InternalExecutor,
txn *kv.Txn,
codec keys.SQLCodec,
) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (ib *IndexBackfillPlanner) plan(
if err := DescsTxn(ctx, ib.execCfg, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
Expand Down
2 changes: 0 additions & 2 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ type extendedEvalContext struct {
indexUsageStats *idxusage.LocalIndexUsageStats

SchemaChangerState *SchemaChangerState

SchemaChangeInternalExecutor *InternalExecutor
}

// copyFromExecCfg copies relevant fields from an ExecutorConfig.
Expand Down
14 changes: 4 additions & 10 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1941,24 +1941,18 @@ func (sc *SchemaChanger) txn(
// used in the surrounding SQL session, so session tracing is unable
// to capture schema change activity.
func createSchemaChangeEvalCtx(
ctx context.Context,
execCfg *ExecutorConfig,
ts hlc.Timestamp,
ieFactory sqlutil.SessionBoundInternalExecutorFactory,
descriptors *descs.Collection,
ctx context.Context, execCfg *ExecutorConfig, ts hlc.Timestamp, descriptors *descs.Collection,
) extendedEvalContext {

sd := NewFakeSessionData(execCfg.SV())
ie := ieFactory(ctx, sd)

evalCtx := extendedEvalContext{
// Make a session tracing object on-the-fly. This is OK
// because it sets "enabled: false" and thus none of the
// other fields are used.
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
SchemaChangeInternalExecutor: ie.(*InternalExecutor),
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
EvalContext: tree.EvalContext{
SessionDataStack: sessiondata.NewStack(sd),
// TODO(andrei): This is wrong (just like on the main code path on
Expand Down