Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125302: roachtest: small ldr roachtest r=stevendanna a=stevendanna

This is a small roachtest for LDR. It would be nice to share more infrastructure with the cluster to cluster tests, but I am hoping that happens a bit more organically as we develop the corpus of LDR tests.

Epic: none
Release note: None

Co-authored-by: Steven Danna <danna@cockroachlabs.com>
  • Loading branch information
craig[bot] and stevendanna committed Jun 23, 2024
2 parents de3b267 + 149c9c6 commit b56e743
Show file tree
Hide file tree
Showing 9 changed files with 444 additions and 114 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ go_library(
"limit_capacity.go",
"liquibase.go",
"liquibase_blocklist.go",
"logical_data_replication.go",
"loss_of_quorum_recovery.go",
"many_splits.go",
"mismatched_locality.go",
Expand Down
42 changes: 7 additions & 35 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/debug"
Expand Down Expand Up @@ -3353,48 +3350,23 @@ func (cfc *changefeedCreator) Create() (int, error) {
}

type changefeedInfo struct {
status string
errMsg string
*jobRecord

startedTime time.Time
statementTime time.Time
highwaterTime time.Time
finishedTime time.Time
}

func (c *changefeedInfo) GetHighWater() time.Time { return c.highwaterTime }
func (c *changefeedInfo) GetFinishedTime() time.Time { return c.finishedTime }
func (c *changefeedInfo) GetStatus() string { return c.status }
func (c *changefeedInfo) GetError() string { return c.status }

var _ jobInfo = (*changefeedInfo)(nil)

func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
var status string
var payloadBytes []byte
var progressBytes []byte
if err := db.QueryRow(jobutils.InternalSystemJobsBaseQuery, jobID).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}
var payload jobspb.Payload
if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
jr, err := getJobRecord(db, jobID)
if err != nil {
return nil, err
}
var highwaterTime time.Time
highwater := progress.GetHighWater()
if highwater != nil {
highwaterTime = highwater.GoTime()
}
return &changefeedInfo{
status: status,
errMsg: payload.Error,
startedTime: time.UnixMicro(payload.StartedMicros),
statementTime: payload.GetChangefeed().StatementTime.GoTime(),
highwaterTime: highwaterTime,
finishedTime: time.UnixMicro(payload.FinishedMicros),
jobRecord: jr,
startedTime: time.UnixMicro(jr.payload.StartedMicros),
statementTime: jr.payload.GetChangefeed().StatementTime.GoTime(),
}, nil
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/cmd/roachtest/tests/cdc_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func runCDCBenchScan(
return err
}

duration := info.finishedTime.Sub(info.startedTime)
duration := info.GetFinishedTime().Sub(info.startedTime)
rate := int64(float64(numRows) / duration.Seconds())
t.L().Printf("changefeed completed in %s (scanned %s rows per second)",
duration.Truncate(time.Second), humanize.Comma(rate))
Expand Down Expand Up @@ -474,15 +474,15 @@ func runCDCBenchWorkload(
switch jobs.Status(info.status) {
case jobs.StatusPending, jobs.StatusRunning:
doneValue := done.Load()
return doneValue != nil && info.highwaterTime.After(doneValue.(time.Time)), nil
return doneValue != nil && info.GetHighWater().After(doneValue.(time.Time)), nil
default:
return false, errors.Errorf("unexpected changefeed status %s", info.status)
}
})
if err != nil {
return err
}
t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
t.L().Printf("changefeed watermark is %s", info.GetHighWater().Format(time.RFC3339))
return nil
})

Expand All @@ -494,13 +494,13 @@ func runCDCBenchWorkload(
info, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
switch jobs.Status(info.status) {
case jobs.StatusPending, jobs.StatusRunning:
return info.highwaterTime.After(now), nil
return info.GetHighWater().After(now), nil
default:
return false, errors.Errorf("unexpected changefeed status %s", info.status)
}
})
require.NoError(t, err)
t.L().Printf("changefeed watermark is %s", info.highwaterTime.Format(time.RFC3339))
t.L().Printf("changefeed watermark is %s", info.GetHighWater().Format(time.RFC3339))

} else {
t.L().Printf("control run, not starting changefeed")
Expand Down Expand Up @@ -540,7 +540,7 @@ func runCDCBenchWorkload(
return err
}
t.L().Printf("waiting for changefeed watermark to reach %s (lagging by %s)",
now.Format(time.RFC3339), now.Sub(info.highwaterTime).Truncate(time.Second))
now.Format(time.RFC3339), now.Sub(info.GetHighWater()).Truncate(time.Second))
}
return nil
})
Expand Down Expand Up @@ -588,8 +588,8 @@ func waitForChangefeed(
return changefeedInfo{}, errors.Wrapf(err, "failed %d attempts to get changefeed info", maxLoadJobAttempts)
}
continue
} else if info.errMsg != "" {
return changefeedInfo{}, errors.Errorf("changefeed error: %s", info.errMsg)
} else if info.GetError() != "" {
return changefeedInfo{}, errors.Errorf("changefeed error: %s", info.GetError())
}
if ok, err := f(*info); err != nil {
return changefeedInfo{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/roachtest/tests/cdc_filtering.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func checkCDCEvents[S any](
_, err := waitForChangefeed(ctx, conn, jobID, t.L(), func(info changefeedInfo) (bool, error) {
switch jobs.Status(info.status) {
case jobs.StatusPending, jobs.StatusRunning:
return info.highwaterTime.After(now), nil
return info.GetHighWater().After(now), nil
default:
return false, errors.Errorf("unexpected changefeed status %s", info.status)
}
Expand Down
114 changes: 62 additions & 52 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type clusterInfo struct {
ID int

// pgurl is a connection string to the system tenant
pgURL string
pgURL *url.URL

// db provides a connection to the system tenant
db *gosql.DB
Expand All @@ -74,6 +74,12 @@ type clusterInfo struct {
nodes option.NodeListOption
}

func (i *clusterInfo) PgURLForDatabase(database string) string {
uri := *i.pgURL
uri.Path = database
return uri.String()
}

type c2cSetup struct {
src *clusterInfo
dst *clusterInfo
Expand Down Expand Up @@ -669,7 +675,7 @@ func (rd *replicationDriver) preStreamingWorkload(ctx context.Context) {

func (rd *replicationDriver) startReplicationStream(ctx context.Context) int {
streamReplStmt := fmt.Sprintf("CREATE TENANT %q FROM REPLICATION OF %q ON '%s'",
rd.setup.dst.name, rd.setup.src.name, rd.setup.src.pgURL)
rd.setup.dst.name, rd.setup.src.name, rd.setup.src.pgURL.String())
rd.setup.dst.sysSQL.Exec(rd.t, streamReplStmt)
rd.replicationStartHook(ctx, rd)
return getIngestionJobID(rd.t, rd.setup.dst.sysSQL, rd.setup.dst.name)
Expand All @@ -681,16 +687,7 @@ func (rd *replicationDriver) runWorkload(ctx context.Context) error {
}

func (rd *replicationDriver) waitForReplicatedTime(ingestionJobID int, wait time.Duration) {
testutils.SucceedsWithin(rd.t, func() error {
info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID)
if err != nil {
return err
}
if info.GetHighWater().IsZero() {
return errors.New("no replicated time")
}
return nil
}, wait)
waitForReplicatedTime(rd.t, ingestionJobID, rd.setup.dst.db, getStreamIngestionJobInfo, wait)
}

func (rd *replicationDriver) getWorkloadTimeout() time.Duration {
Expand Down Expand Up @@ -1692,51 +1689,29 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st
return int(tenantInfo.PhysicalReplicationConsumerJobID)
}

type streamIngesitonJobInfo struct {
status string
errMsg string
replicatedTime hlc.Timestamp
finishedTime time.Time
type streamIngestionJobInfo struct {
*jobRecord
}

// GetHighWater returns the replicated time. The GetHighWater name is
// retained here as this is implementing the jobInfo interface used by
// the latency verifier.
func (c *streamIngesitonJobInfo) GetHighWater() time.Time {
if c.replicatedTime.IsEmpty() {
func (c *streamIngestionJobInfo) GetHighWater() time.Time {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(&c.progress)
if replicatedTime.IsEmpty() {
return time.Time{}
}
return c.replicatedTime.GoTime()
return replicatedTime.GoTime()
}
func (c *streamIngesitonJobInfo) GetFinishedTime() time.Time { return c.finishedTime }
func (c *streamIngesitonJobInfo) GetStatus() string { return c.status }
func (c *streamIngesitonJobInfo) GetError() string { return c.status }

var _ jobInfo = (*streamIngesitonJobInfo)(nil)
var _ jobInfo = (*streamIngestionJobInfo)(nil)

func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) {
var status string
var payloadBytes []byte
var progressBytes []byte
if err := db.QueryRow(
`SELECT status, payload, progress FROM crdb_internal.system_jobs WHERE id = $1`, jobID,
).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}
var payload jobspb.Payload
if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
jr, err := getJobRecord(db, jobID)
if err != nil {
return nil, err
}
return &streamIngesitonJobInfo{
status: status,
errMsg: payload.Error,
replicatedTime: replicationutils.ReplicatedTimeFromProgress(&progress),
finishedTime: time.UnixMicro(payload.FinishedMicros),
}, nil
return &streamIngestionJobInfo{jr}, nil
}

func srcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
Expand Down Expand Up @@ -1770,40 +1745,75 @@ func overrideSrcAndDestTenantTTL(
destSQL.Exec(t, `ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = $1`, overrideTTL.Seconds())
}

func waitForReplicatedTimeToReachTimestamp(
t testutils.TestFataler,
jobID int,
db *gosql.DB,
jf jobFetcher,
wait time.Duration,
target time.Time,
) {
testutils.SucceedsWithin(t, func() error {
info, err := jf(db, jobID)
if err != nil {
return err
}
if info.GetHighWater().Compare(target) < 0 {
return errors.Newf("replicated time %s not yet at %s", info.GetHighWater(), target)
}
return nil
}, wait)
}

func waitForReplicatedTime(
t testutils.TestFataler, jobID int, db *gosql.DB, jf jobFetcher, wait time.Duration,
) {
testutils.SucceedsWithin(t, func() error {
info, err := jf(db, jobID)
if err != nil {
return err
}
if info.GetHighWater().IsZero() {
return errors.New("no replicated time")
}
return nil
}, wait)
}

func copyPGCertsAndMakeURL(
ctx context.Context,
t test.Test,
c cluster.Cluster,
srcNode option.NodeListOption,
pgURLDir string,
urlString string,
) (string, error) {
) (*url.URL, error) {
pgURL, err := url.Parse(urlString)
if err != nil {
return "", err
return nil, err
}

tmpDir, err := os.MkdirTemp("", install.CockroachNodeCertsDir)
if err != nil {
return "", err
return nil, err
}
func() { _ = os.RemoveAll(tmpDir) }()

if err := c.Get(ctx, t.L(), pgURLDir, tmpDir, srcNode); err != nil {
return "", err
return nil, err
}

sslRootCert, err := os.ReadFile(filepath.Join(tmpDir, "ca.crt"))
if err != nil {
return "", err
return nil, err
}
sslClientCert, err := os.ReadFile(filepath.Join(tmpDir, fmt.Sprintf("client.%s.crt", install.DefaultUser)))
if err != nil {
return "", err
return nil, err
}
sslClientKey, err := os.ReadFile(filepath.Join(tmpDir, fmt.Sprintf("client.%s.key", install.DefaultUser)))
if err != nil {
return "", err
return nil, err
}

options := pgURL.Query()
Expand All @@ -1813,5 +1823,5 @@ func copyPGCertsAndMakeURL(
options.Set("sslcert", string(sslClientCert))
options.Set("sslkey", string(sslClientKey))
pgURL.RawQuery = options.Encode()
return pgURL.String(), nil
return pgURL, nil
}
39 changes: 39 additions & 0 deletions pkg/cmd/roachtest/tests/jobs_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,45 @@ func WaitForRunning(
}, maxWait)
}

type jobRecord struct {
status string
payload jobspb.Payload
progress jobspb.Progress
}

func (jr *jobRecord) GetHighWater() time.Time {
var highwaterTime time.Time
highwater := jr.progress.GetHighWater()
if highwater != nil {
highwaterTime = highwater.GoTime()
}
return highwaterTime
}
func (jr *jobRecord) GetFinishedTime() time.Time { return time.UnixMicro(jr.payload.FinishedMicros) }
func (jr *jobRecord) GetStatus() string { return jr.status }
func (jr *jobRecord) GetError() string { return jr.payload.Error }

func getJobRecord(db *gosql.DB, jobID int) (*jobRecord, error) {
var (
jr jobRecord
payloadBytes []byte
progressBytes []byte
)
if err := db.QueryRow(
`SELECT status, payload, progress FROM crdb_internal.system_jobs WHERE id = $1`, jobID,
).Scan(&jr.status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}

if err := protoutil.Unmarshal(payloadBytes, &jr.payload); err != nil {
return nil, err
}
if err := protoutil.Unmarshal(progressBytes, &jr.progress); err != nil {
return nil, err
}
return &jr, nil
}

func getJobProgress(t test.Test, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jobspb.Progress {
ret := &jobspb.Progress{}
var buf []byte
Expand Down
Loading

0 comments on commit b56e743

Please sign in to comment.