Skip to content

Commit

Permalink
jobs: replace the ie in Registry.createJobsInBatchWithTxn
Browse files Browse the repository at this point in the history
When creating a job, we should use an internal executor bound to the outer txn
and related metadata.

We also found that in this case, if a user changes the timezone and then triggers
a schema change job, the job's creation time (i.e. value createdcolumn in the
system.jobs) will automatically switch to the timestamp in the updated timezone,
rather than UTC. This is because we have the created using now() in the SQL
statement. We now change it to be inserted always with UTC time.

Informs: cockroachdb#91004
Informs: cockroachdb#91718

Release note: None
  • Loading branch information
ZhouXing19 committed Nov 11, 2022
1 parent 45abe91 commit 7c43111
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 30 deletions.
17 changes: 8 additions & 9 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,13 +1512,12 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// public.
// TODO (lucy): Ideally we'd just create the database in the public state in
// the first place, as a special case.
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
return r.publishDescriptors(ctx, txn, p.ExecCfg(), p.User(), descsCol, details, nil)
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor) (err error) {
return r.publishDescriptors(ctx, txn, ie, p.ExecCfg(), p.User(), descsCol, details, nil)
}
if err := sql.DescsTxn(ctx, r.execCfg, publishDescriptors); err != nil {
if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* sd */, publishDescriptors); err != nil {
return err
}

p.ExecCfg().JobRegistry.NotifyToAdoptJobs()
if err := p.ExecCfg().JobRegistry.CheckPausepoint(
"restore.after_publishing_descriptors"); err != nil {
Expand Down Expand Up @@ -1645,11 +1644,10 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
devalidateIndexes = bad
}

publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
err = r.publishDescriptors(ctx, txn, p.ExecCfg(), p.User(), descsCol, details, devalidateIndexes)
return err
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor) (err error) {
return r.publishDescriptors(ctx, txn, ie, p.ExecCfg(), p.User(), descsCol, details, devalidateIndexes)
}
if err := sql.DescsTxn(ctx, p.ExecCfg(), publishDescriptors); err != nil {
if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* sd */, publishDescriptors); err != nil {
return err
}

Expand Down Expand Up @@ -1956,6 +1954,7 @@ func insertStats(
func (r *restoreResumer) publishDescriptors(
ctx context.Context,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
execCfg *sql.ExecutorConfig,
user username.SQLUsername,
descsCol *descs.Collection,
Expand Down Expand Up @@ -1997,7 +1996,7 @@ func (r *restoreResumer) publishDescriptors(
// Go through the descriptors and find any declarative schema change jobs
// affecting them.
if err := scbackup.CreateDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, all,
ctx, r.execCfg.JobRegistry, txn, ie, all,
); err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,21 @@ func (r *Registry) maybeDumpTrace(
// could have been canceled at this point.
dumpCtx, _ := r.makeCtx()

ieNotBoundToTxn := r.internalExecutorFactory.MakeInternalExecutorWithoutTxn()

// If the job has failed, and the dump mode is set to anything
// except noDump, then we should dump the trace.
// The string comparison is unfortunate but is used to differentiate a job
// that has failed from a job that has been canceled.
if jobErr != nil && !HasErrJobCanceled(jobErr) && resumerCtx.Err() == nil {
r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, r.ex)
r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, ieNotBoundToTxn)
return
}

// If the dump mode is set to `dumpOnStop` then we should dump the
// trace when the job is any of paused, canceled, succeeded or failed state.
if dumpMode == int64(dumpOnStop) {
r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, r.ex)
r.td.Dump(dumpCtx, strconv.Itoa(int(jobID)), traceID, ieNotBoundToTxn)
}
}

Expand Down
22 changes: 14 additions & 8 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Registry struct {
stopper *stop.Stopper
db *kv.DB
ex sqlutil.InternalExecutor
ief sqlutil.InternalExecutorFactory
clock *hlc.Clock
clusterID *base.ClusterIDContainer
nodeID *base.SQLIDContainer
Expand Down Expand Up @@ -187,6 +188,7 @@ func MakeRegistry(
clock *hlc.Clock,
db *kv.DB,
ex sqlutil.InternalExecutor,
ief sqlutil.InternalExecutorFactory,
clusterID *base.ClusterIDContainer,
nodeID *base.SQLIDContainer,
sqlInstance sqlliveness.Instance,
Expand All @@ -204,6 +206,7 @@ func MakeRegistry(
clock: clock,
db: db,
ex: ex,
ief: ief,
clusterID: clusterID,
nodeID: nodeID,
sqlInstance: sqlInstance,
Expand Down Expand Up @@ -327,7 +330,7 @@ func (r *Registry) makeProgress(record *Record) jobspb.Progress {
// one job to create, otherwise the function returns an error. The function
// returns the IDs of the jobs created.
func (r *Registry) CreateJobsWithTxn(
ctx context.Context, txn *kv.Txn, records []*Record,
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, records []*Record,
) ([]jobspb.JobID, error) {
created := make([]jobspb.JobID, 0, len(records))
for toCreate := records; len(toCreate) > 0; {
Expand All @@ -336,7 +339,7 @@ func (r *Registry) CreateJobsWithTxn(
if batchSize > maxBatchSize {
batchSize = maxBatchSize
}
createdInBatch, err := r.createJobsInBatchWithTxn(ctx, txn, toCreate[:batchSize])
createdInBatch, err := r.createJobsInBatchWithTxn(ctx, txn, ie, toCreate[:batchSize])
if err != nil {
return nil, err
}
Expand All @@ -349,7 +352,7 @@ func (r *Registry) CreateJobsWithTxn(
// createJobsInBatchWithTxn creates a batch of jobs from given records in a
// transaction.
func (r *Registry) createJobsInBatchWithTxn(
ctx context.Context, txn *kv.Txn, records []*Record,
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, records []*Record,
) ([]jobspb.JobID, error) {
s, err := r.sqlInstance.Session(ctx)
if err != nil {
Expand All @@ -364,11 +367,13 @@ func (r *Registry) createJobsInBatchWithTxn(
if err != nil {
return nil, err
}
if _, err = r.ex.Exec(
ctx, "job-rows-batch-insert", txn, stmt, args...,
); err != nil {
_, err = ie.ExecEx(
ctx, "job-rows-batch-insert", txn, sessiondata.InternalExecutorOverride{User: username.RootUserName()}, stmt, args...,
)
if err != nil {
return nil, err
}

return jobIDs, nil
}

Expand All @@ -378,8 +383,8 @@ func (r *Registry) batchJobInsertStmt(
ctx context.Context, sessionID sqlliveness.SessionID, records []*Record, modifiedMicros int64,
) (string, []interface{}, []jobspb.JobID, error) {
instanceID := r.ID()
const numColumns = 6
columns := [numColumns]string{`id`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
const numColumns = 7
columns := [numColumns]string{`id`, `created`, `status`, `payload`, `progress`, `claim_session_id`, `claim_instance_id`}
marshalPanic := func(m protoutil.Message) []byte {
data, err := protoutil.Marshal(m)
if err != nil {
Expand All @@ -389,6 +394,7 @@ func (r *Registry) batchJobInsertStmt(
}
valueFns := map[string]func(*Record) interface{}{
`id`: func(rec *Record) interface{} { return rec.JobID },
`created`: func(rec *Record) interface{} { return r.clock.Now().GoTime().Format(time.RFC3339Nano) },
`status`: func(rec *Record) interface{} { return StatusRunning },
`claim_session_id`: func(rec *Record) interface{} { return sessionID.UnsafeBytes() },
`claim_instance_id`: func(rec *Record) interface{} { return instanceID },
Expand Down
13 changes: 7 additions & 6 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func TestBatchJobsCreation(t *testing.T) {

ctx := context.Background()
s, sqlDB, kvDB := serverutils.StartServer(t, args)
ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory)
tdb := sqlutils.MakeSQLRunner(sqlDB)
defer s.Stopper().Stop(ctx)
r := s.JobRegistry().(*Registry)
Expand All @@ -334,9 +335,9 @@ func TestBatchJobsCreation(t *testing.T) {
}
// Create jobs in a batch.
var jobIDs []jobspb.JobID
require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
require.NoError(t, ief.TxnWithExecutor(ctx, kvDB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
var err error
jobIDs, err = r.CreateJobsWithTxn(ctx, txn, records)
jobIDs, err = r.CreateJobsWithTxn(ctx, txn, ie, records)
return err
}))
require.Equal(t, len(jobIDs), test.batchSize)
Expand Down Expand Up @@ -941,6 +942,8 @@ func TestRunWithoutLoop(t *testing.T) {
Settings: settings,
})

ief := s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory)

defer s.Stopper().Stop(ctx)
r := s.JobRegistry().(*Registry)
var records []*Record
Expand All @@ -955,10 +958,8 @@ func TestRunWithoutLoop(t *testing.T) {
})
}
var jobIDs []jobspb.JobID
require.NoError(t, kvDB.Txn(ctx, func(
ctx context.Context, txn *kv.Txn,
) (err error) {
jobIDs, err = r.CreateJobsWithTxn(ctx, txn, records)
require.NoError(t, ief.TxnWithExecutor(ctx, kvDB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) (err error) {
jobIDs, err = r.CreateJobsWithTxn(ctx, txn, ie, records)
return err
}))
require.EqualError(t, r.Run(
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.clock,
cfg.db,
cfg.circularInternalExecutor,
cfg.internalExecutorFactory,
cfg.rpcContext.LogicalClusterID,
cfg.nodeIDContainer,
cfg.sqlLivenessProvider,
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
Expand Down Expand Up @@ -1033,8 +1035,12 @@ func (ex *connExecutor) createJobs(ctx context.Context) error {
for _, record := range ex.extraTxnState.schemaChangeJobRecords {
records = append(records, record)
}
jobIDs, err := ex.server.cfg.JobRegistry.CreateJobsWithTxn(ctx, ex.planner.Txn(), records)
if err != nil {
var jobIDs []jobspb.JobID
var err error
if err := ex.planner.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
jobIDs, err = ex.server.cfg.JobRegistry.CreateJobsWithTxn(ctx, ex.planner.Txn(), ie, records)
return err
}); err != nil {
return err
}
ex.planner.extendedEvalCtx.Jobs.add(jobIDs...)
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,9 @@ func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "cannot create conn executor to commit txn")
}
// TODO(janexing): is this correct?
ex.planner.txn = ie.extraTxnState.txn

defer ex.close(ctx, externalTxnClose)
return ex.commitSQLTransactionInternal(ctx)
}
Expand Down Expand Up @@ -1060,7 +1063,7 @@ func (ie *InternalExecutor) checkIfStmtIsAllowed(stmt tree.Statement, txn *kv.Tx
// deprecated.
func (ie *InternalExecutor) checkIfTxnIsConsistent(txn *kv.Txn) error {
if txn == nil && ie.extraTxnState != nil {
return errors.New("the current internal executor was contructed with" +
return errors.New("the current internal executor was contructed with " +
"a txn. To use an internal executor without a txn, call " +
"sqlutil.InternalExecutorFactory.MakeInternalExecutorWithoutTxn()")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scbackup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/screl",
"//pkg/sql/sqlutil",
],
)

Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/schemachanger/scbackup/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/screl"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
)

// CreateDeclarativeSchemaChangeJobs is called during the last phase of a
Expand All @@ -31,7 +32,11 @@ import (
// It should only be called for backups which do not restore the jobs table
// directly.
func CreateDeclarativeSchemaChangeJobs(
ctx context.Context, registry *jobs.Registry, txn *kv.Txn, allMut nstree.Catalog,
ctx context.Context,
registry *jobs.Registry,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
allMut nstree.Catalog,
) error {
byJobID := make(map[catpb.JobID][]catalog.MutableDescriptor)
_ = allMut.ForEachDescriptorEntry(func(d catalog.Descriptor) error {
Expand Down Expand Up @@ -71,6 +76,6 @@ func CreateDeclarativeSchemaChangeJobs(
runningStatus,
))
}
_, err := registry.CreateJobsWithTxn(ctx, txn, records)
_, err := registry.CreateJobsWithTxn(ctx, txn, ie, records)
return err
}

0 comments on commit 7c43111

Please sign in to comment.