Skip to content

Commit

Permalink
roachtest: fix schema change cancel test
Browse files Browse the repository at this point in the history
Speed up the test by reducing the fraction of data that is processed
by the backfill before issuing a cancel.

Make the test easier to understand.

Wait enough time for the rollback to complete processing.

fixes #33335

Release note: None
  • Loading branch information
vivekmenezes committed Jan 16, 2019
1 parent 31ac673 commit c04d2fa
Showing 1 changed file with 31 additions and 29 deletions.
60 changes: 31 additions & 29 deletions pkg/cmd/roachtest/schemachange.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
Expand Down Expand Up @@ -324,45 +325,44 @@ func makeIndexAddTpccTest(numNodes, warehouses int, length time.Duration) testSp
}

func registerSchemaChangeCancelIndexTPCC1000(r *registry) {
r.Add(makeIndexAddRollbackTpccTest(5, 1000, time.Hour*2))
r.Add(makeIndexAddRollbackTpccTest(5, 1000, time.Minute*30))
}

// Creates an index and job, returning the job ID and a notify channel for
// when the schema change completes or rolls back.
func createIndexAddJob(
ctx context.Context, c *cluster, prefix string,
) (int64, <-chan error, error) {
setup := func(db *gosql.DB) (txn *gosql.Tx, jobID int64, err error) {
txn, err = db.Begin()
if err != nil {
return nil, 0, err
}
if _, err = txn.Exec(`CREATE INDEX foo ON tpcc.order (o_carrier_id);`); err != nil {
return nil, 0, err
}
if err = txn.QueryRow(`SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1`).Scan(&jobID); err != nil {
return nil, 0, err
}

return txn, jobID, nil
}

conn := c.Conn(ctx, 1)

txn, jobID, err := setup(conn)
defer conn.Close()
oldJobID, err := jobutils.QueryRecentJobID(conn, 0)
if err != nil {
conn.Close()
return 0, nil, err
}

c.l.Printf("%s: created index add job %d\n", prefix, jobID)

// CREATE INDEX in a separate goroutine because it takes a lot of time.
notifyCommit := make(chan error)
go func() {
defer conn.Close()
notifyCommit <- txn.Commit()
newConn := c.Conn(ctx, 1)
defer newConn.Close()
_, err := newConn.Exec(`CREATE INDEX foo ON tpcc.order (o_carrier_id);`)
notifyCommit <- err
}()

// Find the job id for the CREATE INDEX.
var jobID int64
for r := retry.Start(base.DefaultRetryOptions()); r.Next(); {
jobID, err = jobutils.QueryRecentJobID(conn, 0)
if err != nil {
return 0, nil, err
}
if jobID != oldJobID {
break
}
}

c.l.Printf("%s: created index add job %d\n", prefix, jobID)

return jobID, notifyCommit, nil
}

Expand Down Expand Up @@ -396,7 +396,7 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration
ctx,
conn,
createID,
0.25,
0.12,
retryOpts,
); err != nil {
return err
Expand All @@ -407,20 +407,22 @@ func makeIndexAddRollbackTpccTest(numNodes, warehouses int, length time.Duration
}
c.l.Printf("%s: canceled job %d\n", prefix, createID)

if schemaChangeErr := <-notifyCommit; !testutils.IsError(schemaChangeErr, "job canceled") {
return errors.Errorf("expected 'job canceled' error, but got %+v", schemaChangeErr)
if err := <-notifyCommit; !testutils.IsError(err, "job canceled") {
c.l.Printf("%s: canceled job %d, got: %+v\n", prefix, createID, err)
return errors.Errorf("expected 'job canceled' error, but got %+v", err)
}

c.l.Printf("%s: rollback began %d\n", prefix, createID)
rollbackID, err := jobutils.QueryRecentJobID(conn, 0)
if err != nil {
return err
} else if rollbackID == createID {
return errors.Errorf("no rollback job created")
}

rollbackTimeoutSecs := int((gcTTL + (15 * time.Minute)).Seconds())
retryOpts = retry.Options{InitialBackoff: 5 * time.Second, Multiplier: 1, MaxRetries: rollbackTimeoutSecs / 5}
c.l.Printf("%s: rollback for %d began: %d\n", prefix, createID, rollbackID)

backoff := 30 * time.Second
retryOpts = retry.Options{InitialBackoff: backoff, MaxBackoff: backoff, Multiplier: 1, MaxRetries: int(length / backoff)}
return jobutils.WaitForStatus(ctx, conn, rollbackID, jobs.StatusSucceeded, retryOpts)
},
Duration: length,
Expand Down

0 comments on commit c04d2fa

Please sign in to comment.