diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index f61ecd134167..fe6307fd9e57 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/roachpb", "//pkg/sql", "//pkg/sql/parser", "//pkg/sql/sem/tree", diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 8025e299f35d..77c7ed060a07 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -16,6 +16,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" @@ -32,15 +33,83 @@ type Validator interface { Failures() []string } +// StreamClientValidatorWrapper wraps a Validator and exposes additional methods +// used by stream ingestion to check for correctness. +type StreamClientValidatorWrapper interface { + GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error) + GetValidator() Validator +} + +type streamValidator struct { + Validator +} + +var _ StreamClientValidatorWrapper = &streamValidator{} + +// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used +// to validate the events emitted by the cluster to cluster streaming client. +// The wrapper currently only "wraps" an orderValidator, but can be built out +// to utilize other Validator's. +// The wrapper also allows querying the orderValidator to retrieve streamed +// events from an in-memory store. +func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper { + ov := NewOrderValidator("unusedC2C") + return &streamValidator{ + ov, + } +} + +// GetValidator implements the StreamClientValidatorWrapper interface. +func (sv *streamValidator) GetValidator() Validator { + return sv.Validator +} + +// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper +// interface. +// It returns the streamed KV updates for `key` with a ts less than equal to +// `timestamp`. +func (sv *streamValidator) GetValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + orderValidator, ok := sv.GetValidator().(*orderValidator) + if !ok { + return nil, errors.Newf("unknown validator %T: ", sv.GetValidator()) + } + timestampValueTuples := orderValidator.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return timestamp.Less(timestampValueTuples[i].ts) + }) + var kv []roachpb.KeyValue + for _, tsValue := range timestampValueTuples[:timestampsIdx] { + byteRep := []byte(key) + kv = append(kv, roachpb.KeyValue{ + Key: byteRep, + Value: roachpb.Value{ + RawBytes: []byte(tsValue.value), + Timestamp: tsValue.ts, + }, + }) + } + + return kv, nil +} + +type timestampValue struct { + ts hlc.Timestamp + value string +} + type orderValidator struct { - topic string - partitionForKey map[string]string - keyTimestamps map[string][]hlc.Timestamp - resolved map[string]hlc.Timestamp + topic string + partitionForKey map[string]string + keyTimestampAndValues map[string][]timestampValue + resolved map[string]hlc.Timestamp failures []string } +var _ Validator = &orderValidator{} + // NewOrderValidator returns a Validator that checks the row and resolved // timestamp ordering guarantees. 
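GetValuesForKeyBelowTimestamp relies on sort.Search over a slice the validator keeps in ascending timestamp order. A minimal standalone sketch of the same boundary computation, assuming an int64 wall time and a local struct in place of timestampValue and hlc.Timestamp:

package main

import (
	"fmt"
	"sort"
)

// tsValue is an illustrative stand-in for the validator's timestampValue.
type tsValue struct {
	ts    int64 // wall time, standing in for hlc.Timestamp
	value string
}

// valuesAtOrBelow returns every entry with ts <= bound. It assumes the slice
// is sorted by ts ascending; sort.Search finds the first index whose timestamp
// is strictly greater than bound, and the prefix before it is the result.
func valuesAtOrBelow(sorted []tsValue, bound int64) []tsValue {
	idx := sort.Search(len(sorted), func(i int) bool {
		return bound < sorted[i].ts
	})
	return sorted[:idx]
}

func main() {
	history := []tsValue{{1, "a"}, {3, "b"}, {7, "c"}}
	fmt.Println(valuesAtOrBelow(history, 3)) // [{1 a} {3 b}]
}
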
It also asserts that keys have an affinity to // a single partition. @@ -52,17 +121,15 @@ type orderValidator struct { // lower update timestamp will be emitted on that partition. func NewOrderValidator(topic string) Validator { return &orderValidator{ - topic: topic, - partitionForKey: make(map[string]string), - keyTimestamps: make(map[string][]hlc.Timestamp), - resolved: make(map[string]hlc.Timestamp), + topic: topic, + partitionForKey: make(map[string]string), + keyTimestampAndValues: make(map[string][]timestampValue), + resolved: make(map[string]hlc.Timestamp), } } // NoteRow implements the Validator interface. -func (v *orderValidator) NoteRow( - partition string, key, ignoredValue string, updated hlc.Timestamp, -) error { +func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error { if prev, ok := v.partitionForKey[key]; ok && prev != partition { v.failures = append(v.failures, fmt.Sprintf( `key [%s] received on two partitions: %s and %s`, key, prev, partition, @@ -71,17 +138,20 @@ func (v *orderValidator) NoteRow( } v.partitionForKey[key] = partition - timestamps := v.keyTimestamps[key] - timestampsIdx := sort.Search(len(timestamps), func(i int) bool { - return updated.LessEq(timestamps[i]) + timestampValueTuples := v.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return updated.LessEq(timestampValueTuples[i].ts) }) - seen := timestampsIdx < len(timestamps) && timestamps[timestampsIdx] == updated + seen := timestampsIdx < len(timestampValueTuples) && + timestampValueTuples[timestampsIdx].ts == updated - if !seen && len(timestamps) > 0 && updated.Less(timestamps[len(timestamps)-1]) { + if !seen && len(timestampValueTuples) > 0 && + updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) { v.failures = append(v.failures, fmt.Sprintf( `topic %s partition %s: saw new row timestamp %s after %s was seen`, v.topic, partition, - updated.AsOfSystemTime(), timestamps[len(timestamps)-1].AsOfSystemTime(), + updated.AsOfSystemTime(), + timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(), )) } if !seen && updated.Less(v.resolved[partition]) { @@ -92,8 +162,12 @@ func (v *orderValidator) NoteRow( } if !seen { - v.keyTimestamps[key] = append( - append(timestamps[:timestampsIdx], updated), timestamps[timestampsIdx:]...) + v.keyTimestampAndValues[key] = append( + append(timestampValueTuples[:timestampsIdx], timestampValue{ + ts: updated, + value: value, + }), + timestampValueTuples[timestampsIdx:]...) } return nil } diff --git a/pkg/ccl/streamingccl/addresses.go b/pkg/ccl/streamingccl/addresses.go index 8153fd8d3f66..cb358eba22a0 100644 --- a/pkg/ccl/streamingccl/addresses.go +++ b/pkg/ccl/streamingccl/addresses.go @@ -25,6 +25,11 @@ func (sa StreamAddress) URL() (*url.URL, error) { // Each partition will emit events for a fixed span of keys. type PartitionAddress string +// URL parses the partition address as a URL. +func (pa PartitionAddress) URL() (*url.URL, error) { + return url.Parse(string(pa)) +} + // Topology is a configuration of stream partitions. These are particular to a // stream. It specifies the number and addresses of partitions of the stream. // diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index b326d61d1fd5..4afcfb8b833d 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -37,11 +37,15 @@ type Event interface { // CheckpointEvent. 
The resolved timestamp indicates that all KV events until // this time have been emitted. GetResolved() *hlc.Timestamp + // GetPartitionAddress returns the PartitionAddress of the partition from which the + // event was emitted from. + GetPartitionAddress() *PartitionAddress } // kvEvent is a key value pair that needs to be ingested. type kvEvent struct { - kv roachpb.KeyValue + kv roachpb.KeyValue + partitionAddress PartitionAddress } var _ Event = kvEvent{} @@ -61,10 +65,16 @@ func (kve kvEvent) GetResolved() *hlc.Timestamp { return nil } +// GetPartitionAddress implements the Event interface. +func (kve kvEvent) GetPartitionAddress() *PartitionAddress { + return &kve.partitionAddress +} + // checkpointEvent indicates that the stream has emitted every change for all // keys in the span it is responsible for up until this timestamp. type checkpointEvent struct { resolvedTimestamp hlc.Timestamp + partitionAddress PartitionAddress } var _ Event = checkpointEvent{} @@ -84,12 +94,17 @@ func (ce checkpointEvent) GetResolved() *hlc.Timestamp { return &ce.resolvedTimestamp } +// GetPartitionAddress implements the Event interface. +func (ce checkpointEvent) GetPartitionAddress() *PartitionAddress { + return &ce.partitionAddress +} + // MakeKVEvent creates an Event from a KV. -func MakeKVEvent(kv roachpb.KeyValue) Event { - return kvEvent{kv: kv} +func MakeKVEvent(kv roachpb.KeyValue, address PartitionAddress) Event { + return kvEvent{kv: kv, partitionAddress: address} } // MakeCheckpointEvent creates an Event from a resolved timestamp. -func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event { - return checkpointEvent{resolvedTimestamp: resolvedTimestamp} +func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp, address PartitionAddress) Event { + return checkpointEvent{resolvedTimestamp: resolvedTimestamp, partitionAddress: address} } diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 33b33eae133a..fbf260b75c24 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -14,12 +14,14 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/sql", + "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/util/hlc", + "//pkg/util/randutil", "//pkg/util/syncutil", "//pkg/util/timeutil", ], diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 4aea27ee9979..bdde34c67c1c 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -38,7 +38,7 @@ func (sc testStreamClient) GetTopology( // ConsumePartition implements the Client interface. 
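Both constructors now take the partition address alongside the payload, so consumers can recover an event's origin without a separate wrapper type. A hedged usage sketch built only from the API shown in this diff (addresses and timestamps are arbitrary):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	pa := streamingccl.PartitionAddress("test://52")
	kv := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: roachpb.MakeValueFromString("value_1")}

	events := []streamingccl.Event{
		streamingccl.MakeKVEvent(kv, pa),
		streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}, pa),
	}

	for _, ev := range events {
		switch ev.Type() {
		case streamingccl.KVEvent:
			fmt.Println("kv from", *ev.GetPartitionAddress(), "key", string(ev.GetKV().Key))
		case streamingccl.CheckpointEvent:
			fmt.Println("checkpoint from", *ev.GetPartitionAddress(), "resolved", *ev.GetResolved())
		}
	}
}
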
func (sc testStreamClient) ConsumePartition( - _ context.Context, _ streamingccl.PartitionAddress, _ time.Time, + _ context.Context, pa streamingccl.PartitionAddress, _ time.Time, ) (chan streamingccl.Event, error) { sampleKV := roachpb.KeyValue{ Key: []byte("key_1"), @@ -49,8 +49,8 @@ func (sc testStreamClient) ConsumePartition( } events := make(chan streamingccl.Event, 2) - events <- streamingccl.MakeKVEvent(sampleKV) - events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}) + events <- streamingccl.MakeKVEvent(sampleKV, pa) + events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}, pa) close(events) return events, nil diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 61e7a5b34bdb..bf9341396804 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -10,6 +10,7 @@ package streamclient import ( "context" + "fmt" "math/rand" "net/url" "strconv" @@ -19,20 +20,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) const ( - // RandomStreamSchema is the schema of the KVs emitted by the random stream - // client. - RandomStreamSchema = "CREATE TABLE test (k INT PRIMARY KEY, v INT)" + // RandomStreamSchemaPlaceholder is the schema of the KVs emitted by the + // random stream client. + RandomStreamSchemaPlaceholder = "CREATE TABLE %s (k INT PRIMARY KEY, v INT)" // TestScheme is the URI scheme used to create a test load. TestScheme = "test" @@ -45,14 +48,37 @@ const ( // KVsPerCheckpoint controls approximately how many KV events should be emitted // between checkpoint events. KVsPerCheckpoint = "KVS_PER_CHECKPOINT" + // NumPartitions controls the number of partitions the client will stream data + // back on. Each partition will encompass a single table span. + NumPartitions = "NUM_PARTITIONS" + // DupProbability controls the probability with which we emit duplicate KV + // events. + DupProbability = "DUP_PROBABILITY" + // IngestionDatabaseID is the ID used in the generated table descriptor. + IngestionDatabaseID = 50 /* defaultDB */ + // IngestionTablePrefix is the prefix of the table name used in the generated + // table descriptor. + IngestionTablePrefix = "foo" ) +type interceptFn func(event streamingccl.Event) + +// InterceptableStreamClient wraps a Client, and provides a method to register +// interceptor methods that are run on every streamed Event. +type InterceptableStreamClient interface { + Client + + RegisterInterception(fn interceptFn) +} + // randomStreamConfig specifies the variables that controls the rate and type of // events that the generated stream emits. 
type randomStreamConfig struct { valueRange int kvFrequency time.Duration kvsPerCheckpoint int + numPartitions int + dupProbability float64 } func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { @@ -60,6 +86,8 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { valueRange: 100, kvFrequency: 10 * time.Microsecond, kvsPerCheckpoint: 100, + numPartitions: 1, + dupProbability: 0.5, } var err error @@ -85,21 +113,33 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { } } + if numPartitionsStr := streamURL.Query().Get(NumPartitions); numPartitionsStr != "" { + c.numPartitions, err = strconv.Atoi(numPartitionsStr) + if err != nil { + return c, err + } + } + + if dupProbStr := streamURL.Query().Get(DupProbability); dupProbStr != "" { + c.dupProbability, err = strconv.ParseFloat(dupProbStr, 32) + if err != nil { + return c, err + } + } return c, nil } // randomStreamClient is a temporary stream client implementation that generates // random events. // -// It expects a table with the schema `RandomStreamSchema` to already exist, -// with table ID `` to be used in the URI. Opening the stream client -// on the URI 'test://' will generate random events into this table. +// The client can be configured to return more than one partition via the stream +// URL. Each partition covers a single table span. // // TODO: Move this over to a _test file in the ingestion package when there is a // real stream client implementation. type randomStreamClient struct { - baseDesc *tabledesc.Mutable - config randomStreamConfig + tableIDToDesc map[descpb.ID]*tabledesc.Mutable + config randomStreamConfig // interceptors can be registered to peek at every event generated by this // client. @@ -111,55 +151,116 @@ type randomStreamClient struct { } var _ Client = &randomStreamClient{} +var _ InterceptableStreamClient = &randomStreamClient{} // newRandomStreamClient returns a stream client that generates a random set of // events on a table with an integer key and integer value for the table with // the given ID. func newRandomStreamClient(streamURL *url.URL) (Client, error) { - tableID, err := strconv.Atoi(streamURL.Host) - if err != nil { - return nil, err - } - testTable, err := sql.CreateTestTableDescriptor( - context.Background(), - 50, /* defaultdb */ - descpb.ID(tableID), - RandomStreamSchema, - systemschema.JobsTable.Privileges, - ) - if err != nil { - return nil, err - } - streamConfig, err := parseRandomStreamConfig(streamURL) if err != nil { return nil, err } return &randomStreamClient{ - baseDesc: testTable, - config: streamConfig, + config: streamConfig, }, nil } +var testTableID = 52 + +func getNextTableID() int { + ret := testTableID + testTableID++ + return ret +} + // GetTopology implements the Client interface. func (m *randomStreamClient) GetTopology( _ streamingccl.StreamAddress, ) (streamingccl.Topology, error) { - panic("not yet implemented") + topology := streamingccl.Topology{Partitions: make([]streamingccl.PartitionAddress, + 0, m.config.numPartitions)} + + // Allocate table IDs and return one per partition address in the topology. 
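Each partition address produced by GetTopology is just a URL whose host field carries a freshly allocated table ID; ConsumePartition later reverses that encoding. A small sketch of the round trip, with the scheme and ID values standing in for whatever the client generates:

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	// Build the partition address the way GetTopology does: test://<tableID>.
	tableID := 53
	partitionURI := url.URL{Scheme: "test", Host: strconv.Itoa(tableID)}
	addr := partitionURI.String()
	fmt.Println(addr) // test://53

	// ConsumePartition parses the address back and recovers the table ID from
	// the host component.
	parsed, err := url.Parse(addr)
	if err != nil {
		panic(err)
	}
	recovered, err := strconv.Atoi(parsed.Host)
	if err != nil {
		panic(err)
	}
	fmt.Println(recovered) // 53
}
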
+ for i := 0; i < m.config.numPartitions; i++ { + tableID := descpb.ID(getNextTableID()) + partitionURI := url.URL{ + Scheme: TestScheme, + Host: strconv.Itoa(int(tableID)), + } + topology.Partitions = append(topology.Partitions, + streamingccl.PartitionAddress(partitionURI.String())) + } + + return topology, nil +} + +// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor +// KVs for the table with tableID. +func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( + tableID descpb.ID, +) (*tabledesc.Mutable, []roachpb.KeyValue, error) { + tableName := fmt.Sprintf("%s%d", IngestionTablePrefix, tableID) + testTable, err := sql.CreateTestTableDescriptor( + context.Background(), + IngestionDatabaseID, + tableID, + fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), + systemschema.JobsTable.Privileges, + ) + if err != nil { + return nil, nil, err + } + + //// Generate namespace entry. + key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name) + var value roachpb.Value + value.SetInt(int64(testTable.GetID())) + namespaceKV := roachpb.KeyValue{ + Key: key.Key(keys.TODOSQLCodec), + Value: value, + } + + // Generate descriptor entry. + descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID()) + descDesc := testTable.DescriptorProto() + var descValue roachpb.Value + if err := descValue.SetProto(descDesc); err != nil { + panic(err) + } + descKV := roachpb.KeyValue{ + Key: descKey, + Value: descValue, + } + + return testTable, []roachpb.KeyValue{namespaceKV, descKV}, nil } // ConsumePartition implements the Client interface. func (m *randomStreamClient) ConsumePartition( - ctx context.Context, _ streamingccl.PartitionAddress, startTime time.Time, + ctx context.Context, partitionAddress streamingccl.PartitionAddress, startTime time.Time, ) (chan streamingccl.Event, error) { eventCh := make(chan streamingccl.Event) now := timeutil.Now() if startTime.After(now) { panic("cannot start random stream client event stream in the future") } - lastResolvedTime := startTime + partitionURL, err := partitionAddress.URL() + if err != nil { + return nil, err + } + var partitionTableID int + partitionTableID, err = strconv.Atoi(partitionURL.Host) + if err != nil { + return nil, err + } + + tableDesc, systemKVs, err := m.getDescriptorAndNamespaceKVForTableID(descpb.ID(partitionTableID)) + if err != nil { + return nil, err + } go func() { defer close(eventCh) @@ -176,24 +277,40 @@ func (m *randomStreamClient) ConsumePartition( resolvedTimer.Reset(0) defer resolvedTimer.Stop() + rng, _ := randutil.NewPseudoRand() + var dupKVEvent streamingccl.Event + for { var event streamingccl.Event select { case <-kvTimer.C: kvTimer.Read = true - event = streamingccl.MakeKVEvent(m.makeRandomKey(r, lastResolvedTime)) + // If there are system KVs to emit, prioritize those. + if len(systemKVs) > 0 { + systemKV := systemKVs[0] + systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + event = streamingccl.MakeKVEvent(systemKV, partitionAddress) + systemKVs = systemKVs[1:] + } else { + // Generate a duplicate KVEvent, and update its timestamp to now(). 
+ if rng.Float64() < m.config.dupProbability && dupKVEvent != nil { + dupKV := dupKVEvent.GetKV() + dupKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + event = streamingccl.MakeKVEvent(*dupKV, partitionAddress) + } else { + event = streamingccl.MakeKVEvent(m.makeRandomKey(r, tableDesc), partitionAddress) + dupKVEvent = event + } + } kvTimer.Reset(kvInterval) case <-resolvedTimer.C: resolvedTimer.Read = true resolvedTime := timeutil.Now() hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} - event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) - lastResolvedTime = resolvedTime + event = streamingccl.MakeCheckpointEvent(hlcResolvedTime, partitionAddress) resolvedTimer.Reset(resolvedInterval) } - // TODO: Consider keeping an in-memory copy so that tests can verify - // that the data we've ingested is correct. select { case eventCh <- event: case <-ctx.Done(): @@ -215,9 +332,9 @@ func (m *randomStreamClient) ConsumePartition( return eventCh, nil } -func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue { - tableDesc := m.baseDesc - +func (m *randomStreamClient) makeRandomKey( + r *rand.Rand, tableDesc *tabledesc.Mutable, +) roachpb.KeyValue { // Create a key holding a random integer. k, err := rowenc.TestingMakePrimaryIndexKey(tableDesc, r.Intn(m.config.valueRange)) if err != nil { @@ -237,10 +354,7 @@ func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachp v.ClearChecksum() v.InitChecksum(k) - // Generate a timestamp between minTs and now(). - randOffset := int(timeutil.Now().UnixNano()) - int(minTs.UnixNano()) - newTimestamp := rand.Intn(randOffset) + int(minTs.UnixNano()) - v.Timestamp = hlc.Timestamp{WallTime: int64(newTimestamp)} + v.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} return roachpb.KeyValue{ Key: k, @@ -249,8 +363,8 @@ func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachp } // RegisterInterception implements streamingest.interceptableStreamClient. 
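The duplicate branch above re-emits the previous KV with a refreshed timestamp whenever a coin flip against dupProbability succeeds, exercising the ingestion path's handling of the same key at newer versions. A simplified sketch of that control flow with placeholder types (the real client emits streamingccl events with hlc timestamps):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

type fakeKV struct {
	key string
	ts  int64
}

func main() {
	rng := rand.New(rand.NewSource(42))
	dupProbability := 0.5
	var lastKV *fakeKV

	for i := 0; i < 10; i++ {
		var next fakeKV
		if lastKV != nil && rng.Float64() < dupProbability {
			// Re-emit the previous key but refresh its timestamp to "now", so the
			// duplicate arrives as a newer version of the same row, not a replay.
			next = fakeKV{key: lastKV.key, ts: time.Now().UnixNano()}
		} else {
			next = fakeKV{key: fmt.Sprintf("key_%d", rng.Intn(100)), ts: time.Now().UnixNano()}
			lastKV = &next
		}
		fmt.Println(next)
	}
}
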
-func (m *randomStreamClient) RegisterInterception(f func(event streamingccl.Event)) { +func (m *randomStreamClient) RegisterInterception(fn interceptFn) { m.mu.Lock() defer m.mu.Unlock() - m.mu.interceptors = append(m.mu.interceptors, f) + m.mu.interceptors = append(m.mu.interceptors, fn) } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 10e641f42fd3..ff47cc0255e9 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -56,6 +56,7 @@ go_test( embed = [":streamingest"], deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -69,9 +70,11 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index 7eda9458777d..f05e3c7b83a8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -63,8 +63,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - pa1 := streamingccl.PartitionAddress("s3://my_streams/stream/partition1") - pa2 := streamingccl.PartitionAddress("s3://my_streams/stream/partition2") + pa1 := streamingccl.PartitionAddress("partition1") + pa2 := streamingccl.PartitionAddress("partition2") v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} @@ -79,11 +79,11 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { { name: "same-resolved-ts-across-partitions", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, }, @@ -92,9 +92,9 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // emitted a resolved ts. name: "no-checkpoints", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV, pa2), }}, }, { @@ -102,39 +102,39 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // emitted a resolved ts. 
name: "no-checkpoint-from-one-partition", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{}}, }, { name: "one-partition-ahead-of-the-other", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1}, }, { name: "some-interleaved-timestamps", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 3}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 5}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 3}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 5}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, }, { name: "some-interleaved-logical-timestamps", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 1}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 4}, }, @@ -143,9 +143,9 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // lower than its start time. 
name: "checkpoint-lower-than-start-ts", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}, pa2), }}, frontierStartTime: hlc.Timestamp{WallTime: 1, Logical: 3}, }, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 05b5786100c9..bb312222a296 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -66,14 +67,7 @@ type streamIngestionProcessor struct { ingestionErr error // eventCh is the merged event channel of all of the partition event streams. - eventCh chan partitionEvent -} - -// partitionEvent augments a normal event with the partition it came from. -type partitionEvent struct { - streamingccl.Event - - partition streamingccl.PartitionAddress + eventCh chan streamingccl.Event } var _ execinfra.Processor = &streamIngestionProcessor{} @@ -93,6 +87,20 @@ func newStreamIngestionDataProcessor( return nil, err } + // Check if there are any interceptor methods that need to be registered with + // the stream client. + // These methods are invoked on every emitted Event. + if knobs, ok := flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs.(*sql. + StreamIngestionTestingKnobs); ok { + if knobs.Interceptors != nil { + if interceptable, ok := streamClient.(streamclient.InterceptableStreamClient); ok { + for _, interceptor := range knobs.Interceptors { + interceptable.RegisterInterception(interceptor) + } + } + } + } + sip := &streamIngestionProcessor{ flowCtx: flowCtx, spec: spec, @@ -211,8 +219,8 @@ func (sip *streamIngestionProcessor) flush() error { // channel. func merge( ctx context.Context, partitionStreams map[streamingccl.PartitionAddress]chan streamingccl.Event, -) chan partitionEvent { - merged := make(chan partitionEvent) +) chan streamingccl.Event { + merged := make(chan streamingccl.Event) var wg sync.WaitGroup wg.Add(len(partitionStreams)) @@ -221,13 +229,8 @@ func merge( go func(partition streamingccl.PartitionAddress, eventCh <-chan streamingccl.Event) { defer wg.Done() for event := range eventCh { - pe := partitionEvent{ - Event: event, - partition: partition, - } - select { - case merged <- pe: + case merged <- event: case <-ctx.Done(): // TODO: Add ctx.Err() to an error channel once ConsumePartition // supports an error ch. @@ -283,7 +286,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpan, erro // Each partition is represented by a span defined by the // partition address. 
- spanStartKey := roachpb.Key(event.partition) + spanStartKey := roachpb.Key(*event.GetPartitionAddress()) return &jobspb.ResolvedSpan{ Span: roachpb.Span{Key: spanStartKey, EndKey: spanStartKey.Next()}, Timestamp: resolvedTime, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 314d0aaa942d..1d07f011c7f8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -16,15 +16,19 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -38,12 +42,6 @@ import ( "github.com/stretchr/testify/require" ) -type interceptableStreamClient interface { - streamclient.Client - - RegisterInterception(func(event streamingccl.Event)) -} - // mockStreamClient will return the slice of events associated to the stream // partition being consumed. Stream partitions are identified by unique // partition addresses. @@ -93,17 +91,17 @@ func TestStreamIngestionProcessor(t *testing.T) { defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() - // Inject a mock client. 
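With the partition address now embedded in every event, the merge helper above no longer needs the partitionEvent wrapper and reduces to the standard channel fan-in. A generic sketch of that pattern, with string channels standing in for the per-partition event channels:

package main

import (
	"fmt"
	"sync"
)

// merge fans several input channels into one, closing the output only after
// every input has been drained.
func merge(inputs ...<-chan string) <-chan string {
	merged := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, in := range inputs {
		go func(in <-chan string) {
			defer wg.Done()
			for ev := range in {
				merged <- ev
			}
		}(in)
	}
	go func() {
		wg.Wait()
		close(merged)
	}()
	return merged
}

func main() {
	a := make(chan string, 1)
	b := make(chan string, 1)
	a <- "from-partition-1"
	b <- "from-partition-2"
	close(a)
	close(b)
	for ev := range merge(a, b) {
		fmt.Println(ev)
	}
}
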
v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} + unusedPartitionAddress := streamingccl.PartitionAddress("") events := []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, unusedPartitionAddress), } pa1 := streamingccl.PartitionAddress("partition1") pa2 := streamingccl.PartitionAddress("partition2") @@ -112,8 +110,9 @@ func TestStreamIngestionProcessor(t *testing.T) { } startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", startTime, - nil /* interceptors */, mockClient) + partitionAddresses := []streamingccl.PartitionAddress{"partition1", "partition2"} + out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", partitionAddresses, + startTime, nil /* interceptEvents */, mockClient) require.NoError(t, err) // Compare the set of results since the ordering is not guaranteed. @@ -142,6 +141,90 @@ func TestStreamIngestionProcessor(t *testing.T) { require.Equal(t, expectedRows, actualRows) } +func getPartitionSpanToTableID( + t *testing.T, partitionAddresses []streamingccl.PartitionAddress, +) map[string]int { + pSpanToTableID := make(map[string]int) + + // Aggregate the table IDs which should have been ingested. + for _, pa := range partitionAddresses { + pKey := roachpb.Key(pa) + pSpan := roachpb.Span{pKey, pKey.Next()} + paURL, err := pa.URL() + require.NoError(t, err) + id, err := strconv.Atoi(paURL.Host) + require.NoError(t, err) + pSpanToTableID[pSpan.String()] = id + } + return pSpanToTableID +} + +// assertEqualKVs iterates over the store in `tc` and compares the MVCC KVs +// against the in-memory copy of events stored in the `streamValidator`. This +// ensures that the stream ingestion processor ingested at least as much data as +// was streamed up until partitionTimestamp. +func assertEqualKVs( + t *testing.T, + tc *testcluster.TestCluster, + streamValidator cdctest.StreamClientValidatorWrapper, + tableID int, + partitionTimestamp hlc.Timestamp, +) { + key := keys.TODOSQLCodec.TablePrefix(uint32(tableID)) + + // Iterate over the store. + store := tc.GetFirstStoreFromServer(t, 0) + it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: key, + UpperBound: key.PrefixEnd(), + }) + defer it.Close() + var prevKey roachpb.Key + var valueTimestampTuples []roachpb.KeyValue + var err error + for it.SeekGE(storage.MVCCKey{}); ; it.Next() { + if ok, err := it.Valid(); !ok { + if err != nil { + t.Fatal(err) + } + break + } + + // We only want to process MVCC KVs with a ts less than or equal to the max + // resolved ts for this partition. 
+ if partitionTimestamp.Less(it.Key().Timestamp) { + continue + } + + newKey := (prevKey != nil && !it.Key().Key.Equal(prevKey)) || prevKey == nil + prevKey = it.Key().Key + + if newKey { + // All value ts should have been drained at this point, otherwise there is + // a mismatch between the streamed and ingested data. + require.Equal(t, 0, len(valueTimestampTuples)) + valueTimestampTuples, err = streamValidator.GetValuesForKeyBelowTimestamp( + string(it.Key().Key), partitionTimestamp) + require.NoError(t, err) + } + + require.Greater(t, len(valueTimestampTuples), 0) + // Since the iterator goes from latest to older versions, we compare + // starting from the end of the slice that is sorted by timestamp. + latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] + require.Equal(t, roachpb.KeyValue{ + Key: it.Key().Key, + Value: roachpb.Value{ + RawBytes: it.Value(), + Timestamp: it.Key().Timestamp, + }, + }, latestVersionInChain) + // Truncate the latest version which we just checked against in preparation + // for the next iteration. + valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] + } +} + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. func TestRandomClientGeneration(t *testing.T) { @@ -151,13 +234,14 @@ func TestRandomClientGeneration(t *testing.T) { ctx := context.Background() makeTestStreamURI := func( - tableID string, - valueRange, kvsPerResolved int, - kvFrequency time.Duration, + valueRange, kvsPerResolved, numPartitions int, + kvFrequency time.Duration, dupProbability float64, ) string { - return "test://" + tableID + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + - "&KVS_PER_RESOLVED=" + strconv.Itoa(kvsPerResolved) + "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + + "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + + "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) } tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) @@ -166,30 +250,37 @@ func TestRandomClientGeneration(t *testing.T) { conn := tc.Conns[0] sqlDB := sqlutils.MakeSQLRunner(conn) - // Create the expected table for the random stream to ingest into. - sqlDB.Exec(t, streamclient.RandomStreamSchema) - tableID := sqlDB.QueryStr(t, `SELECT id FROM system.namespace WHERE name = 'test'`)[0][0] - // TODO: Consider testing variations on these parameters. valueRange := 100 kvsPerResolved := 1_000 kvFrequency := 50 * time.Nanosecond - streamAddr := makeTestStreamURI(tableID, valueRange, kvsPerResolved, kvFrequency) + numPartitions := 4 + dupProbability := 0.2 + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, + dupProbability) + + // The random client returns system and table data partitions. + streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(streamAddr)) + require.NoError(t, err) + topo, err := streamClient.GetTopology(streamingccl.StreamAddress(streamAddr)) + require.NoError(t, err) + // One system and two table data partitions. + require.Equal(t, numPartitions, len(topo.Partitions)) startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. 
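The hand-concatenated makeTestStreamURI above could equally be assembled with url.Values, which also takes care of escaping; a sketch using the knob names introduced by this change (the parameter values are arbitrary):

package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	q := url.Values{}
	q.Set("VALUE_RANGE", strconv.Itoa(100))
	q.Set("KV_FREQUENCY", strconv.Itoa(50)) // nanoseconds, mirroring int(kvFrequency)
	q.Set("KVS_PER_CHECKPOINT", strconv.Itoa(1000))
	q.Set("NUM_PARTITIONS", strconv.Itoa(4))
	q.Set("DUP_PROBABILITY", strconv.FormatFloat(0.2, 'f', -1, 32))

	// Encode sorts the keys, so the resulting query string is deterministic.
	streamURI := url.URL{Scheme: "test", Path: "/", RawQuery: q.Encode()}
	fmt.Println(streamURI.String())
	// test:///?DUP_PROBABILITY=0.2&KVS_PER_CHECKPOINT=1000&KV_FREQUENCY=50&NUM_PARTITIONS=4&VALUE_RANGE=100
}
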
- cancelAfterCheckpoints := makeCheckpointEventCounter(1_000, cancel) - out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, startTime, - cancelAfterCheckpoints, nil /* mockClient */) + cancelAfterCheckpoints := makeCheckpointEventCounter(1000, cancel) + streamValidator := cdctest.NewStreamClientValidatorWrapper() + validator := registerValidator(streamValidator.GetValidator()) + out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, topo.Partitions, + startTime, []func(streamingccl.Event){cancelAfterCheckpoints, validator}, nil /* mockClient */) require.NoError(t, err) - p1Key := roachpb.Key("partition1") - p2Key := roachpb.Key("partition2") - p1Span := roachpb.Span{Key: p1Key, EndKey: p1Key.Next()} - p2Span := roachpb.Span{Key: p2Key, EndKey: p2Key.Next()} + partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions) numResolvedEvents := 0 + maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp) for { row, meta := out.Next() if meta != nil { @@ -209,20 +300,38 @@ func TestRandomClientGeneration(t *testing.T) { var resolvedSpan jobspb.ResolvedSpan require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) - if resolvedSpan.Span.String() != p1Span.String() && resolvedSpan.Span.String() != p2Span.String() { - t.Fatalf("expected resolved span %v to be either %v or %v", resolvedSpan.Span, p1Span, p2Span) + if _, ok := partitionSpanToTableID[resolvedSpan.Span.String()]; !ok { + t.Fatalf("expected resolved span %v to be either in one of the supplied partition"+ + " addresses %v", resolvedSpan.Span, topo.Partitions) } // All resolved timestamp events should be greater than the start time. require.Less(t, startTime.WallTime, resolvedSpan.Timestamp.WallTime) + + // Track the max resolved timestamp per partition. + if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok || + ts.Less(resolvedSpan.Timestamp) { + maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp + } numResolvedEvents++ } - // Check that some rows have been ingested and that we've emitted some resolved events. - numRows, err := strconv.Atoi(sqlDB.QueryStr(t, `SELECT count(*) FROM defaultdb.test`)[0][0]) - require.NoError(t, err) - require.Greater(t, numRows, 0, "at least 1 row ingested expected") + // Ensure that no errors were reported to the validator. + for _, failure := range streamValidator.GetValidator().Failures() { + t.Error(failure) + } + + for pSpan, id := range partitionSpanToTableID { + numRows, err := strconv.Atoi(sqlDB.QueryStr(t, fmt.Sprintf( + `SELECT count(*) FROM defaultdb.%s%d`, streamclient.IngestionTablePrefix, id))[0][0]) + require.NoError(t, err) + require.Greater(t, numRows, 0, "at least 1 row ingested expected") + // Scan the store for KVs ingested by this partition, and compare the MVCC + // KVs against the KVEvents streamed up to the max ingested timestamp for + // the partition. 
+ assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan]) + } require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") } @@ -231,8 +340,9 @@ func runStreamIngestionProcessor( t *testing.T, kvDB *kv.DB, streamAddr string, + partitionAddresses []streamingccl.PartitionAddress, startTime hlc.Timestamp, - interceptEvents func(streamingccl.Event), + interceptEvents []func(streamingccl.Event), mockClient streamclient.Client, ) (*distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() @@ -249,6 +359,8 @@ func runStreamIngestionProcessor( }, EvalCtx: &evalCtx, } + flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs = &sql.StreamIngestionTestingKnobs{ + Interceptors: interceptEvents} out := &distsqlutils.RowBuffer{} post := execinfrapb.PostProcessSpec{} @@ -256,7 +368,7 @@ func runStreamIngestionProcessor( var spec execinfrapb.StreamIngestionDataSpec spec.StreamAddress = streamingccl.StreamAddress(streamAddr) - spec.PartitionAddresses = []streamingccl.PartitionAddress{"partition1", "partition2"} + spec.PartitionAddresses = partitionAddresses spec.StartTime = startTime processorID := int32(0) proc, err := newStreamIngestionDataProcessor(&flowCtx, processorID, spec, &post, out) @@ -270,15 +382,6 @@ func runStreamIngestionProcessor( sip.client = mockClient } - if interceptableClient, ok := sip.client.(interceptableStreamClient); ok { - interceptableClient.RegisterInterception(interceptEvents) - // TODO: Inject an interceptor here that keeps track of generated events so - // we can compare. - } else if interceptEvents != nil { - t.Fatalf("interceptor specified, but client %T does not implement interceptableStreamClient", - sip.client) - } - sip.Run(ctx) // Ensure that all the outputs are properly closed. @@ -288,6 +391,29 @@ func runStreamIngestionProcessor( return out, err } +func registerValidator(validator cdctest.Validator) func(event streamingccl.Event) { + return func(event streamingccl.Event) { + switch event.Type() { + case streamingccl.CheckpointEvent: + pa := *event.GetPartitionAddress() + resolvedTS := *event.GetResolved() + err := validator.NoteResolved(string(pa), resolvedTS) + if err != nil { + panic(err.Error()) + } + case streamingccl.KVEvent: + pa := *event.GetPartitionAddress() + kv := *event.GetKV() + + err := validator.NoteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), + kv.Value.Timestamp) + if err != nil { + panic(err.Error()) + } + } + } +} + // makeCheckpointEventCounter runs f after seeing `threshold` number of // checkpoint events. 
func makeCheckpointEventCounter(threshold int, f func()) func(streamingccl.Event) { diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 2c5d82cc2a77..234abad345a6 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -225,6 +225,7 @@ go_library( deps = [ "//pkg/base", "//pkg/build", + "//pkg/ccl/streamingccl", "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 20bf1e0412a3..aa234da8b6de 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" @@ -958,6 +959,16 @@ var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{} // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (*BackupRestoreTestingKnobs) ModuleTestingKnobs() {} +// StreamIngestionTestingKnobs contains knobs for stream ingestion behavior. +type StreamIngestionTestingKnobs struct { + Interceptors []func(event streamingccl.Event) +} + +var _ base.ModuleTestingKnobs = &StreamIngestionTestingKnobs{} + +// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. +func (*StreamIngestionTestingKnobs) ModuleTestingKnobs() {} + func shouldDistributeGivenRecAndMode( rec distRecommendation, mode sessiondata.DistSQLExecMode, ) bool { diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index c5b840724e71..152787ad40ab 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -239,6 +239,9 @@ type TestingKnobs struct { // BackupRestoreTestingKnobs are backup and restore specific testing knobs. BackupRestoreTestingKnobs base.ModuleTestingKnobs + + // BackupRestoreTestingKnobs are stream ingestion specific testing knobs. + StreamIngestionTestingKnobs base.ModuleTestingKnobs } // MetadataTestLevel represents the types of queries where metadata test
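
Taken together, the testing-knob plumbing lets a test hand interceptors to the processor, which forwards them to any client implementing InterceptableStreamClient. A condensed sketch of an interceptor in the spirit of makeCheckpointEventCounter (not the helper's actual body, only a closure consistent with its signature and comment):

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// checkpointCounter returns an interceptor that invokes f once it has seen
// `threshold` checkpoint events, ignoring KV events.
func checkpointCounter(threshold int, f func()) func(streamingccl.Event) {
	seen := 0
	return func(ev streamingccl.Event) {
		if ev.Type() != streamingccl.CheckpointEvent {
			return
		}
		seen++
		if seen == threshold {
			f()
		}
	}
}

func main() {
	// In a test, interceptors like this are passed through
	// sql.StreamIngestionTestingKnobs{Interceptors: ...} and registered on the
	// stream client by newStreamIngestionDataProcessor.
	stop := checkpointCounter(2, func() { fmt.Println("threshold reached") })
	pa := streamingccl.PartitionAddress("test://52")
	stop(streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa))
	stop(streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}, pa))
}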