diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index cfe90012c3e0..0f1c81607259 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -105,7 +105,7 @@ func (s *streamIngestionResumer) checkForCutoverSignal(
 	if !sp.StreamIngest.CutoverTime.IsEmpty() {
 		// Sanity check that the requested cutover time is less than equal to
 		// the resolved ts recorded in the job progress. This should already
-		// have been enforced when the cutover was signalled via the builtin.
+		// have been enforced when the cutover was signaled via the builtin.
 		// TODO(adityamaru): Remove this when we allow users to specify a
 		// cutover time in the future.
 		resolvedTimestamp := progress.GetHighWater()
@@ -211,8 +211,27 @@ func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
 func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
-	// TODO(adityamaru): Add ClearRange logic.
-	return nil
+	p := execCtx.(sql.JobExecContext)
+	db := p.ExecCfg().DB
+	j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, *s.job.ID())
+	if err != nil {
+		return err
+	}
+	details := j.Details()
+	sd, ok := details.(jobspb.StreamIngestionDetails)
+	if !ok {
+		return errors.Newf("unknown details type %T in stream ingestion job %d",
+			details, *s.job.ID())
+	}
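+	// ClearRange non-transactionally deletes all ingested data in the job's
+	// span, including all MVCC revisions. Unlike the RevertRange issued at
+	// cutover, nothing below a timestamp is preserved; canceling the job
+	// leaves the tenant keyspace empty.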
+	var b kv.Batch
+	b.AddRawRequest(&roachpb.ClearRangeRequest{
+		RequestHeader: roachpb.RequestHeader{
+			Key:    sd.Span.Key,
+			EndKey: sd.Span.EndKey,
+		},
+	})
+	return db.Run(ctx, &b)
 }
 
 var _ jobs.Resumer = &streamIngestionResumer{}
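The span recorded in the stream ingestion job's details covers the ingesting tenant's entire keyspace, which is why the cancelation test below can assert that the tenant prefix is empty once the job is canceled. A minimal sketch of how such a span can be derived, assuming the tenant ID the stream was planned with (the example package and tenantSpan helper are illustrative, not part of this patch):

package example

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// tenantSpan derives the key span covering every key owned by the given
// tenant; the ClearRange in OnFailOrCancel above targets such a span.
func tenantSpan(tenantID uint64) roachpb.Span {
	prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(tenantID))
	return roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
}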
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
index 7d91216b3d5a..fd106a0362e7 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
 )
@@ -168,7 +169,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 
 	// Cutting over the job should shutdown the ingestion processors via a context
 	// cancellation, and subsequently rollback data above our frontier timestamp.
-	tz := time.Unix(0, highwater.WallTime).UTC()
+	tz := timeutil.Unix(0, highwater.WallTime).UTC()
 	_, err = conn.Exec(`SELECT crdb_internal.complete_stream_ingestion_job ($1, $2)`,
 		streamJobID, tz)
 	require.NoError(t, err)
@@ -207,6 +208,124 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
 	require.True(t, maxIngestedTS.LessEq(revertRangeTargetTime))
 }
 
+func TestStreamIngestionJobCancelWithRandomClient(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
+
+	canBeCanceledCh := make(chan struct{})
+	threshold := 10
+	mu := syncutil.Mutex{}
+	completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
+		canBeCanceledCh <- struct{}{}
+	})
+	streamValidator := newStreamClientValidator()
+	registerValidator := registerValidatorWithClient(streamValidator)
+	knobs := base.TestingKnobs{
+		DistSQL: &execinfra.TestingKnobs{
+			StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{
+				Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){
+					completeJobAfterCheckpoints,
+					registerValidator,
+				},
+			},
+		},
+	}
+	serverArgs := base.TestServerArgs{}
+	serverArgs.Knobs = knobs
+
+	var allowResponse chan struct{}
+	params := base.TestClusterArgs{ServerArgs: serverArgs}
+	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
+		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+	}
+
+	numNodes := 3
+	tc := testcluster.StartTestCluster(t, numNodes, params)
+	defer tc.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
+	conn := tc.Conns[0]
+
+	tenantID := 10
+	valueRange := 100
+	kvsPerResolved := 200
+	kvFrequency := 50 * time.Nanosecond
+	numPartitions := 2
+	dupProbability := 0.2
+	streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency,
+		dupProbability)
+
+	// Start the ingestion stream and wait for at least one AddSSTable to
+	// ensure the job is running.
+	allowResponse = make(chan struct{})
+	errCh := make(chan error)
+	defer close(errCh)
+	_, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval = '0.0005ms'`)
+	require.NoError(t, err)
+	query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr)
+	go func() {
+		_, err := conn.Exec(query)
+		errCh <- err
+	}()
+	select {
+	case allowResponse <- struct{}{}:
+	case err := <-errCh:
+		t.Fatalf("%s: query returned before expected: %s", err, query)
+	}
+	close(allowResponse)
+
+	var jobID string
+	var streamJobID int
+	testutils.SucceedsSoon(t, func() error {
+		row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1")
+		return row.Scan(&jobID)
+	})
+	streamJobID, err = strconv.Atoi(jobID)
+	require.NoError(t, err)
+
+	// Wait for the job to signal that it is ready to be canceled.
+	<-canBeCanceledCh
+	close(canBeCanceledCh)
+
+	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID)))
+	store := tc.GetFirstStoreFromServer(t, 0)
+	require.False(t, isKeyspaceEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))
+
+	_, err = conn.Exec(`CANCEL JOB $1`, streamJobID)
+	require.NoError(t, err)
+
+	// Expect the ingestion query to return with an error once the job has
+	// been canceled.
+	if err := <-errCh; err == nil {
+		t.Fatal("expected the ingestion query to fail after the job was canceled")
+	}
+
+	// Wait for the ingestion job to have been marked as canceled.
+	testutils.SucceedsSoon(t, func() error {
+		var status string
+		sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status)
+		if jobs.Status(status) != jobs.StatusCanceled {
+			return errors.New("job not in canceled state")
+		}
+		return nil
+	})
+
+	// Check the validator for any failures.
+	for _, err := range streamValidator.failures() {
+		t.Fatal(err)
+	}
+
+	require.True(t, isKeyspaceEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))
+}
+
+// isKeyspaceEmpty returns true iff the store contains no keys in the
+// [startKey, endKey) span.
+func isKeyspaceEmpty(t *testing.T, store *kvserver.Store, startKey, endKey roachpb.Key) bool {
+	it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
+		UpperBound: endKey,
+	})
+	defer it.Close()
+	it.SeekGE(storage.MVCCKey{Key: startKey})
+	ok, err := it.Valid()
+	require.NoError(t, err)
+	return !ok
+}
+
 // assertExactlyEqualKVs runs an incremental iterator on the underlying store.
 // At every key the method polls the `streamValidator` to return the KVEvents
 // for that particular key, and have a timestamp less than equal to the