diff --git a/pkg/cmd/roachtest/schemachange.go b/pkg/cmd/roachtest/schemachange.go index e4d4e090f9d3..d2eaac70af92 100644 --- a/pkg/cmd/roachtest/schemachange.go +++ b/pkg/cmd/roachtest/schemachange.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -324,7 +325,7 @@ func makeIndexAddTpccTest(numNodes, warehouses int, length time.Duration) testSp } func registerSchemaChangeCancelIndexTPCC1000(r *registry) { - r.Add(makeIndexAddRollbackTpccTest(5, 1000, time.Hour*2)) + r.Add(makeIndexAddRollbackTpccTest(5, 1000, time.Minute*30)) } // Creates an index and job, returning the job ID and a notify channel for @@ -332,37 +333,36 @@ func registerSchemaChangeCancelIndexTPCC1000(r *registry) { func createIndexAddJob( ctx context.Context, c *cluster, prefix string, ) (int64, <-chan error, error) { - setup := func(db *gosql.DB) (txn *gosql.Tx, jobID int64, err error) { - txn, err = db.Begin() - if err != nil { - return nil, 0, err - } - if _, err = txn.Exec(`CREATE INDEX foo ON tpcc.order (o_carrier_id);`); err != nil { - return nil, 0, err - } - if err = txn.QueryRow(`SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1`).Scan(&jobID); err != nil { - return nil, 0, err - } - - return txn, jobID, nil - } - conn := c.Conn(ctx, 1) - - txn, jobID, err := setup(conn) + defer conn.Close() + oldJobID, err := jobutils.QueryRecentJobID(conn, 0) if err != nil { - conn.Close() return 0, nil, err } - c.l.Printf("%s: created index add job %d\n", prefix, jobID) - + // CREATE INDEX in a separate goroutine because it takes a lot of time. notifyCommit := make(chan error) go func() { - defer conn.Close() - notifyCommit <- txn.Commit() + newConn := c.Conn(ctx, 1) + defer newConn.Close() + _, err := newConn.Exec(`CREATE INDEX foo ON tpcc.order (o_carrier_id);`) + notifyCommit <- err }() + // Find the job id for the CREATE INDEX. + var jobID int64 + for r := retry.Start(base.DefaultRetryOptions()); r.Next(); { + jobID, err = jobutils.QueryRecentJobID(conn, 0) + if err != nil { + return 0, nil, err + } + if jobID != oldJobID { + break + } + } + + c.l.Printf("%s: created index add job %d\n", prefix, jobID) + return jobID, notifyCommit, nil } @@ -396,7 +396,7 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration ctx, conn, createID, - 0.25, + 0.12, retryOpts, ); err != nil { return err @@ -407,11 +407,11 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration } c.l.Printf("%s: canceled job %d\n", prefix, createID) - if schemaChangeErr := <-notifyCommit; !testutils.IsError(schemaChangeErr, "job canceled") { - return errors.Errorf("expected 'job canceled' error, but got %+v", schemaChangeErr) + if err := <-notifyCommit; !testutils.IsError(err, "job canceled") { + c.l.Printf("%s: canceled job %d, got: %+v\n", prefix, createID, err) + return errors.Errorf("expected 'job canceled' error, but got %+v", err) } - c.l.Printf("%s: rollback began %d\n", prefix, createID) rollbackID, err := jobutils.QueryRecentJobID(conn, 0) if err != nil { return err @@ -419,8 +419,10 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration return errors.Errorf("no rollback job created") } - rollbackTimeoutSecs := int((gcTTL + (15 * time.Minute)).Seconds()) - retryOpts = retry.Options{InitialBackoff: 5 * time.Second, Multiplier: 1, MaxRetries: rollbackTimeoutSecs / 5} + c.l.Printf("%s: rollback for %d began: %d\n", prefix, createID, rollbackID) + + backoff := 30 * time.Second + retryOpts = retry.Options{InitialBackoff: backoff, MaxBackoff: backoff, Multiplier: 1, MaxRetries: int(length / backoff)} return jobutils.WaitForStatus(ctx, conn, rollbackID, jobs.StatusSucceeded, retryOpts) }, Duration: length,