diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go
index bef55c6c0302..250ed5e41d15 100644
--- a/pkg/ccl/changefeedccl/cdctest/validator.go
+++ b/pkg/ccl/changefeedccl/cdctest/validator.go
@@ -77,7 +77,7 @@ func NewOrderValidator(topic string) Validator {
 	}
 }
 
-// NewStreamOrderValidator wraps and orderValidator as described above, and
+// NewStreamOrderValidator wraps an orderValidator as described above, and
 // exposes additional methods for introspection.
 func NewStreamOrderValidator() StreamValidator {
 	return &orderValidator{
diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index 04ca6fde0896..ed39d819a300 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -36,6 +36,7 @@ go_library(
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
        "//pkg/storage",
+        "//pkg/util/ctxgroup",
         "//pkg/util/hlc",
         "//pkg/util/log",
         "//pkg/util/protoutil",
@@ -67,6 +68,7 @@ go_test(
         "//pkg/ccl/storageccl",
         "//pkg/ccl/streamingccl",
         "//pkg/ccl/streamingccl/streamclient",
+        "//pkg/ccl/streamingccl/streamingutils",
         "//pkg/ccl/utilccl",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index a586a8a8e7ee..0f1c81607259 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -10,6 +10,7 @@ package streamingest
 
 import (
 	"context"
+	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
@@ -19,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/errors"
 )
@@ -70,23 +72,89 @@ func ingest(
 	}
 
 	// Plan and run the DistSQL flow.
-	err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
+	return distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
 		streamIngestionFrontierSpec)
-	if err != nil {
-		return err
-	}
-
-	return nil
+}
+
+// checkForCutoverSignal periodically loads the job progress to check for the
+// sentinel value that signals the ingestion job to complete.
+func (s *streamIngestionResumer) checkForCutoverSignal(
+	ctx context.Context, stopPoller chan struct{}, registry *jobs.Registry, cancelIngestionCtx func(),
+) error {
+	tick := time.NewTicker(time.Second * 10)
+	defer tick.Stop()
+	for {
+		select {
+		case <-stopPoller:
+			return nil
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-tick.C:
+			j, err := registry.LoadJob(ctx, *s.job.ID())
+			if err != nil {
+				return err
+			}
+			progress := j.Progress()
+			var sp *jobspb.Progress_StreamIngest
+			var ok bool
+			if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
+				return errors.Newf("unknown progress type %T in stream ingestion job %d",
+					j.Progress().Progress, *s.job.ID())
+			}
+			// Job has been signaled to complete.
+			if !sp.StreamIngest.CutoverTime.IsEmpty() {
+				// Sanity check that the requested cutover time is less than or equal to
+				// the resolved ts recorded in the job progress. This should already
+				// have been enforced when the cutover was signaled via the builtin.
+				// TODO(adityamaru): Remove this when we allow users to specify a
+				// cutover time in the future.
+				resolvedTimestamp := progress.GetHighWater()
+				if resolvedTimestamp == nil {
+					return errors.Newf("cutover has been requested before job %d has had a chance to"+
+						" record a resolved ts", *s.job.ID())
+				}
+				if resolvedTimestamp.Less(sp.StreamIngest.CutoverTime) {
+					return errors.Newf("requested cutover time %s is after the resolved time %s recorded"+
+						" in job %d", sp.StreamIngest.CutoverTime.String(), resolvedTimestamp.String(),
+						*s.job.ID())
+				}
+				cancelIngestionCtx()
+				return nil
+			}
+		}
+	}
 }
 
 // Resume is part of the jobs.Resumer interface.
-func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}) error {
+func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
 	details := s.job.Details().(jobspb.StreamIngestionDetails)
 	p := execCtx.(sql.JobExecContext)
-	err := ingest(ctx, p, details.StreamAddress, s.job.Progress(),
-		*s.job.ID())
-	if err != nil {
+	// ingestCtx is used to plan and run the DistSQL flow.
+	ingestCtx, cancelIngest := context.WithCancel(resumeCtx)
+	g := ctxgroup.WithContext(ingestCtx)
+
+	// Start a poller to check if the job has been requested to cut over.
+	stopPoller := make(chan struct{})
+	g.GoCtx(func(ctx context.Context) error {
+		return s.checkForCutoverSignal(ctx, stopPoller, p.ExecCfg().JobRegistry, cancelIngest)
+	})
+
+	g.GoCtx(func(ctx context.Context) error {
+		defer close(stopPoller)
+		return ingest(ctx, p, details.StreamAddress, s.job.Progress(), *s.job.ID())
+	})
+
+	if err := g.Wait(); err != nil {
+		// Check if the ingestCtx has been canceled while the resumeCtx does not
+		// have an error set on it. This is only possible if the resumer observed a
+		// cutover and explicitly requested a teardown via the ingestCtx, in which
+		// case we should revert the data to the cutover time to get the cluster
+		// into a consistent state.
+		// In all other cases we should treat the context cancellation as an error.
+		if errors.Is(err, context.Canceled) && resumeCtx.Err() == nil {
+			return s.revertToLatestResolvedTimestamp(resumeCtx, execCtx)
+		}
 		return err
 	}
@@ -97,39 +165,72 @@ func (s *streamIngestionResumer) Resume(ctx context.Context, execCtx interface{}
 	return nil
 }
 
-// OnFailOrCancel is part of the jobs.Resumer interface.
-func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+// revertToLatestResolvedTimestamp reads the job progress for the cutover time
+// and issues a RevertRangeRequest with the target time set to that cutover
+// time, to bring the ingesting cluster to a consistent state.
+func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
+	ctx context.Context, execCtx interface{},
+) error {
 	p := execCtx.(sql.JobExecContext)
 	db := p.ExecCfg().DB
-	details := s.job.Details().(jobspb.StreamIngestionDetails)
-
-	resolvedTime := details.StartTime
-	prog := s.job.Progress()
-	if highWatermark := prog.GetHighWater(); highWatermark != nil {
-		if highWatermark.Less(resolvedTime) {
-			return errors.Newf("progress timestamp %+v cannot be older than start time %+v",
-				highWatermark, resolvedTime)
-		}
-		resolvedTime = *highWatermark
+	j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
+	if err != nil {
+		return err
+	}
+	details := j.Details()
+	var sd jobspb.StreamIngestionDetails
+	var ok bool
+	if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
+		return errors.Newf("unknown details type %T in stream ingestion job %d",
+			details, *s.job.ID())
+	}
+	progress := j.Progress()
+	var sp *jobspb.Progress_StreamIngest
+	if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok {
+		return errors.Newf("unknown progress type %T in stream ingestion job %d",
+			j.Progress().Progress, *s.job.ID())
 	}
 
-	// TODO(adityamaru): If the job progress was not set then we should
-	// probably ClearRange. Take this into account when writing the ClearRange
-	// OnFailOrCancel().
-	if resolvedTime.IsEmpty() {
-		return nil
+	if sp.StreamIngest.CutoverTime.IsEmpty() {
+		return errors.New("cutover time is unexpectedly empty, cannot revert to a consistent state")
 	}
 
 	var b kv.Batch
 	b.AddRawRequest(&roachpb.RevertRangeRequest{
 		RequestHeader: roachpb.RequestHeader{
-			Key:    details.Span.Key,
-			EndKey: details.Span.EndKey,
+			Key:    sd.Span.Key,
+			EndKey: sd.Span.EndKey,
 		},
-		TargetTime:                          resolvedTime,
+		TargetTime:                          sp.StreamIngest.CutoverTime,
 		EnableTimeBoundIteratorOptimization: true,
 	})
 	b.Header.MaxSpanRequestKeys = sql.RevertTableDefaultBatchSize
+
+	return db.Run(ctx, &b)
+}
+
+// OnFailOrCancel is part of the jobs.Resumer interface.
+func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
+	p := execCtx.(sql.JobExecContext)
+	db := p.ExecCfg().DB
+	j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
+	if err != nil {
+		return err
+	}
+	details := j.Details()
+	var sd jobspb.StreamIngestionDetails
+	var ok bool
+	if sd, ok = details.(jobspb.StreamIngestionDetails); !ok {
+		return errors.Newf("unknown details type %T in stream ingestion job %d",
+			details, *s.job.ID())
+	}
+	var b kv.Batch
+	b.AddRawRequest(&roachpb.ClearRangeRequest{
+		RequestHeader: roachpb.RequestHeader{
+			Key:    sd.Span.Key,
+			EndKey: sd.Span.EndKey,
+		},
+	})
 	return db.Run(ctx, &b)
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
index 9560d657859c..60cc0fe36291 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go
@@ -14,6 +14,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingutils"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
index 66f85d560d30..fd106a0362e7 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -10,7 +10,9 @@ package streamingest
 
 import (
 	"context"
+	gosql "database/sql"
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
@@ -35,14 +37,29 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
 
+func getHighWaterMark(jobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
+	var progressBytes []byte
+	if err := sqlDB.QueryRow(
+		`SELECT progress FROM system.jobs WHERE id = $1`, jobID,
+	).Scan(&progressBytes); err != nil {
+		return nil, err
+	}
+	var payload jobspb.Progress
+	if err := protoutil.Unmarshal(progressBytes, &payload); err != nil {
+		return nil, err
+	}
+	return payload.GetHighWater(), nil
+}
+
 // TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
 // fed KVs from the random stream client. After receiving a certain number of
-// resolved timestamp events the test cancels the job to tear down the flow, and
-// rollback to the latest resolved frontier timestamp.
+// resolved timestamp events the test completes the job to tear down the flow,
+// and rolls back to the latest resolved frontier timestamp.
 // The test scans the KV store to compare all MVCC KVs against the relevant
 // streamed KV Events, thereby ensuring that we end up in a consistent state.
 func TestStreamIngestionJobWithRandomClient(t *testing.T) {
@@ -55,17 +72,17 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 	ctx := context.Background()
 	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)
 
-	cancelJobCh := make(chan struct{})
+	canBeCompletedCh := make(chan struct{})
 	threshold := 10
 	mu := syncutil.Mutex{}
-	cancelJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
-		cancelJobCh <- struct{}{}
+	completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
+		canBeCompletedCh <- struct{}{}
 	})
 	streamValidator := newStreamClientValidator()
 	registerValidator := registerValidatorWithClient(streamValidator)
 	knobs := base.TestingKnobs{
 		DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{
-			Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){cancelJobAfterCheckpoints,
+			Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){completeJobAfterCheckpoints,
 				registerValidator},
 		},
 		},
@@ -73,9 +90,21 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 	serverArgs := base.TestServerArgs{}
 	serverArgs.Knobs = knobs
 
+	var receivedRevertRequest chan struct{}
 	var allowResponse chan struct{}
+	var revertRangeTargetTime hlc.Timestamp
 	params := base.TestClusterArgs{ServerArgs: serverArgs}
 	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		TestingRequestFilter: func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
+			for _, req := range ba.Requests {
+				switch r := req.GetInner().(type) {
+				case *roachpb.RevertRangeRequest:
+					revertRangeTargetTime = r.TargetTime
+					receivedRevertRequest <- struct{}{}
+				}
+			}
+			return nil
+		},
 		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
 	}
 
@@ -96,6 +125,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 
 	// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
 	allowResponse = make(chan struct{})
+	receivedRevertRequest = make(chan struct{})
 	errCh := make(chan error)
 	defer close(errCh)
 	_, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval= '0.0005ms'`)
@@ -112,29 +142,162 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 	}
 	close(allowResponse)
 
-	var streamJobID string
+	var jobID string
+	var streamJobID int
 	testutils.SucceedsSoon(t, func() error {
 		row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1")
-		return row.Scan(&streamJobID)
+		return row.Scan(&jobID)
 	})
+	streamJobID, err = strconv.Atoi(jobID)
+	require.NoError(t, err)
+
+	// Wait for the job to signal that it is ready to be cut over.
+	<-canBeCompletedCh
+	close(canBeCompletedCh)
+
+	// Ensure that the job has made some progress.
+	var highwater hlc.Timestamp
+	testutils.SucceedsSoon(t, func() error {
+		hw, err := getHighWaterMark(streamJobID, conn)
+		require.NoError(t, err)
+		if hw == nil {
+			return errors.New("highwatermark is unset, no progress has been reported")
+		}
+		highwater = *hw
+		return nil
+	})
+
+	// Cutting over the job should shut down the ingestion processors via a context
+	// cancellation, and subsequently roll back data above our frontier timestamp.
+	tz := timeutil.Unix(0, highwater.WallTime).UTC()
+	_, err = conn.Exec(`SELECT crdb_internal.complete_stream_ingestion_job ($1, $2)`,
+		streamJobID, tz)
+	require.NoError(t, err)
+
+	// Wait for the job to issue a revert request.
+	<-receivedRevertRequest
+	close(receivedRevertRequest)
+	require.True(t, !revertRangeTargetTime.IsEmpty())
+	require.Equal(t, revertRangeTargetTime, highwater)
+
+	// Expect the job to complete without an error.
+	if err := <-errCh; err != nil {
+		t.Fatal(err)
+	}
+
+	// Wait for the ingestion job to have been marked as succeeded.
+	testutils.SucceedsSoon(t, func() error {
+		var status string
+		sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status)
+		if jobs.Status(status) != jobs.StatusSucceeded {
+			return errors.New("job not in succeeded state")
+		}
+		return nil
+	})
+
+	// Check the validator for any failures.
+	for _, err := range streamValidator.failures() {
+		t.Fatal(err)
+	}
+
+	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID)))
+	maxIngestedTS := assertExactlyEqualKVs(t, tc, streamValidator, revertRangeTargetTime, tenantPrefix)
+
+	// Sanity check that the max ts in the store is less than the revert range
+	// target timestamp.
+	require.True(t, maxIngestedTS.LessEq(revertRangeTargetTime))
+}
+
+func TestStreamIngestionJobCancelWithRandomClient(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)
+
+	canBeCanceledCh := make(chan struct{})
+	threshold := 10
+	mu := syncutil.Mutex{}
+	completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
+		canBeCanceledCh <- struct{}{}
+	})
+	streamValidator := newStreamClientValidator()
+	registerValidator := registerValidatorWithClient(streamValidator)
+	knobs := base.TestingKnobs{
+		DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{
+			Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){completeJobAfterCheckpoints,
+				registerValidator},
+		},
+		},
+	}
+	serverArgs := base.TestServerArgs{}
+	serverArgs.Knobs = knobs
+
+	var allowResponse chan struct{}
+	params := base.TestClusterArgs{ServerArgs: serverArgs}
+	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	numNodes := 3
+	tc := testcluster.StartTestCluster(t, numNodes, params)
+	defer tc.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	conn := tc.Conns[0]
+
+	tenantID := 10
+	valueRange := 100
+	kvsPerResolved := 200
+	kvFrequency := 50 * time.Nanosecond
+	numPartitions := 2
+	dupProbability := 0.2
+	streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency,
+		dupProbability)
+
+	// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
+	allowResponse = make(chan struct{})
+	errCh := make(chan error)
+	defer close(errCh)
+	_, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval= '0.0005ms'`)
+	require.NoError(t, err)
+	query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr)
+	go func() {
+		_, err := conn.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	close(allowResponse)
+
+	var jobID string
+	var streamJobID int
+	testutils.SucceedsSoon(t, func() error {
+		row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1")
+		return row.Scan(&jobID)
+	})
+	streamJobID, err = strconv.Atoi(jobID)
+	require.NoError(t, err)
 
 	// Wait for the job to signal that it is ready to be canceled.
-	<-cancelJobCh
-	close(cancelJobCh)
-
-	// Canceling the job should shutdown the ingestion processors via a context
-	// cancellation, and subsequently rollback data above our frontier
-	// timestamp.
-	// TODO(adityamaru): Change this to cutover once we have cutover logic in
-	// place.
+	<-canBeCanceledCh
+	close(canBeCanceledCh)
+
+	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID)))
+	store := tc.GetFirstStoreFromServer(t, 0)
+	require.False(t, isKeyspaceIsEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))
+
 	_, err = conn.Exec(`CANCEL JOB $1`, streamJobID)
 	require.NoError(t, err)
-	// We expect the statement to fail.
+
+	// Expect the job to complete with an error.
 	if err := <-errCh; err == nil {
 		t.Fatal(err)
 	}
 
-	// Wait for the ingestion job to have been canceled.
+	// Wait for the ingestion job to have been marked as canceled.
 	testutils.SucceedsSoon(t, func() error {
 		var status string
 		sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status)
@@ -144,33 +307,23 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 		return nil
 	})
 
-	progress := &jobspb.Progress{}
-	var streamProgress []byte
-	sqlDB.QueryRow(
-		t, `SELECT progress FROM system.jobs WHERE id=$1`, streamJobID,
-	).Scan(&streamProgress)
-
-	if err := protoutil.Unmarshal(streamProgress, progress); err != nil {
-		t.Fatal("cannot unmarshal job progress from system.jobs")
-	}
-	highWaterTimestamp := progress.GetHighWater()
-	if highWaterTimestamp == nil {
-		t.Fatal(errors.New("expected the highWaterTimestamp written to progress to be non-nil"))
-	}
-	ts := *highWaterTimestamp
-	require.True(t, !ts.IsEmpty())
-
 	// Check the validator for any failures.
 	for _, err := range streamValidator.failures() {
 		t.Fatal(err)
 	}
 
-	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID)))
-	maxIngestedTS := assertExactlyEqualKVs(t, tc, streamValidator, ts, tenantPrefix)
+	require.True(t, isKeyspaceIsEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))
+}
 
-	//Sanity check that the max ts in the store is less than the ts stored in the
-	//job progress.
-	require.True(t, maxIngestedTS.LessEq(ts))
+func isKeyspaceIsEmpty(t *testing.T, store *kvserver.Store, startKey, endKey roachpb.Key) bool {
+	it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+		UpperBound: endKey,
+	})
+	defer it.Close()
+	it.SeekGE(storage.MVCCKey{Key: startKey})
+	ok, err := it.Valid()
+	require.NoError(t, err)
+	return !ok
 }
 
 // assertExactlyEqualKVs runs an incremental iterator on the underlying store.
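The cutover flow exercised above amounts to two client-visible steps: call crdb_internal.complete_stream_ingestion_job with a timestamp at or below the job's high-water mark, then wait for the ingestion job to report success once the revert finishes. The following is a minimal sketch, outside this patch, of that sequence using only database/sql; the package name and the waitForSucceeded helper are illustrative, not part of the change.

package streamcutover

import (
	gosql "database/sql"
	"fmt"
	"time"
)

// signalCutover invokes the builtin that records the cutover sentinel in the
// job progress; the job's cutover poller notices it and tears down the flow.
func signalCutover(db *gosql.DB, jobID int64, cutoverTime time.Time) error {
	_, err := db.Exec(
		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, jobID, cutoverTime)
	return err
}

// waitForSucceeded polls system.jobs until the ingestion job is marked
// succeeded, mirroring the SucceedsSoon loop in the test above.
func waitForSucceeded(db *gosql.DB, jobID int64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		var status string
		if err := db.QueryRow(
			`SELECT status FROM system.jobs WHERE id = $1`, jobID).Scan(&status); err != nil {
			return err
		}
		if status == "succeeded" {
			return nil
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("job %d did not reach the succeeded state within %s", jobID, timeout)
}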
diff --git a/pkg/ccl/streamingccl/streamingutils/BUILD.bazel b/pkg/ccl/streamingccl/streamingutils/BUILD.bazel
index 4bf25f325dd5..108cb579fca5 100644
--- a/pkg/ccl/streamingccl/streamingutils/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingutils/BUILD.bazel
@@ -32,6 +32,7 @@ go_test(
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/keys",
+        "//pkg/kv",
         "//pkg/roachpb",
         "//pkg/security",
         "//pkg/security/securitytest",
diff --git a/pkg/ccl/streamingccl/streamingutils/utils.go b/pkg/ccl/streamingccl/streamingutils/utils.go
index 2056bac36bdd..7b25197491be 100644
--- a/pkg/ccl/streamingccl/streamingutils/utils.go
+++ b/pkg/ccl/streamingccl/streamingutils/utils.go
@@ -50,6 +50,31 @@ func doCompleteIngestion(
 		return errors.Newf("job %d: not of expected type StreamIngest", jobID)
 	}
 
+	// Check that the supplied cutover time is a valid one.
+	// TODO(adityamaru): This will change once we allow a future cutover time to
+	// be specified.
+	hw := progress.GetHighWater()
+	if hw == nil {
+		return errors.Newf("cannot cutover to a timestamp %s that is after the latest resolved time"+
+			" %s for job %d", cutoverTimestamp.String(), hlc.Timestamp{}.String(), jobID)
+	}
+
+	highWaterTimestamp := *hw
+	if highWaterTimestamp.Less(cutoverTimestamp) {
+		return errors.Newf("cannot cutover to a timestamp %s that is after the latest resolved time"+
+			" %s for job %d", cutoverTimestamp.String(), highWaterTimestamp.String(), jobID)
+	}
+
+	// Reject setting a cutover time if an earlier request to cut over has
+	// already been set.
+	// TODO(adityamaru): This should change in the future; a user should be
+	// allowed to correct their cutover time if the process of reverting the job
+	// has not started.
+	if !sp.StreamIngest.CutoverTime.IsEmpty() {
+		return errors.Newf("cutover timestamp already set to %s, "+
+			"job %d is in the process of cutting over", sp.StreamIngest.CutoverTime.String(), jobID)
+	}
+
 	// Update the sentinel being polled by the stream ingestion job to
 	// check if a complete has been signaled.
 	sp.StreamIngest.CutoverTime = cutoverTimestamp
diff --git a/pkg/ccl/streamingccl/streamingutils/utils_test.go b/pkg/ccl/streamingccl/streamingutils/utils_test.go
index f4b89fd1100e..d82c628d358a 100644
--- a/pkg/ccl/streamingccl/streamingutils/utils_test.go
+++ b/pkg/ccl/streamingccl/streamingutils/utils_test.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -59,14 +60,48 @@ func TestCutoverBuiltin(t *testing.T) {
 	require.True(t, ok)
 	require.True(t, sp.StreamIngest.CutoverTime.IsEmpty())
 
+	// This should fail since no highwatermark is set on the progress.
 	cutoverTime := timeutil.Now()
-	var jobID int
+	_, err = db.ExecContext(
+		ctx,
+		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
+		*job.ID(), cutoverTime)
+	require.Error(t, err, "cannot cutover to a timestamp")
+
+	var highWater time.Time
+	err = job.HighWaterProgressed(ctx, nil /* txn */, func(ctx context.Context, txn *kv.Txn,
+		details jobspb.ProgressDetails) (hlc.Timestamp, error) {
+		highWater = timeutil.Now()
+		hlcHighWater := hlc.Timestamp{WallTime: highWater.UnixNano()}
+		return hlcHighWater, nil
+	})
+	require.NoError(t, err)
+
+	// This should fail since the highwatermark is less than the cutover time
+	// passed to the builtin.
+	cutoverTime = timeutil.Now()
+	_, err = db.ExecContext(
+		ctx,
+		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
+		*job.ID(), cutoverTime)
+	require.Error(t, err, "cannot cutover to a timestamp")
+
+	// This should succeed since the highwatermark is equal to the cutover time.
+	var jobID int64
 	err = db.QueryRowContext(
 		ctx,
 		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
-		*job.ID(), cutoverTime).Scan(&jobID)
+		*job.ID(), highWater).Scan(&jobID)
 	require.NoError(t, err)
-	require.Equal(t, *job.ID(), int64(jobID))
+	require.Equal(t, *job.ID(), jobID)
+
+	// This should fail since we already have a cutover time set on the job
+	// progress.
+	_, err = db.ExecContext(
+		ctx,
+		`SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`,
+		*job.ID(), cutoverTime)
+	require.Error(t, err, "cutover timestamp already set")
 
 	// Check that sentinel is set on the job progress.
 	sj, err := registry.LoadJob(ctx, *job.ID())
@@ -77,5 +112,5 @@ func TestCutoverBuiltin(t *testing.T) {
 	// The builtin only offers microsecond precision and so we must account for
 	// that when comparing against our chosen time.
 	cutoverTime = cutoverTime.Round(time.Microsecond)
-	require.Equal(t, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}, sp.StreamIngest.CutoverTime)
+	require.Equal(t, hlc.Timestamp{WallTime: highWater.UnixNano()}, sp.StreamIngest.CutoverTime)
 }
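For completeness, the state the builtin manipulates can be inspected from a SQL connection by decoding the progress column of system.jobs, the same way getHighWaterMark does in the ingestion test. Below is a minimal sketch under that assumption; the package and function names are illustrative only and not part of this patch.

package streaminspect

import (
	gosql "database/sql"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/errors"
)

// loadCutoverState returns the stream ingestion job's high-water timestamp
// (nil if none has been recorded yet) and the cutover sentinel stored in the
// job's progress details.
func loadCutoverState(db *gosql.DB, jobID int64) (*hlc.Timestamp, hlc.Timestamp, error) {
	var progressBytes []byte
	if err := db.QueryRow(
		`SELECT progress FROM system.jobs WHERE id = $1`, jobID).Scan(&progressBytes); err != nil {
		return nil, hlc.Timestamp{}, err
	}
	var progress jobspb.Progress
	if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
		return nil, hlc.Timestamp{}, err
	}
	sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
	if !ok {
		return nil, hlc.Timestamp{}, errors.Newf("job %d is not a stream ingestion job", jobID)
	}
	return progress.GetHighWater(), sp.StreamIngest.CutoverTime, nil
}

An empty CutoverTime means no cutover has been requested yet; once it is set, the job's poller cancels ingestion and reverts to exactly that timestamp.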