102189: c2c: clean up stream ingestion completion on resume r=lidorcarmel a=msbutler

This patch ensures that the stream ingestion job completes properly on
resumption after the user established a cutover timestamp while the job was
paused.

Fixes #102158

Release note: none
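To illustrate the behavior being fixed, here is a minimal, self-contained Go sketch of the resume-time decision (hypothetical types and field names, not CockroachDB's actual resumer API): on resumption, the job first checks whether a cutover timestamp was set while it was paused and, if the replicated data already covers it, completes the cutover instead of restarting ingestion.

package main

import (
	"context"
	"fmt"
)

// Hypothetical job state: cutoverTime == 0 means no cutover was requested.
type ingestionJob struct {
	cutoverTime    int64
	replicatedTime int64
}

// resume sketches the fixed logic: a job resumed after a pause re-checks the
// cutover timestamp instead of assuming it must keep ingesting.
func resume(ctx context.Context, j *ingestionJob) error {
	if j.cutoverTime != 0 && j.cutoverTime <= j.replicatedTime {
		// The user ran COMPLETE REPLICATION while the job was paused;
		// finish the cutover and let the job succeed.
		fmt.Println("completing cutover at timestamp", j.cutoverTime)
		return nil
	}
	fmt.Println("no pending cutover; resuming the ingestion plan")
	return nil
}

func main() {
	_ = resume(context.Background(), &ingestionJob{cutoverTime: 5, replicatedTime: 10})
}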

102484: logictestccl: add missing retries to multi_region_remote_access_error test r=rharding6373 a=msirek

PR #102103 added two tests to multi_region_remote_access_error that need retries in case zone configs have not yet propagated.

This patch adds those retries (see the logic-test hunks below).

Fixes #102406
Fixes #102292

Release note: None

Co-authored-by: Michael Butler <butler@cockroachlabs.com>
Co-authored-by: Mark Sirek <sirek@cockroachlabs.com>
3 people committed Apr 27, 2023
3 parents 5d5a309 + d8b59bd + d482c2d commit 79f42d2
Showing 9 changed files with 217 additions and 84 deletions.
@@ -340,13 +340,14 @@ SELECT
f_random_zip_code();

# A local lookup join with a CTE as input should succeed.
-query IT
+query IT retry
WITH vtab AS (SELECT * FROM messages_rbt)
SELECT vtab.account_id, vtab.message FROM vtab
INNER LOOKUP JOIN messages_rbr rbr on vtab.account_id = rbr.account_id and rbr.crdb_region = 'ap-southeast-2'
----

# A CTE built from an INSERT which must access remote rows should fail.
+retry
statement error pq: Query has no home region\. Try adding a filter on parent\.crdb_region and/or on key column \(parent\.p_id\)\. For more information, see https://www.cockroachlabs.com/docs/stable/cost-based-optimizer.html#control-whether-queries-are-limited-to-a-single-region
WITH vtab AS (INSERT INTO parent VALUES (6) RETURNING p_id)
SELECT * FROM vtab
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -170,16 +170,18 @@ func (c *TenantStreamingClusters) WaitUntilHighWatermark(
// Cutover sets the cutover timestamp on the replication job causing the job to
// stop eventually.
func (c *TenantStreamingClusters) Cutover(
-	producerJobID, ingestionJobID int, cutoverTime time.Time,
+	producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
) {
// Cut over the ingestion job and the job will stop eventually.
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
-	jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-	jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+	if !async {
+		jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
+		jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+	}
}

// StartStreamReplication producer job ID and ingestion job ID.
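A test built on the new async flag might look like the following sketch. It assumes the replicationtestutils harness from this diff (plus the fmt, testing, time, jobspb, jobutils, and replicationtestutils imports) and only compiles inside the CockroachDB test tree; the helper name is illustrative.

// cutoverWhilePaused pauses the ingestion job, requests cutover without
// waiting (async=true), resumes the job, and waits for it to succeed.
func cutoverWhilePaused(
	t *testing.T,
	c *replicationtestutils.TenantStreamingClusters,
	producerJobID, ingestionJobID int,
	cutoverTime time.Time,
) {
	c.DestSysSQL.Exec(t, fmt.Sprintf(`PAUSE JOB %d`, ingestionJobID))
	jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
	// async=true: set the cutover timestamp, but do not wait for the jobs.
	c.Cutover(producerJobID, ingestionJobID, cutoverTime, true)
	c.DestSysSQL.Exec(t, fmt.Sprintf(`RESUME JOB %d`, ingestionJobID))
	// On resumption the job should observe the cutover time and complete.
	jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}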
89 changes: 76 additions & 13 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -73,6 +74,17 @@ import (
// Executes the specified SQL query as the specified tenant, and prints the
// results.
//
+// - job as=<source-system | destination-system > args
+//
+// Takes some action on the replication job. Some arguments:
+//
+//   - wait-for-state=<succeeded|paused|failed|reverting|cancelled>: wait for
+//     the job referenced by the tag to reach the specified state.
+//
+//   - pause: pauses the job.
+//
+//   - resume: resumes the job.
+//
// - skip issue-num=N
// Skips the test.
func TestDataDriven(t *testing.T) {
@@ -156,25 +168,20 @@ func TestDataDriven(t *testing.T) {
if ok {
cutoverTime = varValue
}
+			var async bool
+			if d.HasArg("async") {
+				async = true
+			}
timestamp, _, err := tree.ParseDTimestamp(nil, cutoverTime, time.Microsecond)
require.NoError(t, err)
-			ds.replicationClusters.Cutover(ds.producerJobID, ds.replicationJobID, timestamp.Time)
+			ds.replicationClusters.Cutover(ds.producerJobID, ds.replicationJobID, timestamp.Time, async)
return ""

case "exec-sql":
var as string
d.ScanArgs(t, "as", &as)
-			switch as {
-			case "source-system":
-				ds.replicationClusters.SrcSysSQL.Exec(t, d.Input)
-			case "source-tenant":
-				ds.replicationClusters.SrcTenantSQL.Exec(t, d.Input)
-			case "destination-system":
-				ds.replicationClusters.DestSysSQL.Exec(t, d.Input)
-			case "destination-tenant":
-				ds.replicationClusters.DestTenantSQL.Exec(t, d.Input)
-			default:
-				t.Fatalf("unsupported value to run SQL query as: %s", as)
-			}
+			ds.execAs(t, as, d.Input)
return ""

case "query-sql":
var as string
@@ -224,6 +231,47 @@ func TestDataDriven(t *testing.T) {
time.Sleep(time.Duration(ms) * time.Millisecond)
return ""

case "job":
var (
as string
jobID int
runner *sqlutils.SQLRunner
)
d.ScanArgs(t, "as", &as)
if as == "source-system" {
jobID = ds.producerJobID
runner = ds.replicationClusters.SrcSysSQL
} else if as == "destination-system" {
jobID = ds.replicationJobID
runner = ds.replicationClusters.DestSysSQL
} else {
t.Fatalf("job cmd only works on consumer and producer jobs run on system tenant")
}
if d.HasArg("pause") {
ds.execAs(t, as, fmt.Sprintf(`PAUSE JOB %d`, jobID))
} else if d.HasArg("resume") {
ds.execAs(t, as, fmt.Sprintf(`RESUME JOB %d`, jobID))
} else if d.HasArg("wait-for-state") {
var state string
d.ScanArgs(t, "wait-for-state", &state)
jobPBID := jobspb.JobID(jobID)
switch state {
case "succeeded":
jobutils.WaitForJobToSucceed(t, runner, jobPBID)
case "cancelled":
jobutils.WaitForJobToCancel(t, runner, jobPBID)
case "paused":
jobutils.WaitForJobToPause(t, runner, jobPBID)
case "failed":
jobutils.WaitForJobToFail(t, runner, jobPBID)
case "reverting":
jobutils.WaitForJobReverting(t, runner, jobPBID)
default:
t.Fatalf("unknown state %s", state)
}
}
return ""

default:
t.Fatalf("unsupported instruction: %s", d.Cmd)
}
@@ -265,6 +313,21 @@ func (d *datadrivenTestState) queryAs(t *testing.T, as, query string) string {
return output
}

+func (d *datadrivenTestState) execAs(t *testing.T, as, query string) {
+	switch as {
+	case "source-system":
+		d.replicationClusters.SrcSysSQL.Exec(t, query)
+	case "source-tenant":
+		d.replicationClusters.SrcTenantSQL.Exec(t, query)
+	case "destination-system":
+		d.replicationClusters.DestSysSQL.Exec(t, query)
+	case "destination-tenant":
+		d.replicationClusters.DestTenantSQL.Exec(t, query)
+	default:
+		t.Fatalf("unsupported value to run SQL query as: %s", as)
+	}
+}

func newDatadrivenTestState() datadrivenTestState {
return datadrivenTestState{
cleanupFns: make([]func() error, 0),
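Together, the async cutover and the new job command enable datadriven scripts along these lines (a hypothetical snippet, not one of the test files in this PR; the cutover directive's timestamp argument is elided because its exact syntax is not shown in this excerpt, and each directive's expected output is empty):

job as=destination-system pause
----

job as=destination-system wait-for-state=paused
----

cutover async
----

job as=destination-system resume
----

job as=destination-system wait-for-state=succeeded
----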
@@ -196,7 +196,7 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {

// Check dest has caught up the previous updates.
srcTime := c.SrcCluster.Server(0).Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime())
+	c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())

// Ingestion happened one more time after resuming the ingestion job.
@@ -285,7 +285,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {

cutoverTime := c.DestSysServer.Clock().Now()
c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID))
-	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime())
+	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
cutoverFingerprint := c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())

// Clients should never be started prior to a checkpointed timestamp
@@ -673,7 +673,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))

cutoverTime := c.DestSysServer.Clock().Now()
-	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime())
+	c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())

// Since the data was distributed across multiple nodes, multiple nodes should've been connected to
@@ -797,7 +797,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
-	c.Cutover(producerJobID, replicationJobID, cutoverTime)
+	c.Cutover(producerJobID, replicationJobID, cutoverTime, false)
}

// Set GC TTL low, so that the GC job completes quickly in the test.
15 changes: 11 additions & 4 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -33,10 +33,7 @@ import (
)

func startDistIngestion(
-	ctx context.Context,
-	execCtx sql.JobExecContext,
-	ingestionJob *jobs.Job,
-	client streamclient.Client,
+	ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job,
) error {

details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
@@ -58,6 +55,16 @@ func startDistIngestion(
streamID := streampb.StreamID(details.StreamID)
updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication,
fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID))

+	client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err := client.Close(ctx); err != nil {
+			log.Warningf(ctx, "stream ingestion client did not shut down properly: %s", err.Error())
+		}
+	}()
if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
return err
}
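The hunk above moves client creation into startDistIngestion, so each (re)start of the ingestion job builds and tears down its own stream client. A generic, self-contained Go sketch of that ownership pattern (illustrative types, not the actual streamclient API):

package main

import (
	"context"
	"log"
)

type client struct{}

func connect(ctx context.Context) (*client, error) { return &client{}, nil }
func (c *client) Close(ctx context.Context) error  { return nil }

// startIngestion owns the client for its whole lifetime: it connects on
// entry and closes on exit, so a resumed job never reuses a stale client.
func startIngestion(ctx context.Context) error {
	c, err := connect(ctx)
	if err != nil {
		return err
	}
	defer func() {
		if err := c.Close(ctx); err != nil {
			log.Printf("client did not shut down properly: %s", err)
		}
	}()
	// ... plan and run the distributed ingestion using c ...
	return nil
}

func main() {
	if err := startIngestion(context.Background()); err != nil {
		log.Fatal(err)
	}
}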