Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backupccl: update restore/nodeshutdown tests to use new roachtest framework #98510

Merged
merged 1 commit into from
Mar 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 34 additions & 68 deletions pkg/cmd/roachtest/tests/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -235,91 +234,56 @@ func (dul *DiskUsageLogger) Runner(ctx context.Context) error {
}
}
func registerRestoreNodeShutdown(r registry.Registry) {
sp := restoreSpecs{
hardware: makeHardwareSpecs(hardwareSpecs{}),
backup: makeBackupSpecs(
backupSpecs{workload: tpceRestore{customers: 5000},
version: "v22.2.1"}),
timeout: 1 * time.Hour,
}

makeRestoreStarter := func(ctx context.Context, t test.Test, c cluster.Cluster, gatewayNode int) jobStarter {
return func(c cluster.Cluster, t test.Test) (string, error) {
t.L().Printf("connecting to gateway")
gatewayDB := c.Conn(ctx, t.L(), gatewayNode)
defer gatewayDB.Close()

t.L().Printf("creating bank database")
if _, err := gatewayDB.Exec("CREATE DATABASE bank"); err != nil {
return "", err
}

errCh := make(chan error, 1)
go func() {
defer close(errCh)

// 10 GiB restore.
restoreQuery := `RESTORE bank.bank FROM
'gs://cockroach-fixtures/workload/bank/version=1.0.0,payload-bytes=100,ranges=10,rows=10000000,seed=1/bank?AUTH=implicit'`

t.L().Printf("starting to run the restore job")
if _, err := gatewayDB.Exec(restoreQuery); err != nil {
errCh <- err
}
t.L().Printf("done running restore job")
}()

// Wait for the job.
retryOpts := retry.Options{
MaxRetries: 50,
InitialBackoff: 1 * time.Second,
MaxBackoff: 5 * time.Second,
}
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
var jobCount int
if err := gatewayDB.QueryRowContext(ctx, "SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobCount); err != nil {
return "", err
}

select {
case err := <-errCh:
// We got an error when starting the job.
return "", err
default:
}

if jobCount == 0 {
t.L().Printf("waiting for restore job")
} else if jobCount == 1 {
t.L().Printf("found restore job")
break
} else {
t.L().Printf("found multiple restore jobs -- erroring")
return "", errors.New("unexpectedly found multiple restore jobs")
}
}

var jobID string
if err := gatewayDB.QueryRowContext(ctx, "SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'RESTORE'").Scan(&jobID); err != nil {
return "", errors.Wrap(err, "querying the job ID")
}
return jobID, nil
sp.getRuntimeSpecs(ctx, t, c)
jobID, err := sp.runDetached(ctx, "DATABASE tpce", gatewayNode)
return fmt.Sprintf("%d", jobID), err
}
}

r.Add(registry.TestSpec{
Name: "restore/nodeShutdown/worker",
Owner: registry.OwnerDisasterRecovery,
Cluster: r.MakeClusterSpec(4),
Cluster: sp.hardware.makeClusterSpecs(r),
Timeout: sp.timeout,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
gatewayNode := 2
nodeToShutdown := 3

if c.Spec().Cloud != sp.backup.cloud {
// For now, only run the test on the cloud provider that also stores the backup.
t.Skip("test configured to run on %s", sp.backup.cloud)
}
c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings())

jobSurvivesNodeShutdown(ctx, t, c, nodeToShutdown, makeRestoreStarter(ctx, t, c, gatewayNode))
},
})

r.Add(registry.TestSpec{
Name: "restore/nodeShutdown/coordinator",
Owner: registry.OwnerDisasterRecovery,
Cluster: r.MakeClusterSpec(4),
Cluster: sp.hardware.makeClusterSpecs(r),
Timeout: sp.timeout,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {

gatewayNode := 2
nodeToShutdown := 2

if c.Spec().Cloud != sp.backup.cloud {
// For now, only run the test on the cloud provider that also stores the backup.
t.Skip("test configured to run on %s", sp.backup.cloud)
}

c.Put(ctx, t.Cockroach(), "./cockroach")
c.Start(ctx, t.L(), option.DefaultStartOptsNoBackups(), install.MakeClusterSettings())

Expand Down Expand Up @@ -446,7 +410,7 @@ func registerRestore(r registry.Registry) {
defer close(jobIDCh)
t.Status(`running restore`)
metricCollector := withPauseSpecs.initRestorePerfMetrics(ctx, durationGauge)
jobID, err := withPauseSpecs.runDetached(ctx, "DATABASE tpce")
jobID, err := withPauseSpecs.runDetached(ctx, "DATABASE tpce", 1)
require.NoError(t, err)
jobIDCh <- jobID

Expand Down Expand Up @@ -840,14 +804,16 @@ func (sp *restoreSpecs) run(ctx context.Context, target string) error {
return sp.c.RunE(ctx, sp.c.Node(1), sp.restoreCmd(target, ""))
}

func (sp *restoreSpecs) runDetached(ctx context.Context, target string) (jobspb.JobID, error) {
if err := sp.c.RunE(ctx, sp.c.Node(1), sp.restoreCmd(target, "WITH DETACHED")); err != nil {
func (sp *restoreSpecs) runDetached(
ctx context.Context, target string, node int,
) (jobspb.JobID, error) {
if err := sp.c.RunE(ctx, sp.c.Node(node), sp.restoreCmd(target, "WITH DETACHED")); err != nil {
return 0, err
}

db, err := sp.c.ConnE(ctx, sp.t.L(), sp.c.Node(1)[0])
db, err := sp.c.ConnE(ctx, sp.t.L(), sp.c.Node(node)[0])
if err != nil {
return 0, errors.Wrap(err, "failed to connect to node 1; running restore detached")
return 0, errors.Wrapf(err, "failed to connect to node %d; running restore detached", node)
}
var jobID jobspb.JobID
if err := db.QueryRow(`SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'RESTORE'`).Scan(&jobID); err != nil {
msbutler marked this conversation as resolved.
Show resolved Hide resolved
Expand Down