Skip to content

Commit

Permalink
streamingest: add OnFailOrCancel ClearRange logic to stream ingestion
Browse files Browse the repository at this point in the history
When the job fails, we must ClearRange the target keyspace being
ingested into.

Release note: None
  • Loading branch information
adityamaru committed Feb 23, 2021
1 parent 90ec8f3 commit ac660ae
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 4 deletions.
25 changes: 22 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *streamIngestionResumer) checkForCutoverSignal(
if !sp.StreamIngest.CutoverTime.IsEmpty() {
// Sanity check that the requested cutover time is less than or equal to
// the resolved ts recorded in the job progress. This should already
// have been enforced when the cutover was signalled via the builtin.
// have been enforced when the cutover was signaled via the builtin.
// TODO(adityamaru): Remove this when we allow users to specify a
// cutover time in the future.
resolvedTimestamp := progress.GetHighWater()
Expand Down Expand Up @@ -211,8 +211,27 @@ func (s *streamIngestionResumer) revertToLatestResolvedTimestamp(

// OnFailOrCancel is part of the jobs.Resumer interface.
//
// It loads the job from the registry, reads its stream ingestion details, and
// issues a single ClearRange request over the span recorded in those details
// so that a failed or canceled ingestion does not leave data behind in the
// target keyspace. It returns an error if the job cannot be loaded, if the
// details are not StreamIngestionDetails, or if the ClearRange batch fails.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
	p := execCtx.(sql.JobExecContext)
	db := p.ExecCfg().DB
	jobID := *s.job.ID()

	j, err := p.ExecCfg().JobRegistry.LoadJob(ctx, jobID)
	if err != nil {
		return err
	}

	details := j.Details()
	sd, ok := details.(jobspb.StreamIngestionDetails)
	if !ok {
		return errors.Newf("unknown details type %T in stream ingestion job %d",
			details, jobID)
	}

	// Clear the entire span the job was ingesting into with one raw request.
	var b kv.Batch
	b.AddRawRequest(&roachpb.ClearRangeRequest{
		RequestHeader: roachpb.RequestHeader{
			Key:    sd.Span.Key,
			EndKey: sd.Span.EndKey,
		},
	})
	return db.Run(ctx, &b)
}

var _ jobs.Resumer = &streamIngestionResumer{}
Expand Down
121 changes: 120 additions & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -168,7 +169,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {

// Cutting over the job should shut down the ingestion processors via a context
// cancellation, and subsequently roll back data above our frontier timestamp.
tz := time.Unix(0, highwater.WallTime).UTC()
tz := timeutil.Unix(0, highwater.WallTime).UTC()
_, err = conn.Exec(`SELECT crdb_internal.complete_stream_ingestion_job ($1, $2)`,
streamJobID, tz)
require.NoError(t, err)
Expand Down Expand Up @@ -207,6 +208,124 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
require.True(t, maxIngestedTS.LessEq(revertRangeTargetTime))
}

// TestStreamIngestionJobCancelWithRandomClient starts a stream ingestion job
// on a three-node test cluster, waits until the checkpoint interceptor has
// signaled that the job may be canceled, cancels the job, and then verifies
// that the job reaches the canceled state and that the tenant keyspace that
// was being ingested into is empty.
func TestStreamIngestionJobCancelWithRandomClient(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	// The trailing () matters: the intervals must be shortened now, and only
	// the returned cleanup should be deferred. Without it the call itself is
	// deferred to test exit and never takes effect during the test.
	// NOTE(review): assumes TestingSetAdoptAndCancelIntervals returns a
	// cleanup func — confirm against the jobs package.
	defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()

	canBeCanceledCh := make(chan struct{})
	threshold := 10
	mu := syncutil.Mutex{}
	completeJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() {
		canBeCanceledCh <- struct{}{}
	})
	streamValidator := newStreamClientValidator()
	registerValidator := registerValidatorWithClient(streamValidator)
	knobs := base.TestingKnobs{
		DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{
			Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){completeJobAfterCheckpoints,
				registerValidator},
		},
		},
	}
	serverArgs := base.TestServerArgs{}
	serverArgs.Knobs = knobs

	var allowResponse chan struct{}
	params := base.TestClusterArgs{ServerArgs: serverArgs}
	params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
		TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
	}

	numNodes := 3
	tc := testcluster.StartTestCluster(t, numNodes, params)
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
	conn := tc.Conns[0]

	tenantID := 10
	valueRange := 100
	kvsPerResolved := 200
	kvFrequency := 50 * time.Nanosecond
	numPartitions := 2
	dupProbability := 0.2
	streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency,
		dupProbability)

	// Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running.
	allowResponse = make(chan struct{})
	// Buffered and never closed: the query goroutine can always deliver its
	// result and exit, even if the test bails out before receiving from it.
	errCh := make(chan error, 1)
	_, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval= '0.0005ms'`)
	require.NoError(t, err)
	query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr)
	go func() {
		_, err := conn.Exec(query)
		errCh <- err
	}()
	select {
	case allowResponse <- struct{}{}:
	case err := <-errCh:
		// %v rather than %s so that a nil error is still rendered legibly.
		t.Fatalf("%v: query returned before expected: %s", err, query)
	}
	close(allowResponse)

	var jobID string
	var streamJobID int
	testutils.SucceedsSoon(t, func() error {
		row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1")
		return row.Scan(&jobID)
	})
	streamJobID, err = strconv.Atoi(jobID)
	require.NoError(t, err)

	// Wait for the job to signal that it is ready to be canceled.
	<-canBeCanceledCh
	close(canBeCanceledCh)

	// Data must have been ingested before the cancel for the final emptiness
	// check to be meaningful.
	tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID)))
	store := tc.GetFirstStoreFromServer(t, 0)
	require.False(t, isKeyspaceIsEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))

	_, err = conn.Exec(`CANCEL JOB $1`, streamJobID)
	require.NoError(t, err)

	// Expect the job to complete with an error.
	if err := <-errCh; err == nil {
		t.Fatal("expected the ingestion query to return an error after the job was canceled")
	}

	// Wait for the ingestion job to have been marked as canceled.
	testutils.SucceedsSoon(t, func() error {
		var status string
		sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status)
		if jobs.Status(status) != jobs.StatusCanceled {
			return errors.New("job not in canceled state")
		}
		return nil
	})

	// Check the validator for any failures.
	for _, err := range streamValidator.failures() {
		t.Fatal(err)
	}

	// After cancellation the tenant keyspace must have been cleared.
	require.True(t, isKeyspaceIsEmpty(t, store, tenantPrefix, tenantPrefix.PrefixEnd()))
}

// isKeyspaceIsEmpty reports whether an MVCCKeyAndIntentsIterKind iterator over
// the store's engine, bounded above by endKey, finds no entry when seeking to
// startKey. Any iterator error fails the test.
func isKeyspaceIsEmpty(t *testing.T, store *kvserver.Store, startKey, endKey roachpb.Key) bool {
	opts := storage.IterOptions{UpperBound: endKey}
	iter := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, opts)
	defer iter.Close()

	// Position at the first entry at or after startKey; if the iterator is
	// not valid afterwards, nothing exists in the bounded range.
	iter.SeekGE(storage.MVCCKey{Key: startKey})
	found, err := iter.Valid()
	require.NoError(t, err)
	return !found
}

// assertExactlyEqualKVs runs an incremental iterator on the underlying store.
// At every key the method polls the `streamValidator` to return the KVEvents
// for that particular key, and have a timestamp less than equal to the
Expand Down

0 comments on commit ac660ae

Please sign in to comment.