From 665b56d7177edf05b5ff85acae35e07e53ec6828 Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Fri, 29 Jan 2021 12:26:52 -0500 Subject: [PATCH] streamingest: add job level test for stream ingestion This change adds a test that exercises all the components of the stream ingestion flow. It fixes some missing pieces that were discovered while writing the test. Release note: None --- pkg/ccl/changefeedccl/cdctest/validator.go | 37 ++- .../streamclient/random_stream_client.go | 61 ++++- .../streamingest/stream_ingestion_job.go | 9 +- .../streamingest/stream_ingestion_job_test.go | 2 +- .../streamingest/stream_ingestion_planning.go | 7 +- .../stream_ingestion_processor_planning.go | 6 +- .../stream_ingestion_processor_test.go | 68 +++-- .../streamingest/stream_ingestion_test.go | 252 ++++++++++++++++++ 8 files changed, 383 insertions(+), 59 deletions(-) create mode 100644 pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 77c7ed060a07..e6e541528d19 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -36,15 +37,17 @@ type Validator interface { // StreamClientValidatorWrapper wraps a Validator and exposes additional methods // used by stream ingestion to check for correctness. type StreamClientValidatorWrapper interface { + Validator GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error) - GetValidator() Validator } type streamValidator struct { Validator + mu syncutil.Mutex } var _ StreamClientValidatorWrapper = &streamValidator{} +var _ Validator = &streamValidator{} // NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used // to validate the events emitted by the cluster to cluster streaming client. @@ -55,15 +58,10 @@ var _ StreamClientValidatorWrapper = &streamValidator{} func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper { ov := NewOrderValidator("unusedC2C") return &streamValidator{ - ov, + Validator: ov, } } -// GetValidator implements the StreamClientValidatorWrapper interface. -func (sv *streamValidator) GetValidator() Validator { - return sv.Validator -} - // GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper // interface. // It returns the streamed KV updates for `key` with a ts less than equal to @@ -71,9 +69,9 @@ func (sv *streamValidator) GetValidator() Validator { func (sv *streamValidator) GetValuesForKeyBelowTimestamp( key string, timestamp hlc.Timestamp, ) ([]roachpb.KeyValue, error) { - orderValidator, ok := sv.GetValidator().(*orderValidator) + orderValidator, ok := sv.Validator.(*orderValidator) if !ok { - return nil, errors.Newf("unknown validator %T: ", sv.GetValidator()) + return nil, errors.Newf("unknown validator %T: ", sv.Validator) } timestampValueTuples := orderValidator.keyTimestampAndValues[key] timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { @@ -94,6 +92,27 @@ func (sv *streamValidator) GetValuesForKeyBelowTimestamp( return kv, nil } +// NoteRow implements the Validator interface. 
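An illustrative sketch of how the wrapper above is meant to be used (not part of the diff; the helper name is hypothetical and the timestamps arbitrary, but the calls are the ones defined or wrapped here):

// exampleStreamValidatorUsage records two versions of a key, as the
// interceptor registered on the stream client does in the tests below, and
// then asks for the updates at or below a frontier timestamp.
func exampleStreamValidatorUsage() ([]roachpb.KeyValue, error) {
	sv := cdctest.NewStreamClientValidatorWrapper()
	if err := sv.NoteRow("partition1", "key_1", "v1", hlc.Timestamp{WallTime: 1}); err != nil {
		return nil, err
	}
	if err := sv.NoteRow("partition1", "key_1", "v2", hlc.Timestamp{WallTime: 3}); err != nil {
		return nil, err
	}
	// Only the WallTime 1 update is returned; the WallTime 3 update is above
	// the queried timestamp.
	return sv.GetValuesForKeyBelowTimestamp("key_1", hlc.Timestamp{WallTime: 2})
}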
+func (sv *streamValidator) NoteRow( + partition string, key, value string, updated hlc.Timestamp, +) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.Validator.NoteRow(partition, key, value, updated) +} + +// NoteResolved implements the Validator interface. +func (sv *streamValidator) NoteResolved(partition string, resolved hlc.Timestamp) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.Validator.NoteResolved(partition, resolved) +} + +// Failures implements the Validator interface. +func (sv *streamValidator) Failures() []string { + return sv.Validator.Failures() +} + type timestampValue struct { ts hlc.Timestamp value string diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index bf9341396804..bc800bd4b0de 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -54,6 +53,10 @@ const ( // DupProbability controls the probability with which we emit duplicate KV // events. DupProbability = "DUP_PROBABILITY" + // TenantID specifies the ID of the tenant we are ingesting data into. This + // allows the client to prefix the generated KVs with the appropriate tenant + // prefix. + TenantID = "TENANT_ID" // IngestionDatabaseID is the ID used in the generated table descriptor. IngestionDatabaseID = 50 /* defaultDB */ // IngestionTablePrefix is the prefix of the table name used in the generated @@ -79,6 +82,7 @@ type randomStreamConfig struct { kvsPerCheckpoint int numPartitions int dupProbability float64 + tenantID roachpb.TenantID } func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { @@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { kvsPerCheckpoint: 100, numPartitions: 1, dupProbability: 0.5, + tenantID: roachpb.SystemTenantID, } var err error @@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { return c, err } } + + if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" { + id, err := strconv.Atoi(tenantIDStr) + if err != nil { + return c, err + } + c.tenantID = roachpb.MakeTenantID(uint64(id)) + } return c, nil } @@ -147,6 +160,7 @@ type randomStreamClient struct { syncutil.Mutex interceptors []func(streamingccl.Event) + tableID int } } @@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) { return nil, err } - return &randomStreamClient{ - config: streamConfig, - }, nil + client := randomStreamClient{config: streamConfig} + client.mu.Lock() + defer client.mu.Unlock() + client.mu.tableID = 52 + return &client, nil } -var testTableID = 52 - -func getNextTableID() int { - ret := testTableID - testTableID++ +func (m *randomStreamClient) getNextTableID() int { + m.mu.Lock() + defer m.mu.Unlock() + ret := m.mu.tableID + m.mu.tableID++ return ret } @@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology( // Allocate table IDs and return one per partition address in the topology. 
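The new TENANT_ID knob above is read straight off the stream URI's query string. A rough sketch of how a URI becomes a config, assuming it runs inside this package (the function name and URI values are made up; omitted parameters keep the defaults listed above):

func exampleParseTenantConfig() (randomStreamConfig, error) {
	// NUM_PARTITIONS=2 overrides the default of 1, and TENANT_ID=10 becomes
	// roachpb.MakeTenantID(10); everything else keeps its default, e.g.
	// dupProbability stays 0.5.
	u, err := url.Parse("test:///?NUM_PARTITIONS=2&TENANT_ID=10")
	if err != nil {
		return randomStreamConfig{}, err
	}
	return parseRandomStreamConfig(u)
}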
for i := 0; i < m.config.numPartitions; i++ { - tableID := descpb.ID(getNextTableID()) + tableID := descpb.ID(m.getNextTableID()) partitionURI := url.URL{ Scheme: TestScheme, Host: strconv.Itoa(int(tableID)), @@ -207,28 +223,32 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( IngestionDatabaseID, tableID, fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), - systemschema.JobsTable.Privileges, + &descpb.PrivilegeDescriptor{}, ) if err != nil { return nil, nil, err } - //// Generate namespace entry. + // Generate namespace entry. key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name) + k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec)) var value roachpb.Value value.SetInt(int64(testTable.GetID())) + value.InitChecksum(k) namespaceKV := roachpb.KeyValue{ - Key: key.Key(keys.TODOSQLCodec), + Key: k, Value: value, } // Generate descriptor entry. descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID()) + descKey = rekey(m.config.tenantID, descKey) descDesc := testTable.DescriptorProto() var descValue roachpb.Value if err := descValue.SetProto(descDesc); err != nil { panic(err) } + descValue.InitChecksum(descKey) descKV := roachpb.KeyValue{ Key: descKey, Value: descValue, @@ -332,6 +352,19 @@ func (m *randomStreamClient) ConsumePartition( return eventCh, nil } +func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key { + // Strip old prefix. + tenantPrefix := keys.MakeTenantPrefix(tenantID) + noTenantPrefix, _, err := keys.DecodeTenantPrefix(k) + if err != nil { + panic(err) + } + + // Prepend tenant prefix. + rekeyedKey := append(tenantPrefix, noTenantPrefix...) + return rekeyedKey +} + func (m *randomStreamClient) makeRandomKey( r *rand.Rand, tableDesc *tabledesc.Mutable, ) roachpb.KeyValue { @@ -342,6 +375,8 @@ func (m *randomStreamClient) makeRandomKey( } k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID)) + k = rekey(m.config.tenantID, k) + // Create a value holding a random integer. valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange))) valueBuf, err := rowenc.EncodeTableValue( diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index b9dbfdf6da13..4fa66cbebdea 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -48,6 +49,10 @@ func ingest( // KVs. We can skip to ingesting after this resolved ts. Plumb the // initialHighwatermark to the ingestion processor spec based on what we read // from the job progress. + var initialHighWater hlc.Timestamp + if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { + initialHighWater = *h + } evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() @@ -59,14 +64,12 @@ func ingest( // Construct stream ingestion processor specs. streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, nodes, jobID) + streamAddress, topology, nodes, initialHighWater) if err != nil { return err } // Plan and run the DistSQL flow. - // TODO: Updates from this flow need to feed back into the job to update the - // progress. 
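For context on the tenant plumbing: the rekey helper added to the random stream client above strips whatever tenant prefix a generated key carries and prepends the target tenant's prefix. A small sketch of the effect, assuming it runs inside the streamclient package (the function name is hypothetical; a system-tenant key carries no prefix, so the strip is a no-op there):

func exampleRekeyIntoTenant10() roachpb.Key {
	// The table-52 key generated for the system tenant ends up as tenant 10's
	// prefix followed by the original key bytes.
	k := keys.SystemSQLCodec.TablePrefix(52)
	return rekey(roachpb.MakeTenantID(10), k)
}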
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs, streamIngestionFrontierSpec) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1444cee45d4f..9560d657859c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -// TestStreamIngestionJobRollBack tests that the job rollsback the data to the +// TestStreamIngestionJobRollBack tests that the job rolls back the data to the // start time if there are no progress updates. This test should be expanded // after the job's progress field is updated as the job runs. func TestStreamIngestionJobRollBack(t *testing.T) { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index dc4b2bff4afe..2aaed0cf2b88 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -80,7 +80,7 @@ func ingestionPlanHook( prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: streamingccl.StreamAddress(from[0]), - Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()}, + Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, // TODO: Figure out what the initial ts should be. StartTime: hlc.Timestamp{}, } @@ -110,7 +110,10 @@ func ingestionPlanHook( return err } - return sj.Start(ctx) + if err := sj.Start(ctx); err != nil { + return err + } + return sj.AwaitCompletion(ctx) } return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index 712e74766dd5..97223613d92a 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -30,7 +30,7 @@ func distStreamIngestionPlanSpecs( streamAddress streamingccl.StreamAddress, topology streamingccl.Topology, nodes []roachpb.NodeID, - jobID int64, + initialHighWater hlc.Timestamp, ) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { // For each stream partition in the topology, assign it to a node. @@ -63,8 +63,8 @@ func distStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. - // TODO: set HighWaterAtStart once the job progress logic has been hooked up. 
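One of the fixes above widens the ingestion span in the planning hook from prefix.Next() to prefix.PrefixEnd(). A quick sketch of why that matters (hypothetical tenant ID and function name):

func exampleTenantSpan() roachpb.Span {
	prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(10))
	// Next() only appends a zero byte, so [prefix, prefix.Next()) contains
	// nothing but the bare prefix key. PrefixEnd() bumps the prefix itself, so
	// [prefix, prefix.PrefixEnd()) covers every key the tenant owns, which is
	// the span the ingestion job needs to claim and later roll back.
	return roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
}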
- streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{TrackedSpans: trackedSpans} + streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ + HighWaterAtStart: initialHighWater, TrackedSpans: trackedSpans} return streamIngestionSpecs, streamIngestionFrontierSpec, nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 1d07f011c7f8..b4711d65e180 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -94,19 +95,21 @@ func TestStreamIngestionProcessor(t *testing.T) { v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} - unusedPartitionAddress := streamingccl.PartitionAddress("") - events := []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), - streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, unusedPartitionAddress), - streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), - streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, unusedPartitionAddress), + getEvents := func(partition streamingccl.PartitionAddress) []streamingccl.Event { + return []streamingccl.Event{ + streamingccl.MakeKVEvent(sampleKV, partition), + streamingccl.MakeKVEvent(sampleKV, partition), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, partition), + streamingccl.MakeKVEvent(sampleKV, partition), + streamingccl.MakeKVEvent(sampleKV, partition), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, partition), + } } pa1 := streamingccl.PartitionAddress("partition1") pa2 := streamingccl.PartitionAddress("partition2") mockClient := &mockStreamClient{ - partitionEvents: map[streamingccl.PartitionAddress][]streamingccl.Event{pa1: events, pa2: events}, + partitionEvents: map[streamingccl.PartitionAddress][]streamingccl.Event{pa1: getEvents(pa1), + pa2: getEvents(pa2)}, } startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} @@ -225,6 +228,19 @@ func assertEqualKVs( } } +func makeTestStreamURI( + valueRange, kvsPerResolved, numPartitions, tenantID int, + kvFrequency time.Duration, + dupProbability float64, +) string { + return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + + "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + + "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + + "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) + + "&TENANT_ID=" + strconv.Itoa(tenantID) +} + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. 
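For reference, the makeTestStreamURI helper above simply assembles the query parameters the random stream client understands, now including the tenant ID. With arbitrary arguments it produces, for example:

func exampleStreamURI() string {
	// Returns "test:///?VALUE_RANGE=100&KV_FREQUENCY=50&KVS_PER_CHECKPOINT=200&NUM_PARTITIONS=2&DUP_PROBABILITY=0.5&TENANT_ID=10".
	return makeTestStreamURI(100 /* valueRange */, 200 /* kvsPerResolved */,
		2 /* numPartitions */, 10 /* tenantID */, 50*time.Nanosecond, 0.5 /* dupProbability */)
}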
func TestRandomClientGeneration(t *testing.T) { @@ -233,17 +249,6 @@ func TestRandomClientGeneration(t *testing.T) { ctx := context.Background() - makeTestStreamURI := func( - valueRange, kvsPerResolved, numPartitions int, - kvFrequency time.Duration, dupProbability float64, - ) string { - return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + - "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + - "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + - "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + - "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) - } - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() @@ -256,8 +261,8 @@ func TestRandomClientGeneration(t *testing.T) { kvFrequency := 50 * time.Nanosecond numPartitions := 4 dupProbability := 0.2 - streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, - dupProbability) + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, + int(roachpb.SystemTenantID.ToUint64()), kvFrequency, dupProbability) // The random client returns system and table data partitions. streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(streamAddr)) @@ -271,9 +276,10 @@ func TestRandomClientGeneration(t *testing.T) { ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. - cancelAfterCheckpoints := makeCheckpointEventCounter(1000, cancel) + mu := syncutil.Mutex{} + cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel) streamValidator := cdctest.NewStreamClientValidatorWrapper() - validator := registerValidator(streamValidator.GetValidator()) + validator := registerValidatorWithClient(streamValidator) out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, topo.Partitions, startTime, []func(streamingccl.Event){cancelAfterCheckpoints, validator}, nil /* mockClient */) require.NoError(t, err) @@ -317,7 +323,7 @@ func TestRandomClientGeneration(t *testing.T) { } // Ensure that no errors were reported to the validator. - for _, failure := range streamValidator.GetValidator().Failures() { + for _, failure := range streamValidator.Failures() { t.Error(failure) } @@ -391,7 +397,7 @@ func runStreamIngestionProcessor( return out, err } -func registerValidator(validator cdctest.Validator) func(event streamingccl.Event) { +func registerValidatorWithClient(validator cdctest.Validator) func(event streamingccl.Event) { return func(event streamingccl.Event) { switch event.Type() { case streamingccl.CheckpointEvent: @@ -416,13 +422,19 @@ func registerValidator(validator cdctest.Validator) func(event streamingccl.Even // makeCheckpointEventCounter runs f after seeing `threshold` number of // checkpoint events. 
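Both tests drive teardown through the checkpoint counter below. A minimal wiring sketch (the function name and threshold are hypothetical; the caller supplies the mutex because the interceptor can fire concurrently from the per-partition event goroutines):

func exampleCancelAfterThreeCheckpoints(
	parent context.Context,
) (context.Context, func(streamingccl.Event)) {
	ctx, cancel := context.WithCancel(parent)
	var mu syncutil.Mutex
	// cancel is invoked once three checkpoint events have been observed.
	return ctx, makeCheckpointEventCounter(&mu, 3 /* threshold */, cancel)
}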
-func makeCheckpointEventCounter(threshold int, f func()) func(streamingccl.Event) { +func makeCheckpointEventCounter( + mu *syncutil.Mutex, threshold int, f func(), +) func(streamingccl.Event) { + mu.Lock() + defer mu.Unlock() numCheckpointEventsGenerated := 0 return func(event streamingccl.Event) { + mu.Lock() + defer mu.Unlock() switch event.Type() { case streamingccl.CheckpointEvent: numCheckpointEventsGenerated++ - if numCheckpointEventsGenerated > threshold { + if numCheckpointEventsGenerated == threshold { f() } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go new file mode 100644 index 000000000000..25b069aa6530 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go @@ -0,0 +1,252 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// Dummy import to pull in kvtenantccl. This allows us to start tenants. +var _ = kvtenantccl.Connector{} + +// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is +// fed KVs from the random stream client. After receiving a certain number of +// resolved timestamp events the test cancels the job to tear down the flow, and +// rollback to the latest resolved frontier timestamp. +// The test scans the KV store to compare all MVCC KVs against the relevant +// streamed KV Events, thereby ensuring that we end up in a consistent state. 
+func TestStreamIngestionJobWithRandomClient(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond) + + cancelJobCh := make(chan struct{}) + threshold := 10 + mu := syncutil.Mutex{} + cancelJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() { + cancelJobCh <- struct{}{} + }) + streamValidator := cdctest.NewStreamClientValidatorWrapper() + registerValidator := registerValidatorWithClient(streamValidator) + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{ + Interceptors: []func(event streamingccl.Event){cancelJobAfterCheckpoints, registerValidator}, + }, + }, + } + serverArgs := base.TestServerArgs{} + serverArgs.Knobs = knobs + + var allowResponse chan struct{} + params := base.TestClusterArgs{ServerArgs: serverArgs} + params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse), + } + + numNodes := 3 + tc := testcluster.StartTestCluster(t, numNodes, params) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + conn := tc.Conns[0] + + // Create a tenant. + _ = security.EmbeddedTenantIDs() + + tenantID := 10 + _, conn10 := serverutils.StartTenant(t, tc.Server(0), + base.TestTenantArgs{TenantID: roachpb.MakeTenantID(uint64(tenantID))}) + defer conn10.Close() + tenant10 := sqlutils.MakeSQLRunner(conn10) + + // Prevent a logging assertion that the server ID is initialized multiple + // times. + log.TestingClearServerIdentifiers() + + valueRange := 100 + kvsPerResolved := 10000 + kvFrequency := 50 * time.Nanosecond + numPartitions := 3 + dupProbability := 0.4 + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency, + dupProbability) + + // Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running. + allowResponse = make(chan struct{}) + errCh := make(chan error) + query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr) + go func() { + _, err := conn.Exec(query) + errCh <- err + }() + select { + case allowResponse <- struct{}{}: + case err := <-errCh: + t.Fatalf("%s: query returned before expected: %s", err, query) + } + close(allowResponse) + + var streamJobID string + testutils.SucceedsSoon(t, func() error { + row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1") + return row.Scan(&streamJobID) + }) + + // Wait for the job to signal that it is ready to be cancelled. + <-cancelJobCh + close(cancelJobCh) + + // Cancelling the job should shutdown the ingestion processors via a context + // cancellation, and subsequently rollback data above our frontier + // timestamp. + _, err := conn.Exec(`CANCEL JOB $1`, streamJobID) + require.NoError(t, err) + + // Wait for the ingestion job to have been cancelled. 
+ testutils.SucceedsSoon(t, func() error { + var status string + sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status) + if jobs.Status(status) != jobs.StatusCanceled { + return errors.New("could not cancel job") + } + return nil + }) + + progress := &jobspb.Progress{} + var streamProgress []byte + sqlDB.QueryRow( + t, `SELECT progress FROM system.jobs WHERE id=$1`, streamJobID, + ).Scan(&streamProgress) + + if err := protoutil.Unmarshal(streamProgress, progress); err != nil { + t.Fatal("cannot unmarshal job progress from system.jobs") + } + highWaterTimestamp := progress.GetHighWater() + require.True(t, highWaterTimestamp != nil) + ts := *highWaterTimestamp + require.True(t, !ts.IsEmpty()) + + // Check the validator for any failures. + for _, err := range streamValidator.Failures() { + t.Fatal(err) + } + + tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID))) + maxIngestedTS := assertExactlyEqualKVs(t, tc, streamValidator, ts, tenantPrefix) + + // Sanity check that the max ts in the store is less than the ts stored in the + // job progress. + require.True(t, maxIngestedTS.LessEq(ts)) + + // Lastly, check for two tables in the tenant, since we ingested 2 partitions. + tenant10.CheckQueryResults(t, `SELECT table_name FROM [SHOW TABLES] ORDER BY table_name`, + [][]string{{"foo52"}, {"foo53"}, {"foo54"}}) +} + +// assertExactlyEqualKVs runs an incremental iterator on the underlying store. +// At every key the method polls the `streamValidator` to return the KVEvents +// for that particular key, and have a timestamp less than equal to the +// `frontierTimestamp`. The key and value must be identical between the two. +func assertExactlyEqualKVs( + t *testing.T, + tc *testcluster.TestCluster, + streamValidator cdctest.StreamClientValidatorWrapper, + frontierTimestamp hlc.Timestamp, + tenantPrefix roachpb.Key, +) hlc.Timestamp { + // Iterate over the store. + store := tc.GetFirstStoreFromServer(t, 0) + it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: tenantPrefix, + UpperBound: tenantPrefix.PrefixEnd(), + }) + defer it.Close() + var prevKey roachpb.Key + var valueTimestampTuples []roachpb.KeyValue + var err error + var maxKVTimestampSeen hlc.Timestamp + var matchingKVs int + for it.SeekGE(storage.MVCCKey{}); ; it.Next() { + if ok, err := it.Valid(); !ok { + if err != nil { + t.Fatal(err) + } + break + } + if maxKVTimestampSeen.Less(it.Key().Timestamp) { + maxKVTimestampSeen = it.Key().Timestamp + } + newKey := (prevKey != nil && !it.Key().Key.Equal(prevKey)) || prevKey == nil + prevKey = it.Key().Key + + if newKey { + // All value ts should have been drained at this point, otherwise there is + // a mismatch between the streamed and ingested data. + require.Equal(t, 0, len(valueTimestampTuples)) + valueTimestampTuples, err = streamValidator.GetValuesForKeyBelowTimestamp( + string(it.Key().Key), frontierTimestamp) + require.NoError(t, err) + } + + // If there are no values stored in the validator to match against the + // current key then we skip to the next key. + if len(valueTimestampTuples) == 0 { + continue + } + + require.Greater(t, len(valueTimestampTuples), 0) + // Since the iterator goes from latest to older versions, we compare + // starting from the end of the slice that is sorted by timestamp. 
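+ // For example, if the validator returned [(v1, ts1), (v2, ts2), (v3, ts3)]
+ // sorted by ascending timestamp, the MVCC iterator surfaces the ts3 version
+ // of this key first, so each pass matches the last element of the slice and
+ // then truncates it.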
+ latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] + require.Equal(t, roachpb.KeyValue{ + Key: it.Key().Key, + Value: roachpb.Value{ + RawBytes: it.Value(), + Timestamp: it.Key().Timestamp, + }, + }, latestVersionInChain) + matchingKVs++ + // Truncate the latest version which we just checked against in preparation + // for the next iteration. + valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] + } + // Sanity check that we have compared a non-zero number of KVs. + require.Greater(t, matchingKVs, 0) + return maxKVTimestampSeen +}