diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 77c7ed060a07..bef55c6c0302 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -33,65 +33,13 @@ type Validator interface { Failures() []string } -// StreamClientValidatorWrapper wraps a Validator and exposes additional methods -// used by stream ingestion to check for correctness. -type StreamClientValidatorWrapper interface { - GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error) - GetValidator() Validator -} - -type streamValidator struct { +// StreamValidator wraps a Validator and exposes additional methods for +// introspection. +type StreamValidator interface { Validator -} - -var _ StreamClientValidatorWrapper = &streamValidator{} - -// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used -// to validate the events emitted by the cluster to cluster streaming client. -// The wrapper currently only "wraps" an orderValidator, but can be built out -// to utilize other Validator's. -// The wrapper also allows querying the orderValidator to retrieve streamed -// events from an in-memory store. -func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper { - ov := NewOrderValidator("unusedC2C") - return &streamValidator{ - ov, - } -} - -// GetValidator implements the StreamClientValidatorWrapper interface. -func (sv *streamValidator) GetValidator() Validator { - return sv.Validator -} - -// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper -// interface. -// It returns the streamed KV updates for `key` with a ts less than equal to -// `timestamp`. -func (sv *streamValidator) GetValuesForKeyBelowTimestamp( - key string, timestamp hlc.Timestamp, -) ([]roachpb.KeyValue, error) { - orderValidator, ok := sv.GetValidator().(*orderValidator) - if !ok { - return nil, errors.Newf("unknown validator %T: ", sv.GetValidator()) - } - timestampValueTuples := orderValidator.keyTimestampAndValues[key] - timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { - return timestamp.Less(timestampValueTuples[i].ts) - }) - var kv []roachpb.KeyValue - for _, tsValue := range timestampValueTuples[:timestampsIdx] { - byteRep := []byte(key) - kv = append(kv, roachpb.KeyValue{ - Key: byteRep, - Value: roachpb.Value{ - RawBytes: []byte(tsValue.value), - Timestamp: tsValue.ts, - }, - }) - } - - return kv, nil + // GetValuesForKeyBelowTimestamp returns the streamed KV updates for `key` + // with a ts less than or equal to `timestamp`. + GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error) } type timestampValue struct { @@ -109,6 +57,7 @@ type orderValidator struct { } var _ Validator = &orderValidator{} +var _ StreamValidator = &orderValidator{} // NewOrderValidator returns a Validator that checks the row and resolved // timestamp ordering guarantees. It also asserts that keys have an affinity to @@ -128,6 +77,40 @@ func NewOrderValidator(topic string) Validator { } } +// NewStreamOrderValidator wraps an orderValidator as described above, and +// exposes additional methods for introspection.
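+//
+// A rough usage sketch (hypothetical, for illustration only; the partition
+// name, key, and timestamps below are made up):
+//
+//   sv := NewStreamOrderValidator()
+//   _ = sv.NoteRow("partition-1", "k1", "v1", hlc.Timestamp{WallTime: 1})
+//   _ = sv.NoteResolved("partition-1", hlc.Timestamp{WallTime: 2})
+//   kvs, _ := sv.GetValuesForKeyBelowTimestamp("k1", hlc.Timestamp{WallTime: 2})
+//   // kvs now holds the updates recorded for "k1" at or below WallTime 2.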
+func NewStreamOrderValidator() StreamValidator { + return &orderValidator{ + topic: "unused", + partitionForKey: make(map[string]string), + keyTimestampAndValues: make(map[string][]timestampValue), + resolved: make(map[string]hlc.Timestamp), + } +} + +// GetValuesForKeyBelowTimestamp implements the StreamValidator interface. +func (v *orderValidator) GetValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + timestampValueTuples := v.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return timestamp.Less(timestampValueTuples[i].ts) + }) + var kv []roachpb.KeyValue + for _, tsValue := range timestampValueTuples[:timestampsIdx] { + byteRep := []byte(key) + kv = append(kv, roachpb.KeyValue{ + Key: byteRep, + Value: roachpb.Value{ + RawBytes: []byte(tsValue.value), + Timestamp: tsValue.ts, + }, + }) + } + + return kv, nil +} + // NoteRow implements the Validator interface. func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error { if prev, ok := v.partitionForKey[key]; ok && prev != partition { diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 0b24249f1236..f94d1b63917b 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "//pkg/sql", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/descpb", - "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 4012c97a6a47..6813e5a4afd0 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -54,6 +53,10 @@ const ( // DupProbability controls the probability with which we emit duplicate KV // events. DupProbability = "DUP_PROBABILITY" + // TenantID specifies the ID of the tenant we are ingesting data into. This + // allows the client to prefix the generated KVs with the appropriate tenant + // prefix. + TenantID = "TENANT_ID" // IngestionDatabaseID is the ID used in the generated table descriptor. 
IngestionDatabaseID = 50 /* defaultDB */ // IngestionTablePrefix is the prefix of the table name used in the generated @@ -79,6 +82,7 @@ type randomStreamConfig struct { kvsPerCheckpoint int numPartitions int dupProbability float64 + tenantID roachpb.TenantID } func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { @@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { kvsPerCheckpoint: 100, numPartitions: 1, dupProbability: 0.5, + tenantID: roachpb.SystemTenantID, } var err error @@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { return c, err } } + + if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" { + id, err := strconv.Atoi(tenantIDStr) + if err != nil { + return c, err + } + c.tenantID = roachpb.MakeTenantID(uint64(id)) + } return c, nil } @@ -147,6 +160,7 @@ type randomStreamClient struct { // interceptors can be registered to peek at every event generated by this // client. interceptors []func(streamingccl.Event, streamingccl.PartitionAddress) + tableID int } } @@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) { return nil, err } - return &randomStreamClient{ - config: streamConfig, - }, nil + client := randomStreamClient{config: streamConfig} + client.mu.Lock() + defer client.mu.Unlock() + client.mu.tableID = 52 + return &client, nil } -var testTableID = 52 - -func getNextTableID() int { - ret := testTableID - testTableID++ +func (m *randomStreamClient) getNextTableID() int { + m.mu.Lock() + defer m.mu.Unlock() + ret := m.mu.tableID + m.mu.tableID++ return ret } @@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology( // Allocate table IDs and return one per partition address in the topology. for i := 0; i < m.config.numPartitions; i++ { - tableID := descpb.ID(getNextTableID()) + tableID := descpb.ID(m.getNextTableID()) partitionURI := url.URL{ Scheme: TestScheme, Host: strconv.Itoa(int(tableID)), @@ -207,7 +223,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( IngestionDatabaseID, tableID, fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), - systemschema.JobsTable.GetPrivileges(), + &descpb.PrivilegeDescriptor{}, ) if err != nil { return nil, nil, err @@ -215,20 +231,24 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( // Generate namespace entry. key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name) + k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec)) var value roachpb.Value value.SetInt(int64(testTable.GetID())) + value.InitChecksum(k) namespaceKV := roachpb.KeyValue{ - Key: key.Key(keys.TODOSQLCodec), + Key: k, Value: value, } // Generate descriptor entry. 
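+	// Note: as with the namespace entry above, the descriptor key is rewritten
+	// into the configured tenant's keyspace (see rekey below) and the value's
+	// checksum is initialized against the rekeyed key.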
descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID()) + descKey = rekey(m.config.tenantID, descKey) descDesc := testTable.DescriptorProto() var descValue roachpb.Value if err := descValue.SetProto(descDesc); err != nil { panic(err) } + descValue.InitChecksum(descKey) descKV := roachpb.KeyValue{ Key: descKey, Value: descValue, @@ -280,6 +300,7 @@ func (m *randomStreamClient) ConsumePartition( resolvedTime := timeutil.Now() hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) + dupKVEvent = nil numKVEventsSinceLastResolved = 0 } else { @@ -328,6 +349,19 @@ func (m *randomStreamClient) ConsumePartition( return eventCh, nil } +func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key { + // Strip old prefix. + tenantPrefix := keys.MakeTenantPrefix(tenantID) + noTenantPrefix, _, err := keys.DecodeTenantPrefix(k) + if err != nil { + panic(err) + } + + // Prepend tenant prefix. + rekeyedKey := append(tenantPrefix, noTenantPrefix...) + return rekeyedKey +} + func (m *randomStreamClient) makeRandomKey( r *rand.Rand, tableDesc *tabledesc.Mutable, ) roachpb.KeyValue { @@ -338,6 +372,8 @@ func (m *randomStreamClient) makeRandomKey( } k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID)) + k = rekey(m.config.tenantID, k) + // Create a value holding a random integer. valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange))) valueBuf, err := rowenc.EncodeTableValue( diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 9844eaa1ed24..04ca6fde0896 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -8,10 +8,12 @@ go_library( "stream_ingestion_planning.go", "stream_ingestion_processor.go", "stream_ingestion_processor_planning.go", + "stream_ingestion_test_utils.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest", visibility = ["//visibility:public"], deps = [ + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -38,6 +40,7 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/span", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", @@ -55,11 +58,12 @@ go_test( "stream_ingestion_frontier_processor_test.go", "stream_ingestion_job_test.go", "stream_ingestion_processor_test.go", + "stream_ingestion_test.go", ], embed = [":streamingest"], deps = [ "//pkg/base", - "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -68,6 +72,7 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -80,7 +85,9 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/distsqlutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", @@ -88,6 +95,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 
b9dbfdf6da13..a586a8a8e7ee 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -48,6 +49,10 @@ func ingest( // KVs. We can skip to ingesting after this resolved ts. Plumb the // initialHighwatermark to the ingestion processor spec based on what we read // from the job progress. + var initialHighWater hlc.Timestamp + if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { + initialHighWater = *h + } evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() @@ -59,14 +64,12 @@ func ingest( // Construct stream ingestion processor specs. streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, nodes, jobID) + streamAddress, topology, nodes, initialHighWater) if err != nil { return err } // Plan and run the DistSQL flow. - // TODO: Updates from this flow need to feed back into the job to update the - // progress. err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs, streamIngestionFrontierSpec) if err != nil { @@ -109,6 +112,14 @@ func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx int } resolvedTime = *highWatermark } + + // TODO(adityamaru): If the job progress was not set then we should + // probably ClearRange. Take this into account when writing the ClearRange + // OnFailOrCancel(). + if resolvedTime.IsEmpty() { + return nil + } + var b kv.Batch b.AddRawRequest(&roachpb.RevertRangeRequest{ RequestHeader: roachpb.RequestHeader{ diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1444cee45d4f..9560d657859c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -// TestStreamIngestionJobRollBack tests that the job rollsback the data to the +// TestStreamIngestionJobRollBack tests that the job rolls back the data to the // start time if there are no progress updates. This test should be expanded // after the job's progress field is updated as the job runs. func TestStreamIngestionJobRollBack(t *testing.T) { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index dc4b2bff4afe..2aaed0cf2b88 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -80,7 +80,7 @@ func ingestionPlanHook( prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: streamingccl.StreamAddress(from[0]), - Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()}, + Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, // TODO: Figure out what the initial ts should be. 
StartTime: hlc.Timestamp{}, } @@ -110,7 +110,10 @@ func ingestionPlanHook( return err } - return sj.Start(ctx) + if err := sj.Start(ctx); err != nil { + return err + } + return sj.AwaitCompletion(ctx) } return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index 562cf7fd03db..c8e032cc2197 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -30,7 +30,7 @@ func distStreamIngestionPlanSpecs( streamAddress streamingccl.StreamAddress, topology streamingccl.Topology, nodes []roachpb.NodeID, - jobID int64, + initialHighWater hlc.Timestamp, ) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { // For each stream partition in the topology, assign it to a node. @@ -63,8 +63,8 @@ func distStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. - // TODO: set HighWaterAtStart once the job progress logic has been hooked up. - streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{TrackedSpans: trackedSpans} + streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ + HighWaterAtStart: initialHighWater, TrackedSpans: trackedSpans} return streamIngestionSpecs, streamIngestionFrontierSpec, nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index f3648d724373..d96cb96b85c1 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -37,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -196,7 +196,7 @@ func getPartitionSpanToTableID( func assertEqualKVs( t *testing.T, tc *testcluster.TestCluster, - streamValidator cdctest.StreamClientValidatorWrapper, + streamValidator *streamClientValidator, tableID int, partitionTimestamp hlc.Timestamp, ) { @@ -233,7 +233,7 @@ func assertEqualKVs( // All value ts should have been drained at this point, otherwise there is // a mismatch between the streamed and ingested data. 
require.Equal(t, 0, len(valueTimestampTuples)) - valueTimestampTuples, err = streamValidator.GetValuesForKeyBelowTimestamp( + valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( string(it.Key().Key), partitionTimestamp) require.NoError(t, err) } @@ -255,6 +255,19 @@ func assertEqualKVs( } } +func makeTestStreamURI( + valueRange, kvsPerResolved, numPartitions, tenantID int, + kvFrequency time.Duration, + dupProbability float64, +) string { + return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + "&EVENT_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + + "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + + "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + + "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) + + "&TENANT_ID=" + strconv.Itoa(tenantID) +} + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. func TestRandomClientGeneration(t *testing.T) { @@ -263,17 +276,6 @@ func TestRandomClientGeneration(t *testing.T) { ctx := context.Background() - makeTestStreamURI := func( - valueRange, kvsPerResolved, numPartitions int, - kvFrequency time.Duration, dupProbability float64, - ) string { - return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + - "&EVENT_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + - "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + - "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + - "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) - } - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() @@ -286,8 +288,8 @@ func TestRandomClientGeneration(t *testing.T) { kvFrequency := 50 * time.Nanosecond numPartitions := 4 dupProbability := 0.2 - streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, - dupProbability) + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, + int(roachpb.SystemTenantID.ToUint64()), kvFrequency, dupProbability) // The random client returns system and table data partitions. streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(streamAddr)) @@ -301,9 +303,10 @@ func TestRandomClientGeneration(t *testing.T) { ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. - cancelAfterCheckpoints := makeCheckpointEventCounter(1000, cancel) - streamValidator := cdctest.NewStreamClientValidatorWrapper() - validator := registerValidator(streamValidator.GetValidator()) + mu := syncutil.Mutex{} + cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel) + streamValidator := newStreamClientValidator() + validator := registerValidatorWithClient(streamValidator) out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, topo.Partitions, startTime, []func(streamingccl.Event, streamingccl.PartitionAddress){cancelAfterCheckpoints, validator}, nil /* mockClient */) @@ -350,7 +353,7 @@ func TestRandomClientGeneration(t *testing.T) { } // Ensure that no errors were reported to the validator. 
- for _, failure := range streamValidator.GetValidator().Failures() { + for _, failure := range streamValidator.Failures() { t.Error(failure) } @@ -424,21 +427,21 @@ func runStreamIngestionProcessor( return out, err } -func registerValidator( - validator cdctest.Validator, +func registerValidatorWithClient( + validator *streamClientValidator, ) func(event streamingccl.Event, pa streamingccl.PartitionAddress) { return func(event streamingccl.Event, pa streamingccl.PartitionAddress) { switch event.Type() { case streamingccl.CheckpointEvent: resolvedTS := *event.GetResolved() - err := validator.NoteResolved(string(pa), resolvedTS) + err := validator.noteResolved(string(pa), resolvedTS) if err != nil { panic(err.Error()) } case streamingccl.KVEvent: kv := *event.GetKV() - err := validator.NoteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), + err := validator.noteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), kv.Value.Timestamp) if err != nil { panic(err.Error()) @@ -450,14 +453,18 @@ func registerValidator( // makeCheckpointEventCounter runs f after seeing `threshold` number of // checkpoint events. func makeCheckpointEventCounter( - threshold int, f func(), + mu *syncutil.Mutex, threshold int, f func(), ) func(streamingccl.Event, streamingccl.PartitionAddress) { + mu.Lock() + defer mu.Unlock() numCheckpointEventsGenerated := 0 return func(event streamingccl.Event, _ streamingccl.PartitionAddress) { + mu.Lock() + defer mu.Unlock() switch event.Type() { case streamingccl.CheckpointEvent: numCheckpointEventsGenerated++ - if numCheckpointEventsGenerated > threshold { + if numCheckpointEventsGenerated == threshold { f() } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go new file mode 100644 index 000000000000..90cca8516301 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go @@ -0,0 +1,245 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is +// fed KVs from the random stream client. After receiving a certain number of +// resolved timestamp events, the test cancels the job to tear down the flow and +// roll back to the latest resolved frontier timestamp. +// The test scans the KV store to compare all MVCC KVs against the relevant +// streamed KV events, thereby ensuring that we end up in a consistent state.
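+//
+// The replication stream address handed to the RESTORE statement is built by
+// makeTestStreamURI; with the parameters used in this test it looks roughly
+// like:
+//
+//   test:///?VALUE_RANGE=100&EVENT_FREQUENCY=50&KVS_PER_CHECKPOINT=200&NUM_PARTITIONS=2&DUP_PROBABILITY=0.2&TENANT_ID=10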
+func TestStreamIngestionJobWithRandomClient(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRaceWithIssue(t, 60710) + + ctx := context.Background() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond) + + cancelJobCh := make(chan struct{}) + threshold := 10 + mu := syncutil.Mutex{} + cancelJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() { + cancelJobCh <- struct{}{} + }) + streamValidator := newStreamClientValidator() + registerValidator := registerValidatorWithClient(streamValidator) + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{ + Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){cancelJobAfterCheckpoints, + registerValidator}, + }, + }, + } + serverArgs := base.TestServerArgs{} + serverArgs.Knobs = knobs + + var allowResponse chan struct{} + params := base.TestClusterArgs{ServerArgs: serverArgs} + params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse), + } + + numNodes := 3 + tc := testcluster.StartTestCluster(t, numNodes, params) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + conn := tc.Conns[0] + + tenantID := 10 + valueRange := 100 + kvsPerResolved := 200 + kvFrequency := 50 * time.Nanosecond + numPartitions := 2 + dupProbability := 0.2 + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency, + dupProbability) + + // Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running. + allowResponse = make(chan struct{}) + errCh := make(chan error) + defer close(errCh) + _, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval= '0.0005ms'`) + require.NoError(t, err) + query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr) + go func() { + _, err := conn.Exec(query) + errCh <- err + }() + select { + case allowResponse <- struct{}{}: + case err := <-errCh: + t.Fatalf("%s: query returned before expected: %s", err, query) + } + close(allowResponse) + + var streamJobID string + testutils.SucceedsSoon(t, func() error { + row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1") + return row.Scan(&streamJobID) + }) + + // Wait for the job to signal that it is ready to be canceled. + <-cancelJobCh + close(cancelJobCh) + + // Canceling the job should shutdown the ingestion processors via a context + // cancellation, and subsequently rollback data above our frontier + // timestamp. + // TODO(adityamaru): Change this to cutover once we have cutover logic in + // place. + _, err = conn.Exec(`CANCEL JOB $1`, streamJobID) + require.NoError(t, err) + // We expect the statement to fail. + if err := <-errCh; err == nil { + t.Fatal(err) + } + + // Wait for the ingestion job to have been canceled. 
+ testutils.SucceedsSoon(t, func() error { + var status string + sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status) + if jobs.Status(status) != jobs.StatusCanceled { + return errors.New("job not in canceled state") + } + return nil + }) + + progress := &jobspb.Progress{} + var streamProgress []byte + sqlDB.QueryRow( + t, `SELECT progress FROM system.jobs WHERE id=$1`, streamJobID, + ).Scan(&streamProgress) + + if err := protoutil.Unmarshal(streamProgress, progress); err != nil { + t.Fatal("cannot unmarshal job progress from system.jobs") + } + highWaterTimestamp := progress.GetHighWater() + if highWaterTimestamp == nil { + t.Fatal(errors.New("expected the highWaterTimestamp written to progress to be non-nil")) + } + ts := *highWaterTimestamp + require.True(t, !ts.IsEmpty()) + + // Check the validator for any failures. + for _, err := range streamValidator.failures() { + t.Fatal(err) + } + + tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID))) + maxIngestedTS := assertExactlyEqualKVs(t, tc, streamValidator, ts, tenantPrefix) + + // Sanity check that the max ts in the store is less than or equal to the ts + // stored in the job progress. + require.True(t, maxIngestedTS.LessEq(ts)) +} + +// assertExactlyEqualKVs runs an incremental iterator on the underlying store. +// At every key, the method polls the `streamValidator` for the KV events +// recorded for that particular key that have a timestamp less than or equal to +// the `frontierTimestamp`. The key and value must be identical between the two. +func assertExactlyEqualKVs( + t *testing.T, + tc *testcluster.TestCluster, + streamValidator *streamClientValidator, + frontierTimestamp hlc.Timestamp, + tenantPrefix roachpb.Key, +) hlc.Timestamp { + // Iterate over the store. + store := tc.GetFirstStoreFromServer(t, 0) + it := store.Engine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + LowerBound: tenantPrefix, + UpperBound: tenantPrefix.PrefixEnd(), + }) + defer it.Close() + var prevKey roachpb.Key + var valueTimestampTuples []roachpb.KeyValue + var err error + var maxKVTimestampSeen hlc.Timestamp + var matchingKVs int + for it.SeekGE(storage.MVCCKey{}); ; it.Next() { + if ok, err := it.Valid(); !ok { + if err != nil { + t.Fatal(err) + } + break + } + if maxKVTimestampSeen.Less(it.Key().Timestamp) { + maxKVTimestampSeen = it.Key().Timestamp + } + newKey := (prevKey != nil && !it.Key().Key.Equal(prevKey)) || prevKey == nil + prevKey = it.Key().Key + + if newKey { + // All value ts should have been drained at this point, otherwise there is + // a mismatch between the streamed and ingested data. + require.Equal(t, 0, len(valueTimestampTuples)) + valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( + string(it.Key().Key), frontierTimestamp) + require.NoError(t, err) + } + + // If there are no values stored in the validator to match against the + // current key then we skip to the next key. + if len(valueTimestampTuples) == 0 { + continue + } + + require.Greater(t, len(valueTimestampTuples), 0) + // Since the iterator goes from latest to older versions, we compare + // starting from the end of the slice that is sorted by timestamp.
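+ // For example, if the validator recorded versions v1@t1, v2@t2, v3@t3 for
+ // this key (t1 < t2 < t3), the MVCC iterator yields the t3 version first,
+ // so it is checked against the last element of the slice, and the slice is
+ // then truncated to [v1@t1, v2@t2] before the next, older version is seen.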
+ latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] + require.Equal(t, roachpb.KeyValue{ + Key: it.Key().Key, + Value: roachpb.Value{ + RawBytes: it.Value(), + Timestamp: it.Key().Timestamp, + }, + }, latestVersionInChain) + matchingKVs++ + // Truncate the latest version which we just checked against in preparation + // for the next iteration. + valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] + } + // Sanity check that we have compared a non-zero number of KVs. + require.Greater(t, matchingKVs, 0) + return maxKVTimestampSeen +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go new file mode 100644 index 000000000000..0caa7966fc1e --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go @@ -0,0 +1,65 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// streamClientValidator wraps a Validator and exposes additional methods +// used by stream ingestion to check for correctness. +type streamClientValidator struct { + cdctest.StreamValidator + + mu syncutil.Mutex +} + +// newStreamClientValidator returns a wrapped Validator that can be used +// to validate the events emitted by the cluster to cluster streaming client. +// The wrapper currently only "wraps" an orderValidator, but can be built out +// to utilize other Validators. +// The wrapper also allows querying the orderValidator to retrieve streamed +// events from an in-memory store. +func newStreamClientValidator() *streamClientValidator { + ov := cdctest.NewStreamOrderValidator() + return &streamClientValidator{ + StreamValidator: ov, + } +} + +func (sv *streamClientValidator) noteRow( + partition string, key, value string, updated hlc.Timestamp, +) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.NoteRow(partition, key, value, updated) +} + +func (sv *streamClientValidator) noteResolved(partition string, resolved hlc.Timestamp) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.NoteResolved(partition, resolved) +} + +func (sv *streamClientValidator) failures() []string { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.Failures() +} + +func (sv *streamClientValidator) getValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.GetValuesForKeyBelowTimestamp(key, timestamp) +} diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 8e0b644b50fa..02e9219a602a 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1730,6 +1730,14 @@ func TestParse(t *testing.T) { {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`}, {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`}, + // Currently, we only support TENANT as a target. We have grammar rules for + // all targets supported by RESTORE but these will error out during the + // planning phase.
+ {`RESTORE TABLE foo FROM REPLICATION STREAM FROM 'bar'`}, + {`RESTORE TABLE foo FROM REPLICATION STREAM FROM $1`}, + {`RESTORE DATABASE foodb FROM REPLICATION STREAM FROM 'baz'`}, + {`RESTORE DATABASE foodb FROM REPLICATION STREAM FROM $1`}, + {`BACKUP TABLE foo TO 'bar' WITH revision_history, detached`}, {`RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached`},
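A closing illustration of the tenant rekeying the random stream client now performs: a minimal sketch, assuming the keys/roachpb helpers used in random_stream_client.go; the table ID and the printed form are illustrative only, not part of this change.

    package main

    import (
        "fmt"

        "github.com/cockroachdb/cockroach/pkg/keys"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    func main() {
        // A key generated under the system tenant (its tenant prefix is empty),
        // e.g. the prefix of a generated test table with ID 52.
        systemKey := keys.SystemSQLCodec.TablePrefix(52)

        // Mirror rekey(): strip whatever tenant prefix is present, then prepend
        // the destination tenant's prefix so the KV lands in that tenant's keyspace.
        stripped, _, err := keys.DecodeTenantPrefix(systemKey)
        if err != nil {
            panic(err)
        }
        rekeyed := append(keys.MakeTenantPrefix(roachpb.MakeTenantID(10)), stripped...)

        fmt.Println(rekeyed) // prints something like /Tenant/10/Table/52
    }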