From e03be7bac6a450eb9ad69fd5cb05966195885215 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 9 Nov 2022 04:07:16 +0000 Subject: [PATCH] sql: attempt to reduce duplication in legacy schema changer txn management These functions can all be defined in terms of `txnWithExecutor`. It was confusing that they were not. Release note: None --- pkg/sql/backfill.go | 30 +++++++++++++++++------------- pkg/sql/schema_changer.go | 27 +++++++++++++++++---------- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 2ea22bfe9fd8..20967cf71261 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -172,13 +172,17 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner( readAsOf hlc.Timestamp, ) descs.HistoricalInternalExecTxnRunner { - return descs.NewHistoricalInternalExecTxnRunner(readAsOf, func(ctx context.Context, retryable descs.InternalExecFn) error { - return sc.fixedTimestampTxn(ctx, readAsOf, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + return descs.NewHistoricalInternalExecTxnRunner(readAsOf, func( + ctx context.Context, retryable descs.InternalExecFn, + ) error { + return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( + ctx context.Context, + txn *kv.Txn, + _ *sessiondata.SessionData, + descriptors *descs.Collection, + ie sqlutil.InternalExecutor, ) error { - // We need to re-create the evalCtx since the txn may retry. - ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV())) - return retryable(ctx, txn, ie, nil /* descriptors */) + return retryable(ctx, txn, ie, descriptors) }) }) } @@ -188,10 +192,10 @@ func (sc *SchemaChanger) fixedTimestampTxn( readAsOf hlc.Timestamp, retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error, ) error { - return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error { - if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { - return err - } + return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func( + ctx context.Context, txn *kv.Txn, _ *sessiondata.SessionData, + descriptors *descs.Collection, _ sqlutil.InternalExecutor, + ) error { return retryable(ctx, txn, descriptors) }) } @@ -207,9 +211,9 @@ func (sc *SchemaChanger) fixedTimestampTxnWithExecutor( ie sqlutil.InternalExecutor, ) error, ) error { - sd := NewFakeSessionData(sc.execCfg.SV()) - return sc.txnWithExecutor(ctx, sd, func( - ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor, + return sc.txnWithExecutor(ctx, func( + ctx context.Context, txn *kv.Txn, sd *sessiondata.SessionData, + descriptors *descs.Collection, ie sqlutil.InternalExecutor, ) error { if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil { return err diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 5863cc4b82cb..5209bce40f3a 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2456,27 +2456,34 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {} func (sc *SchemaChanger) txn( ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error, ) error { - if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { - if err := fn(sc.job.ID()); err != nil { - return err - } - } - return sc.execCfg.InternalExecutorFactory.DescsTxn(ctx, sc.db, f) + return sc.txnWithExecutor(ctx, func( + ctx context.Context, txn *kv.Txn, _ *sessiondata.SessionData, + collection *descs.Collection, _ sqlutil.InternalExecutor, + ) error { + return f(ctx, txn, collection) + }) } // txnWithExecutor is to run internal executor within a txn. func (sc *SchemaChanger) txnWithExecutor( ctx context.Context, - sd *sessiondata.SessionData, - f func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error, + f func( + context.Context, *kv.Txn, *sessiondata.SessionData, + *descs.Collection, sqlutil.InternalExecutor, + ) error, ) error { if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil { if err := fn(sc.job.ID()); err != nil { return err } } - return sc.execCfg.InternalExecutorFactory. - DescsTxnWithExecutor(ctx, sc.db, sd, f) + sd := NewFakeSessionData(sc.execCfg.SV()) + return sc.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, sc.db, sd, func( + ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, + ie sqlutil.InternalExecutor, + ) error { + return f(ctx, txn, sd, descriptors, ie) + }) } // createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills.