c2c: clean up stream ingestion completion on resume

This patch ensures that the stream ingestion job completes properly on
resumption, after the user set a cutover timestamp while the job was
paused.

Fixes #102158

Release note: none

msbutler committed Apr 27, 2023
1 parent a2f31ca commit d8b59bd
Showing 8 changed files with 215 additions and 83 deletions.
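
At a high level, the fix is in ingest() in stream_ingestion_job.go: when a
resumed job finds that it has already reverted to a cutover timestamp (for
example, one set while the job was paused), it must still run the completion
steps instead of returning early. A condensed sketch of the new flow, with
error handling and testing knobs elided (not the verbatim implementation; see
the full diff below):

    func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
        // Cutover is the first thing checked upon resumption.
        reverted, err := maybeRevertToCutoverTimestamp(ctx, execCtx, ingestionJob)
        if err != nil {
            return err
        }
        if reverted {
            // Previously this path returned nil, so a cutover requested while
            // the job was paused never activated the tenant or completed the
            // producer job.
            return completeIngestion(ctx, execCtx, ingestionJob)
        }
        if err := startDistIngestion(ctx, execCtx, ingestionJob); err != nil {
            return err
        }
        if err := revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil {
            return err
        }
        return completeIngestion(ctx, execCtx, ingestionJob)
    }
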
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -170,16 +170,18 @@ func (c *TenantStreamingClusters) WaitUntilHighWatermark(
 // Cutover sets the cutover timestamp on the replication job causing the job to
 // stop eventually.
 func (c *TenantStreamingClusters) Cutover(
-    producerJobID, ingestionJobID int, cutoverTime time.Time,
+    producerJobID, ingestionJobID int, cutoverTime time.Time, async bool,
 ) {
     // Cut over the ingestion job and the job will stop eventually.
     var cutoverStr string
     c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
         c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
     cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
     require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
-    jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-    jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+    if !async {
+        jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
+        jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
+    }
 }
 
 // StartStreamReplication producer job ID and ingestion job ID.
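
With the new async flag, a caller can request the cutover without waiting and
do the waiting itself once the job is resumed. A hypothetical sketch of the
paused-cutover scenario (the test wiring and variable names here are
assumptions, not part of this commit):

    // Pause the ingestion job, set the cutover time while it is paused,
    // then resume; Cutover(..., async=true) returns without waiting.
    c.DestSysSQL.Exec(t, fmt.Sprintf(`PAUSE JOB %d`, ingestionJobID))
    jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
    cutoverTime := c.SrcCluster.Server(0).Clock().Now().GoTime()
    c.Cutover(producerJobID, ingestionJobID, cutoverTime, true /* async */)
    c.DestSysSQL.Exec(t, fmt.Sprintf(`RESUME JOB %d`, ingestionJobID))
    jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
    jobutils.WaitForJobToSucceed(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
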
89 changes: 76 additions & 13 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -21,6 +21,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/skip"
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -73,6 +74,17 @@ import (
 // Executes the specified SQL query as the specified tenant, and prints the
 // results.
 //
+// - job as=<source-system | destination-system> args
+//
+//   Takes some action on the replication job. Some arguments:
+//
+//   - wait-for-state=<succeeded|paused|failed|reverting|cancelled>: wait for
+//     the job referenced by the tag to reach the specified state.
+//
+//   - pause: pauses the job.
+//
+//   - resume: resumes the job.
+//
 // - skip issue-num=N
 //   Skips the test.
 func TestDataDriven(t *testing.T) {
@@ -156,25 +168,20 @@ func TestDataDriven(t *testing.T) {
             if ok {
                 cutoverTime = varValue
             }
+            var async bool
+            if d.HasArg("async") {
+                async = true
+            }
             timestamp, _, err := tree.ParseDTimestamp(nil, cutoverTime, time.Microsecond)
             require.NoError(t, err)
-            ds.replicationClusters.Cutover(ds.producerJobID, ds.replicationJobID, timestamp.Time)
+            ds.replicationClusters.Cutover(ds.producerJobID, ds.replicationJobID, timestamp.Time, async)
             return ""
 
         case "exec-sql":
             var as string
             d.ScanArgs(t, "as", &as)
-            switch as {
-            case "source-system":
-                ds.replicationClusters.SrcSysSQL.Exec(t, d.Input)
-            case "source-tenant":
-                ds.replicationClusters.SrcTenantSQL.Exec(t, d.Input)
-            case "destination-system":
-                ds.replicationClusters.DestSysSQL.Exec(t, d.Input)
-            case "destination-tenant":
-                ds.replicationClusters.DestTenantSQL.Exec(t, d.Input)
-            default:
-                t.Fatalf("unsupported value to run SQL query as: %s", as)
-            }
+            ds.execAs(t, as, d.Input)
             return ""
 
         case "query-sql":
             var as string
@@ -224,6 +231,47 @@
             time.Sleep(time.Duration(ms) * time.Millisecond)
             return ""
 
+        case "job":
+            var (
+                as     string
+                jobID  int
+                runner *sqlutils.SQLRunner
+            )
+            d.ScanArgs(t, "as", &as)
+            if as == "source-system" {
+                jobID = ds.producerJobID
+                runner = ds.replicationClusters.SrcSysSQL
+            } else if as == "destination-system" {
+                jobID = ds.replicationJobID
+                runner = ds.replicationClusters.DestSysSQL
+            } else {
+                t.Fatalf("job cmd only works on consumer and producer jobs run on system tenant")
+            }
+            if d.HasArg("pause") {
+                ds.execAs(t, as, fmt.Sprintf(`PAUSE JOB %d`, jobID))
+            } else if d.HasArg("resume") {
+                ds.execAs(t, as, fmt.Sprintf(`RESUME JOB %d`, jobID))
+            } else if d.HasArg("wait-for-state") {
+                var state string
+                d.ScanArgs(t, "wait-for-state", &state)
+                jobPBID := jobspb.JobID(jobID)
+                switch state {
+                case "succeeded":
+                    jobutils.WaitForJobToSucceed(t, runner, jobPBID)
+                case "cancelled":
+                    jobutils.WaitForJobToCancel(t, runner, jobPBID)
+                case "paused":
+                    jobutils.WaitForJobToPause(t, runner, jobPBID)
+                case "failed":
+                    jobutils.WaitForJobToFail(t, runner, jobPBID)
+                case "reverting":
+                    jobutils.WaitForJobReverting(t, runner, jobPBID)
+                default:
+                    t.Fatalf("unknown state %s", state)
+                }
+            }
+            return ""
+
         default:
             t.Fatalf("unsupported instruction: %s", d.Cmd)
         }
@@ -265,6 +313,21 @@ func (d *datadrivenTestState) queryAs(t *testing.T, as, query string) string {
     return output
 }
 
+func (d *datadrivenTestState) execAs(t *testing.T, as, query string) {
+    switch as {
+    case "source-system":
+        d.replicationClusters.SrcSysSQL.Exec(t, query)
+    case "source-tenant":
+        d.replicationClusters.SrcTenantSQL.Exec(t, query)
+    case "destination-system":
+        d.replicationClusters.DestSysSQL.Exec(t, query)
+    case "destination-tenant":
+        d.replicationClusters.DestTenantSQL.Exec(t, query)
+    default:
+        t.Fatalf("unsupported value to run SQL query as: %s", as)
+    }
+}
+
 func newDatadrivenTestState() datadrivenTestState {
     return datadrivenTestState{
         cleanupFns: make([]func() error, 0),
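
Together with the cutover command's new async argument, a datadriven test can
now script the exact scenario this commit fixes. An illustrative fragment (the
directive spellings follow the documentation above; the cutover arguments are
assumptions):

    job as=destination-system pause
    ----

    job as=destination-system wait-for-state=paused
    ----

    cutover async
    ----

    job as=destination-system resume
    ----

    job as=destination-system wait-for-state=succeeded
    ----
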
@@ -196,7 +196,7 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) {
 
     // Check dest has caught up the previous updates.
     srcTime := c.SrcCluster.Server(0).Clock().Now()
-    c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime())
+    c.Cutover(producerJobID, ingestionJobID, srcTime.GoTime(), false)
     c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime())
 
     // Ingestion happened one more time after resuming the ingestion job.
@@ -285,7 +285,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) {
 
     cutoverTime := c.DestSysServer.Clock().Now()
     c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID))
-    c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime())
+    c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
     cutoverFingerprint := c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
 
     // Clients should never be started prior to a checkpointed timestamp
@@ -673,7 +673,7 @@ func TestTenantStreamingMultipleNodes(t *testing.T) {
     c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
 
     cutoverTime := c.DestSysServer.Clock().Now()
-    c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime())
+    c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
     c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
 
     // Since the data was distributed across multiple nodes, multiple nodes should've been connected to
@@ -797,7 +797,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
         jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(replicationJobID))
         var cutoverTime time.Time
         c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
-        c.Cutover(producerJobID, replicationJobID, cutoverTime)
+        c.Cutover(producerJobID, replicationJobID, cutoverTime, false)
     }
 
     // Set GC TTL low, so that the GC job completes quickly in the test.
15 changes: 11 additions & 4 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -33,10 +33,7 @@ import (
 )
 
 func startDistIngestion(
-    ctx context.Context,
-    execCtx sql.JobExecContext,
-    ingestionJob *jobs.Job,
-    client streamclient.Client,
+    ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job,
 ) error {
 
     details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
@@ -58,6 +55,16 @@ func startDistIngestion(
     streamID := streampb.StreamID(details.StreamID)
     updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication,
         fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID))
+
+    client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
+    if err != nil {
+        return err
+    }
+    defer func() {
+        if err := client.Close(ctx); err != nil {
+            log.Warningf(ctx, "stream ingestion client did not shut down properly: %s", err.Error())
+        }
+    }()
     if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
         return err
     }
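
Note the ownership change here: startDistIngestion now dials and closes its
own stream client instead of borrowing one threaded through from ingest.
Combined with completeProducerJob dialing its own client (below), the
completion path no longer depends on a client that only exists when the
distSQL flow actually ran, which is what lets a resumed, already-cut-over job
finish cleanly.
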
104 changes: 46 additions & 58 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -46,6 +46,7 @@ func completeStreamIngestion(
     ingestionJobID jobspb.JobID,
     cutoverTimestamp hlc.Timestamp,
 ) error {
+    log.Infof(ctx, "adding cutover time %s to job record", cutoverTimestamp)
     if err := jobRegistry.UpdateJobWithTxn(ctx, ingestionJobID, txn, false,
         func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
             progress := md.Progress.GetStreamIngest()
@@ -191,17 +192,8 @@ func updateRunningStatusInternal(
 }
 
 func completeIngestion(
-    ctx context.Context,
-    execCtx sql.JobExecContext,
-    ingestionJob *jobs.Job,
-    client streamclient.Client,
+    ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job,
 ) error {
-    log.Infof(ctx,
-        "reverting to the specified cutover timestamp for stream ingestion job %d",
-        ingestionJob.ID())
-    if err := revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil {
-        return err
-    }
     details := ingestionJob.Details().(jobspb.StreamIngestionDetails)
     log.Infof(ctx, "activating destination tenant %d", details.DestinationTenantID)
     if err := activateTenant(ctx, execCtx, details.DestinationTenantID); err != nil {
@@ -212,17 +204,8 @@ func completeIngestion(
     log.Infof(ctx, "completing the producer job %d", streamID)
     updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver,
         "completing the producer job in the source cluster")
-    // Completes the producer job in the source cluster on best effort. In a real
-    // disaster recovery scenario, who knows what state the source cluster will be
-    // in; thus, we should not fail the cutover step on the consumer side if we
-    // cannot complete the producer job.
-    if err := contextutil.RunWithTimeout(ctx, "complete producer job", 30*time.Second,
-        func(ctx context.Context) error {
-            return client.Complete(ctx, streampb.StreamID(streamID), true /* successfulIngestion */)
-        },
-    ); err != nil {
-        log.Warningf(ctx, "encountered error when completing the source cluster producer job %d: %s", streamID, err.Error())
-    }
+    completeProducerJob(ctx, ingestionJob, execCtx.ExecCfg().InternalDB, true)
+
     // Now that we have completed the cutover we can release the protected
     // timestamp record on the destination tenant's keyspace.
     if details.ProtectedTimestampRecordID != nil {
@@ -239,6 +222,34 @@
     }
     return nil
 }
+
+// completeProducerJob on the source cluster is best effort. In a real
+// disaster recovery scenario, who knows what state the source cluster will be
+// in; thus, we should not fail the cutover step on the consumer side if we
+// cannot complete the producer job.
+func completeProducerJob(
+    ctx context.Context, ingestionJob *jobs.Job, internalDB *sql.InternalDB, successfulIngestion bool,
+) {
+    streamID := ingestionJob.Details().(jobspb.StreamIngestionDetails).StreamID
+    if err := contextutil.RunWithTimeout(ctx, "complete producer job", 30*time.Second,
+        func(ctx context.Context) error {
+            client, err := connectToActiveClient(ctx, ingestionJob, internalDB)
+            if err != nil {
+                return err
+            }
+            defer func() {
+                if err := client.Close(ctx); err != nil {
+                    log.Warningf(ctx, "error encountered when closing stream client: %s",
+                        err.Error())
+                }
+            }()
+            return client.Complete(ctx, streampb.StreamID(streamID), successfulIngestion)
+        },
+    ); err != nil {
+        log.Warningf(ctx, `encountered error when completing the source cluster producer job %d: %s`, streamID, err.Error())
+    }
+}
+
 func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error {
     // Cutover should be the *first* thing checked upon resumption as it is the
     // most critical task in disaster recovery.
@@ -248,30 +259,24 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
     }
     if reverted {
         log.Infof(ctx, "job completed cutover on resume")
-        return nil
+        return completeIngestion(ctx, execCtx, ingestionJob)
     }
     if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.BeforeIngestionStart != nil {
         if err := knobs.BeforeIngestionStart(ctx); err != nil {
             return err
         }
     }
-    client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB)
-    if err != nil {
-        return err
-    }
-    defer func() {
-        if err := client.Close(ctx); err != nil {
-            log.Warningf(ctx, "stream ingestion client did not shut down properly: %s", err.Error())
-        }
-    }()
-    if err = startDistIngestion(ctx, execCtx, ingestionJob, client); err != nil {
-        return err
-    }
     // A nil error is only possible if the job was signaled to cutover and the
     // processors shut down gracefully, i.e stopped ingesting any additional
     // events from the replication stream. At this point it is safe to revert to
     // the cutoff time to leave the cluster in a consistent state.
-    return completeIngestion(ctx, execCtx, ingestionJob, client)
+    if err = startDistIngestion(ctx, execCtx, ingestionJob); err != nil {
+        return err
+    }
+    if err := revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil {
+        return err
+    }
+    return completeIngestion(ctx, execCtx, ingestionJob)
 }
 
 func ingestWithRetries(
@@ -453,6 +458,7 @@ func cutoverTimeIsEligibleForCutover(
 func maybeRevertToCutoverTimestamp(
     ctx context.Context, p sql.JobExecContext, ingestionJob *jobs.Job,
 ) (bool, error) {
+
     ctx, span := tracing.ChildSpan(ctx, "streamingest.revertToCutoverTimestamp")
     defer span.Finish()
 
@@ -500,7 +506,9 @@
     if !shouldRevertToCutover {
         return false, nil
     }
-
+    log.Infof(ctx,
+        "reverting to cutover timestamp %s for stream ingestion job %d",
+        cutoverTimestamp, ingestionJob.ID())
     if p.ExecCfg().StreamingTestingKnobs != nil && p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted != nil {
         p.ExecCfg().StreamingTestingKnobs.AfterCutoverStarted()
     }
@@ -548,26 +556,6 @@ func activateTenant(ctx context.Context, execCtx interface{}, newTenantID roachp
     })
 }
 
-func (s *streamIngestionResumer) cancelProducerJob(
-    ctx context.Context, details jobspb.StreamIngestionDetails, db isql.DB,
-) {
-    streamID := streampb.StreamID(details.StreamID)
-    addr := streamingccl.StreamAddress(details.StreamAddress)
-    client, err := streamclient.NewStreamClient(ctx, addr, db)
-    if err != nil {
-        log.Warningf(ctx, "encountered error when creating the stream client: %v", err)
-        return
-    }
-    log.Infof(ctx, "canceling the producer job %d as stream ingestion job %d is being canceled",
-        streamID, s.job.ID())
-    if err = client.Complete(ctx, streamID, false /* successfulIngestion */); err != nil {
-        log.Warningf(ctx, "encountered error when canceling the producer job: %v", err)
-    }
-    if err = client.Close(ctx); err != nil {
-        log.Warningf(ctx, "encountered error when closing the stream client: %v", err)
-    }
-}
-
 // OnFailOrCancel is part of the jobs.Resumer interface.
 // There is a know race between the ingestion processors shutting down, and
 // OnFailOrCancel being invoked. As a result of which we might see some keys
@@ -581,9 +569,9 @@ func (s *streamIngestionResumer) OnFailOrCancel(
     // longer needed as this ingestion job is in 'reverting' status and we won't resume
     // ingestion anymore.
     jobExecCtx := execCtx.(sql.JobExecContext)
-    details := s.job.Details().(jobspb.StreamIngestionDetails)
-    s.cancelProducerJob(ctx, details, jobExecCtx.ExecCfg().InternalDB)
+    completeProducerJob(ctx, s.job, jobExecCtx.ExecCfg().InternalDB, false)
 
+    details := s.job.Details().(jobspb.StreamIngestionDetails)
     execCfg := jobExecCtx.ExecCfg()
     return execCfg.InternalDB.Txn(ctx, func(
         ctx context.Context, txn isql.Txn,
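
The bespoke cancelProducerJob helper is gone as well: OnFailOrCancel now calls
the shared completeProducerJob helper with successfulIngestion=false, so
cancellation and successful cutover use the same best-effort path for
notifying the producer job.
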