Skip to content

Commit

Permalink
jobs: ensure that async index dropping is needed
Browse files Browse the repository at this point in the history
We drop indexes through an async schema changer which eventually gets
removes an index with `ClearRange`. When this happens the index drop job
is not marked as completed, but instead marked as waiting for GC.
However, if we DROP the table that the index references in the same
transaction, then we mark the index drop job as done and we don't want
to override this.

This PR ensures and tests that we don't override already successful
index dropping jobs that would be created as explained above.

Release note: None
  • Loading branch information
pbardea committed Sep 10, 2019
1 parent 14341a1 commit a46de66
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
12 changes: 12 additions & 0 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,18 @@ func (j *Job) CheckStatus(ctx context.Context) error {
})
}

// CheckTerminalStatus returns true if the job is in a terminal status.
func (j *Job) CheckTerminalStatus(ctx context.Context) bool {
err := j.Update(ctx, func(_ *client.Txn, md JobMetadata, _ *JobUpdater) error {
if !md.Status.Terminal() {
return &InvalidStatusError{md.ID, md.Status, "checking that job status is success", md.Payload.Error}
}
return nil
})

return err == nil
}

// RunningStatus updates the detailed status of a job currently in progress.
// It sets the job's RunningStatus field to the value returned by runningStatusFn
// and persists runningStatusFn's modifications to the job's details, if any.
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
Original file line number Diff line number Diff line change
Expand Up @@ -1720,3 +1720,25 @@ DROP INDEX b_idx CASCADE;

statement ok
COMMIT;

# Test that deleting an index on a table that gets dropped in the same
# transaction is allowed.
subtest delete_index_and_table_in_txn

statement ok
CREATE TABLE people (id INT PRIMARY KEY, name STRING);

statement ok
CREATE INDEX people_name_index ON people (name);

statement ok
BEGIN;

statement ok
DROP INDEX people@people_name_index;

statement ok
DROP TABLE people;

statement ok
COMMIT;
29 changes: 18 additions & 11 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1315,17 +1315,24 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}

descs, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, func(txn *client.Txn) error {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as successful", errors.Safe(*sc.job.ID()))
}
} else {
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return RunningStatusWaitingGC, nil
}); err != nil {
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
// If the job already has a terminal status, we shouldn't need to update
// its status again. One way this may happen is when a table is dropped
// all jobs that mutate that table are marked successful. So if is a job
// that mutates a table that is dropped in the same txn, then it will
// already be successful. These jobs don't need their status to be updated.
if !sc.job.WithTxn(txn).CheckTerminalStatus(ctx) {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.Wrapf(err,
"failed to mark job %d as successful", errors.Safe(*sc.job.ID()))
}
} else {
if err := sc.job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, details jobspb.Details) (jobs.RunningStatus, error) {
return RunningStatusWaitingGC, nil
}); err != nil {
return errors.Wrapf(err,
"failed to update running status of job %d", errors.Safe(*sc.job.ID()))
}
}
}

Expand Down

0 comments on commit a46de66

Please sign in to comment.