From a2e1ef84471f7a1a3956a8f51fc22403aa05d42b Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Wed, 20 Jan 2021 19:18:33 -0500 Subject: [PATCH] streamingccl: improvements to the random stream test client This change improves on the random stream client to allow for better testing of the various components of the stream ingestion job. Specifically: - Adds support for specifying number of partitions. For simplicity, a partition generates KVs for a particular table span. - Generates system KVs (descriptor and namespace) KVs, as the first two KVs on the partition stream. I played around with the idea of having a separate "system" and "table data" partition, but the code and tests became more convoluted, compared to the current approach. - Hookup the CDC orderValidator to the random stream client's output. This gives us some guarantees that the data being generated is semantically correct. - Maintain an in-memory copy of all the streamed events, that can be efficiently queried. This allows us to compare the ingested KVs to the streamed KVs and gain more confidence in our pipeline. Release note: None --- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 1 + pkg/ccl/changefeedccl/cdctest/validator.go | 112 +++++++-- pkg/ccl/streamingccl/addresses.go | 5 + pkg/ccl/streamingccl/event.go | 25 +- pkg/ccl/streamingccl/streamclient/BUILD.bazel | 2 + .../streamingccl/streamclient/client_test.go | 6 +- .../streamclient/random_stream_client.go | 198 ++++++++++++---- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 3 + ...tream_ingestion_frontier_processor_test.go | 46 ++-- .../stream_ingestion_processor.go | 37 +-- .../stream_ingestion_processor_test.go | 224 ++++++++++++++---- pkg/sql/BUILD.bazel | 1 + pkg/sql/exec_util.go | 11 + pkg/sql/execinfra/server_config.go | 3 + 14 files changed, 516 insertions(+), 158 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index f61ecd134167..fe6307fd9e57 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -12,6 +12,7 @@ go_library( deps = [ "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/roachpb", "//pkg/sql", "//pkg/sql/parser", "//pkg/sql/sem/tree", diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 8025e299f35d..77c7ed060a07 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -16,6 +16,7 @@ import ( "sort" "strings" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" @@ -32,15 +33,83 @@ type Validator interface { Failures() []string } +// StreamClientValidatorWrapper wraps a Validator and exposes additional methods +// used by stream ingestion to check for correctness. +type StreamClientValidatorWrapper interface { + GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error) + GetValidator() Validator +} + +type streamValidator struct { + Validator +} + +var _ StreamClientValidatorWrapper = &streamValidator{} + +// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used +// to validate the events emitted by the cluster to cluster streaming client. +// The wrapper currently only "wraps" an orderValidator, but can be built out +// to utilize other Validator's. +// The wrapper also allows querying the orderValidator to retrieve streamed +// events from an in-memory store. 
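In practice a test wires this wrapper up as a stream interceptor and then queries it once ingestion settles. A minimal sketch of that usage, assuming the packages below are importable from a test in this tree (the helper name and the partition label are made up for illustration):

package streamingest_test // hypothetical placement, for illustration only

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// checkStreamedKey records a KV and a checkpoint with the wrapped validator and
// then reads back everything streamed for the key at or below the checkpoint.
func checkStreamedKey(kv roachpb.KeyValue, resolved hlc.Timestamp) error {
	sv := cdctest.NewStreamClientValidatorWrapper()

	// Record a row and a resolved timestamp for an arbitrary partition.
	if err := sv.GetValidator().NoteRow(
		"partition1", string(kv.Key), string(kv.Value.RawBytes), kv.Value.Timestamp,
	); err != nil {
		return err
	}
	if err := sv.GetValidator().NoteResolved("partition1", resolved); err != nil {
		return err
	}

	// Ordering violations are accumulated rather than returned eagerly.
	if failures := sv.GetValidator().Failures(); len(failures) > 0 {
		return fmt.Errorf("validator failures: %v", failures)
	}

	// Query the in-memory copy of the streamed updates for this key.
	vals, err := sv.GetValuesForKeyBelowTimestamp(string(kv.Key), resolved)
	if err != nil {
		return err
	}
	fmt.Printf("streamed %d versions of %s at or below %v\n", len(vals), kv.Key, resolved)
	return nil
}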
+func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper { + ov := NewOrderValidator("unusedC2C") + return &streamValidator{ + ov, + } +} + +// GetValidator implements the StreamClientValidatorWrapper interface. +func (sv *streamValidator) GetValidator() Validator { + return sv.Validator +} + +// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper +// interface. +// It returns the streamed KV updates for `key` with a ts less than equal to +// `timestamp`. +func (sv *streamValidator) GetValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + orderValidator, ok := sv.GetValidator().(*orderValidator) + if !ok { + return nil, errors.Newf("unknown validator %T: ", sv.GetValidator()) + } + timestampValueTuples := orderValidator.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return timestamp.Less(timestampValueTuples[i].ts) + }) + var kv []roachpb.KeyValue + for _, tsValue := range timestampValueTuples[:timestampsIdx] { + byteRep := []byte(key) + kv = append(kv, roachpb.KeyValue{ + Key: byteRep, + Value: roachpb.Value{ + RawBytes: []byte(tsValue.value), + Timestamp: tsValue.ts, + }, + }) + } + + return kv, nil +} + +type timestampValue struct { + ts hlc.Timestamp + value string +} + type orderValidator struct { - topic string - partitionForKey map[string]string - keyTimestamps map[string][]hlc.Timestamp - resolved map[string]hlc.Timestamp + topic string + partitionForKey map[string]string + keyTimestampAndValues map[string][]timestampValue + resolved map[string]hlc.Timestamp failures []string } +var _ Validator = &orderValidator{} + // NewOrderValidator returns a Validator that checks the row and resolved // timestamp ordering guarantees. It also asserts that keys have an affinity to // a single partition. @@ -52,17 +121,15 @@ type orderValidator struct { // lower update timestamp will be emitted on that partition. func NewOrderValidator(topic string) Validator { return &orderValidator{ - topic: topic, - partitionForKey: make(map[string]string), - keyTimestamps: make(map[string][]hlc.Timestamp), - resolved: make(map[string]hlc.Timestamp), + topic: topic, + partitionForKey: make(map[string]string), + keyTimestampAndValues: make(map[string][]timestampValue), + resolved: make(map[string]hlc.Timestamp), } } // NoteRow implements the Validator interface. 
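The per-key bookkeeping above is a timestamp-sorted slice searched with sort.Search. A self-contained sketch of the same pattern, with plain types standing in for hlc.Timestamp and roachpb.KeyValue:

package main

import (
	"fmt"
	"sort"
)

type tsValue struct {
	ts    int64 // stand-in for hlc.Timestamp
	value string
}

// insertSorted places tv into vals, keeping vals sorted by timestamp, and
// reports whether that timestamp was already present (a duplicate event).
func insertSorted(vals []tsValue, tv tsValue) ([]tsValue, bool) {
	idx := sort.Search(len(vals), func(i int) bool { return tv.ts <= vals[i].ts })
	if idx < len(vals) && vals[idx].ts == tv.ts {
		return vals, true
	}
	vals = append(vals, tsValue{})
	copy(vals[idx+1:], vals[idx:])
	vals[idx] = tv
	return vals, false
}

// valuesAtOrBelow returns every update with a timestamp <= ts, mirroring
// GetValuesForKeyBelowTimestamp.
func valuesAtOrBelow(vals []tsValue, ts int64) []tsValue {
	idx := sort.Search(len(vals), func(i int) bool { return ts < vals[i].ts })
	return vals[:idx]
}

func main() {
	var vals []tsValue
	for _, tv := range []tsValue{{3, "c"}, {1, "a"}, {2, "b"}, {2, "b"}} {
		vals, _ = insertSorted(vals, tv)
	}
	fmt.Println(valuesAtOrBelow(vals, 2)) // [{1 a} {2 b}]
}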
-func (v *orderValidator) NoteRow( - partition string, key, ignoredValue string, updated hlc.Timestamp, -) error { +func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error { if prev, ok := v.partitionForKey[key]; ok && prev != partition { v.failures = append(v.failures, fmt.Sprintf( `key [%s] received on two partitions: %s and %s`, key, prev, partition, @@ -71,17 +138,20 @@ func (v *orderValidator) NoteRow( } v.partitionForKey[key] = partition - timestamps := v.keyTimestamps[key] - timestampsIdx := sort.Search(len(timestamps), func(i int) bool { - return updated.LessEq(timestamps[i]) + timestampValueTuples := v.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return updated.LessEq(timestampValueTuples[i].ts) }) - seen := timestampsIdx < len(timestamps) && timestamps[timestampsIdx] == updated + seen := timestampsIdx < len(timestampValueTuples) && + timestampValueTuples[timestampsIdx].ts == updated - if !seen && len(timestamps) > 0 && updated.Less(timestamps[len(timestamps)-1]) { + if !seen && len(timestampValueTuples) > 0 && + updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) { v.failures = append(v.failures, fmt.Sprintf( `topic %s partition %s: saw new row timestamp %s after %s was seen`, v.topic, partition, - updated.AsOfSystemTime(), timestamps[len(timestamps)-1].AsOfSystemTime(), + updated.AsOfSystemTime(), + timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(), )) } if !seen && updated.Less(v.resolved[partition]) { @@ -92,8 +162,12 @@ func (v *orderValidator) NoteRow( } if !seen { - v.keyTimestamps[key] = append( - append(timestamps[:timestampsIdx], updated), timestamps[timestampsIdx:]...) + v.keyTimestampAndValues[key] = append( + append(timestampValueTuples[:timestampsIdx], timestampValue{ + ts: updated, + value: value, + }), + timestampValueTuples[timestampsIdx:]...) } return nil } diff --git a/pkg/ccl/streamingccl/addresses.go b/pkg/ccl/streamingccl/addresses.go index 8153fd8d3f66..cb358eba22a0 100644 --- a/pkg/ccl/streamingccl/addresses.go +++ b/pkg/ccl/streamingccl/addresses.go @@ -25,6 +25,11 @@ func (sa StreamAddress) URL() (*url.URL, error) { // Each partition will emit events for a fixed span of keys. type PartitionAddress string +// URL parses the partition address as a URL. +func (pa PartitionAddress) URL() (*url.URL, error) { + return url.Parse(string(pa)) +} + // Topology is a configuration of stream partitions. These are particular to a // stream. It specifies the number and addresses of partitions of the stream. // diff --git a/pkg/ccl/streamingccl/event.go b/pkg/ccl/streamingccl/event.go index b326d61d1fd5..4afcfb8b833d 100644 --- a/pkg/ccl/streamingccl/event.go +++ b/pkg/ccl/streamingccl/event.go @@ -37,11 +37,15 @@ type Event interface { // CheckpointEvent. The resolved timestamp indicates that all KV events until // this time have been emitted. GetResolved() *hlc.Timestamp + // GetPartitionAddress returns the PartitionAddress of the partition from which the + // event was emitted from. + GetPartitionAddress() *PartitionAddress } // kvEvent is a key value pair that needs to be ingested. type kvEvent struct { - kv roachpb.KeyValue + kv roachpb.KeyValue + partitionAddress PartitionAddress } var _ Event = kvEvent{} @@ -61,10 +65,16 @@ func (kve kvEvent) GetResolved() *hlc.Timestamp { return nil } +// GetPartitionAddress implements the Event interface. 
+func (kve kvEvent) GetPartitionAddress() *PartitionAddress { + return &kve.partitionAddress +} + // checkpointEvent indicates that the stream has emitted every change for all // keys in the span it is responsible for up until this timestamp. type checkpointEvent struct { resolvedTimestamp hlc.Timestamp + partitionAddress PartitionAddress } var _ Event = checkpointEvent{} @@ -84,12 +94,17 @@ func (ce checkpointEvent) GetResolved() *hlc.Timestamp { return &ce.resolvedTimestamp } +// GetPartitionAddress implements the Event interface. +func (ce checkpointEvent) GetPartitionAddress() *PartitionAddress { + return &ce.partitionAddress +} + // MakeKVEvent creates an Event from a KV. -func MakeKVEvent(kv roachpb.KeyValue) Event { - return kvEvent{kv: kv} +func MakeKVEvent(kv roachpb.KeyValue, address PartitionAddress) Event { + return kvEvent{kv: kv, partitionAddress: address} } // MakeCheckpointEvent creates an Event from a resolved timestamp. -func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event { - return checkpointEvent{resolvedTimestamp: resolvedTimestamp} +func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp, address PartitionAddress) Event { + return checkpointEvent{resolvedTimestamp: resolvedTimestamp, partitionAddress: address} } diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 33b33eae133a..fbf260b75c24 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -14,12 +14,14 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/sql", + "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", "//pkg/util/hlc", + "//pkg/util/randutil", "//pkg/util/syncutil", "//pkg/util/timeutil", ], diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 4aea27ee9979..bdde34c67c1c 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -38,7 +38,7 @@ func (sc testStreamClient) GetTopology( // ConsumePartition implements the Client interface. 
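Every event now carries the address of the partition that produced it, so a consumer no longer needs out-of-band bookkeeping to know where an event came from. A rough sketch of draining a partition's channel, assuming it compiles inside this tree (the consumer function itself is illustrative):

package streamingest_test // hypothetical placement, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// drainPartition reads events until the channel closes and returns the highest
// checkpoint seen, keyed by the partition address stamped on each event.
func drainPartition(
	events <-chan streamingccl.Event,
) map[streamingccl.PartitionAddress]hlc.Timestamp {
	highWater := make(map[streamingccl.PartitionAddress]hlc.Timestamp)
	for event := range events {
		partition := *event.GetPartitionAddress()
		switch event.Type() {
		case streamingccl.KVEvent:
			// KVs would be buffered for ingestion here; we only touch the key
			// to show that GetKV is non-nil for this event type.
			_ = event.GetKV().Key
		case streamingccl.CheckpointEvent:
			if resolved := *event.GetResolved(); highWater[partition].Less(resolved) {
				highWater[partition] = resolved
			}
		}
	}
	return highWater
}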
func (sc testStreamClient) ConsumePartition( - _ context.Context, _ streamingccl.PartitionAddress, _ time.Time, + _ context.Context, pa streamingccl.PartitionAddress, _ time.Time, ) (chan streamingccl.Event, error) { sampleKV := roachpb.KeyValue{ Key: []byte("key_1"), @@ -49,8 +49,8 @@ func (sc testStreamClient) ConsumePartition( } events := make(chan streamingccl.Event, 2) - events <- streamingccl.MakeKVEvent(sampleKV) - events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}) + events <- streamingccl.MakeKVEvent(sampleKV, pa) + events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}, pa) close(events) return events, nil diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 61e7a5b34bdb..bf9341396804 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -10,6 +10,7 @@ package streamclient import ( "context" + "fmt" "math/rand" "net/url" "strconv" @@ -19,20 +20,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) const ( - // RandomStreamSchema is the schema of the KVs emitted by the random stream - // client. - RandomStreamSchema = "CREATE TABLE test (k INT PRIMARY KEY, v INT)" + // RandomStreamSchemaPlaceholder is the schema of the KVs emitted by the + // random stream client. + RandomStreamSchemaPlaceholder = "CREATE TABLE %s (k INT PRIMARY KEY, v INT)" // TestScheme is the URI scheme used to create a test load. TestScheme = "test" @@ -45,14 +48,37 @@ const ( // KVsPerCheckpoint controls approximately how many KV events should be emitted // between checkpoint events. KVsPerCheckpoint = "KVS_PER_CHECKPOINT" + // NumPartitions controls the number of partitions the client will stream data + // back on. Each partition will encompass a single table span. + NumPartitions = "NUM_PARTITIONS" + // DupProbability controls the probability with which we emit duplicate KV + // events. + DupProbability = "DUP_PROBABILITY" + // IngestionDatabaseID is the ID used in the generated table descriptor. + IngestionDatabaseID = 50 /* defaultDB */ + // IngestionTablePrefix is the prefix of the table name used in the generated + // table descriptor. + IngestionTablePrefix = "foo" ) +type interceptFn func(event streamingccl.Event) + +// InterceptableStreamClient wraps a Client, and provides a method to register +// interceptor methods that are run on every streamed Event. +type InterceptableStreamClient interface { + Client + + RegisterInterception(fn interceptFn) +} + // randomStreamConfig specifies the variables that controls the rate and type of // events that the generated stream emits. 
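The knobs above all travel in the query string of the stream URI, so a test can tune the workload without any new plumbing. A standalone sketch of building and decoding such a URI with only the standard library (the parameter names match the constants above; the values are illustrative):

package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

func main() {
	// Build a stream URI equivalent to what the tests pass to the client.
	q := url.Values{}
	q.Set("VALUE_RANGE", "100")
	q.Set("KV_FREQUENCY", strconv.Itoa(int(50*time.Nanosecond)))
	q.Set("KVS_PER_CHECKPOINT", "1000")
	q.Set("NUM_PARTITIONS", "4")
	q.Set("DUP_PROBABILITY", "0.2")
	streamURI := url.URL{Scheme: "test", Path: "/", RawQuery: q.Encode()}
	fmt.Println(streamURI.String()) // test:///?DUP_PROBABILITY=0.2&...

	// Decode it the same way parseRandomStreamConfig does.
	parsed, err := url.Parse(streamURI.String())
	if err != nil {
		panic(err)
	}
	numPartitions, err := strconv.Atoi(parsed.Query().Get("NUM_PARTITIONS"))
	if err != nil {
		panic(err)
	}
	dupProbability, err := strconv.ParseFloat(parsed.Query().Get("DUP_PROBABILITY"), 32)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d %.1f\n", numPartitions, dupProbability) // 4 0.2
}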
type randomStreamConfig struct { valueRange int kvFrequency time.Duration kvsPerCheckpoint int + numPartitions int + dupProbability float64 } func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { @@ -60,6 +86,8 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { valueRange: 100, kvFrequency: 10 * time.Microsecond, kvsPerCheckpoint: 100, + numPartitions: 1, + dupProbability: 0.5, } var err error @@ -85,21 +113,33 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { } } + if numPartitionsStr := streamURL.Query().Get(NumPartitions); numPartitionsStr != "" { + c.numPartitions, err = strconv.Atoi(numPartitionsStr) + if err != nil { + return c, err + } + } + + if dupProbStr := streamURL.Query().Get(DupProbability); dupProbStr != "" { + c.dupProbability, err = strconv.ParseFloat(dupProbStr, 32) + if err != nil { + return c, err + } + } return c, nil } // randomStreamClient is a temporary stream client implementation that generates // random events. // -// It expects a table with the schema `RandomStreamSchema` to already exist, -// with table ID `` to be used in the URI. Opening the stream client -// on the URI 'test://' will generate random events into this table. +// The client can be configured to return more than one partition via the stream +// URL. Each partition covers a single table span. // // TODO: Move this over to a _test file in the ingestion package when there is a // real stream client implementation. type randomStreamClient struct { - baseDesc *tabledesc.Mutable - config randomStreamConfig + tableIDToDesc map[descpb.ID]*tabledesc.Mutable + config randomStreamConfig // interceptors can be registered to peek at every event generated by this // client. @@ -111,55 +151,116 @@ type randomStreamClient struct { } var _ Client = &randomStreamClient{} +var _ InterceptableStreamClient = &randomStreamClient{} // newRandomStreamClient returns a stream client that generates a random set of // events on a table with an integer key and integer value for the table with // the given ID. func newRandomStreamClient(streamURL *url.URL) (Client, error) { - tableID, err := strconv.Atoi(streamURL.Host) - if err != nil { - return nil, err - } - testTable, err := sql.CreateTestTableDescriptor( - context.Background(), - 50, /* defaultdb */ - descpb.ID(tableID), - RandomStreamSchema, - systemschema.JobsTable.Privileges, - ) - if err != nil { - return nil, err - } - streamConfig, err := parseRandomStreamConfig(streamURL) if err != nil { return nil, err } return &randomStreamClient{ - baseDesc: testTable, - config: streamConfig, + config: streamConfig, }, nil } +var testTableID = 52 + +func getNextTableID() int { + ret := testTableID + testTableID++ + return ret +} + // GetTopology implements the Client interface. func (m *randomStreamClient) GetTopology( _ streamingccl.StreamAddress, ) (streamingccl.Topology, error) { - panic("not yet implemented") + topology := streamingccl.Topology{Partitions: make([]streamingccl.PartitionAddress, + 0, m.config.numPartitions)} + + // Allocate table IDs and return one per partition address in the topology. 
+ for i := 0; i < m.config.numPartitions; i++ { + tableID := descpb.ID(getNextTableID()) + partitionURI := url.URL{ + Scheme: TestScheme, + Host: strconv.Itoa(int(tableID)), + } + topology.Partitions = append(topology.Partitions, + streamingccl.PartitionAddress(partitionURI.String())) + } + + return topology, nil +} + +// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor +// KVs for the table with tableID. +func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( + tableID descpb.ID, +) (*tabledesc.Mutable, []roachpb.KeyValue, error) { + tableName := fmt.Sprintf("%s%d", IngestionTablePrefix, tableID) + testTable, err := sql.CreateTestTableDescriptor( + context.Background(), + IngestionDatabaseID, + tableID, + fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), + systemschema.JobsTable.Privileges, + ) + if err != nil { + return nil, nil, err + } + + //// Generate namespace entry. + key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name) + var value roachpb.Value + value.SetInt(int64(testTable.GetID())) + namespaceKV := roachpb.KeyValue{ + Key: key.Key(keys.TODOSQLCodec), + Value: value, + } + + // Generate descriptor entry. + descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID()) + descDesc := testTable.DescriptorProto() + var descValue roachpb.Value + if err := descValue.SetProto(descDesc); err != nil { + panic(err) + } + descKV := roachpb.KeyValue{ + Key: descKey, + Value: descValue, + } + + return testTable, []roachpb.KeyValue{namespaceKV, descKV}, nil } // ConsumePartition implements the Client interface. func (m *randomStreamClient) ConsumePartition( - ctx context.Context, _ streamingccl.PartitionAddress, startTime time.Time, + ctx context.Context, partitionAddress streamingccl.PartitionAddress, startTime time.Time, ) (chan streamingccl.Event, error) { eventCh := make(chan streamingccl.Event) now := timeutil.Now() if startTime.After(now) { panic("cannot start random stream client event stream in the future") } - lastResolvedTime := startTime + partitionURL, err := partitionAddress.URL() + if err != nil { + return nil, err + } + var partitionTableID int + partitionTableID, err = strconv.Atoi(partitionURL.Host) + if err != nil { + return nil, err + } + + tableDesc, systemKVs, err := m.getDescriptorAndNamespaceKVForTableID(descpb.ID(partitionTableID)) + if err != nil { + return nil, err + } go func() { defer close(eventCh) @@ -176,24 +277,40 @@ func (m *randomStreamClient) ConsumePartition( resolvedTimer.Reset(0) defer resolvedTimer.Stop() + rng, _ := randutil.NewPseudoRand() + var dupKVEvent streamingccl.Event + for { var event streamingccl.Event select { case <-kvTimer.C: kvTimer.Read = true - event = streamingccl.MakeKVEvent(m.makeRandomKey(r, lastResolvedTime)) + // If there are system KVs to emit, prioritize those. + if len(systemKVs) > 0 { + systemKV := systemKVs[0] + systemKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + event = streamingccl.MakeKVEvent(systemKV, partitionAddress) + systemKVs = systemKVs[1:] + } else { + // Generate a duplicate KVEvent, and update its timestamp to now(). 
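Condensing the emission policy of this loop: the partition's system KVs (namespace and descriptor) drain first, and afterwards each tick either re-emits the previously generated KV at a fresh timestamp, with probability dupProbability, or produces a new random KV. A simplified standalone sketch of that decision, with plain stand-in types:

package main

import (
	"fmt"
	"math/rand"
)

type kv struct {
	key string
	ts  int64
}

// makeGenerator mimics the branch below: drain systemKVs first, then on each
// call either re-emit the previous KV at a new timestamp (with probability
// dupProbability) or produce a fresh random KV.
func makeGenerator(systemKVs []kv, dupProbability float64, rng *rand.Rand) func(now int64) kv {
	var last *kv
	return func(now int64) kv {
		if len(systemKVs) > 0 {
			next := systemKVs[0]
			systemKVs = systemKVs[1:]
			next.ts = now
			return next
		}
		if last != nil && rng.Float64() < dupProbability {
			dup := *last
			dup.ts = now // same key and value, newer MVCC timestamp
			return dup
		}
		fresh := kv{key: fmt.Sprintf("k%d", rng.Intn(100)), ts: now}
		last = &fresh
		return fresh
	}
}

func main() {
	rng := rand.New(rand.NewSource(1))
	next := makeGenerator([]kv{{key: "namespace"}, {key: "descriptor"}}, 0.5, rng)
	for now := int64(1); now <= 6; now++ {
		fmt.Println(next(now))
	}
}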
+ if rng.Float64() < m.config.dupProbability && dupKVEvent != nil { + dupKV := dupKVEvent.GetKV() + dupKV.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + event = streamingccl.MakeKVEvent(*dupKV, partitionAddress) + } else { + event = streamingccl.MakeKVEvent(m.makeRandomKey(r, tableDesc), partitionAddress) + dupKVEvent = event + } + } kvTimer.Reset(kvInterval) case <-resolvedTimer.C: resolvedTimer.Read = true resolvedTime := timeutil.Now() hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} - event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) - lastResolvedTime = resolvedTime + event = streamingccl.MakeCheckpointEvent(hlcResolvedTime, partitionAddress) resolvedTimer.Reset(resolvedInterval) } - // TODO: Consider keeping an in-memory copy so that tests can verify - // that the data we've ingested is correct. select { case eventCh <- event: case <-ctx.Done(): @@ -215,9 +332,9 @@ func (m *randomStreamClient) ConsumePartition( return eventCh, nil } -func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue { - tableDesc := m.baseDesc - +func (m *randomStreamClient) makeRandomKey( + r *rand.Rand, tableDesc *tabledesc.Mutable, +) roachpb.KeyValue { // Create a key holding a random integer. k, err := rowenc.TestingMakePrimaryIndexKey(tableDesc, r.Intn(m.config.valueRange)) if err != nil { @@ -237,10 +354,7 @@ func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachp v.ClearChecksum() v.InitChecksum(k) - // Generate a timestamp between minTs and now(). - randOffset := int(timeutil.Now().UnixNano()) - int(minTs.UnixNano()) - newTimestamp := rand.Intn(randOffset) + int(minTs.UnixNano()) - v.Timestamp = hlc.Timestamp{WallTime: int64(newTimestamp)} + v.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} return roachpb.KeyValue{ Key: k, @@ -249,8 +363,8 @@ func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachp } // RegisterInterception implements streamingest.interceptableStreamClient. 
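RegisterInterception is how the tests observe the stream without touching the ingestion path: every emitted event is first handed to each registered callback. A stripped-down version of the pattern, independent of the streamingccl types:

package main

import (
	"fmt"
	"sync"
)

type event string

type interceptableEmitter struct {
	mu struct {
		sync.Mutex
		interceptors []func(event)
	}
}

// RegisterInterception appends a callback that will observe every future event.
func (e *interceptableEmitter) RegisterInterception(fn func(event)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.mu.interceptors = append(e.mu.interceptors, fn)
}

// emit hands the event to every interceptor before delivering it to the
// channel, mirroring how the random stream client feeds its interceptors.
func (e *interceptableEmitter) emit(ev event, out chan<- event) {
	e.mu.Lock()
	interceptors := append([]func(event)(nil), e.mu.interceptors...)
	e.mu.Unlock()
	for _, fn := range interceptors {
		fn(ev)
	}
	out <- ev
}

func main() {
	var emitter interceptableEmitter
	seen := 0
	emitter.RegisterInterception(func(event) { seen++ })

	out := make(chan event, 2)
	emitter.emit("kv", out)
	emitter.emit("checkpoint", out)
	fmt.Println(seen, len(out)) // 2 2
}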
-func (m *randomStreamClient) RegisterInterception(f func(event streamingccl.Event)) { +func (m *randomStreamClient) RegisterInterception(fn interceptFn) { m.mu.Lock() defer m.mu.Unlock() - m.mu.interceptors = append(m.mu.interceptors, f) + m.mu.interceptors = append(m.mu.interceptors, fn) } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 10e641f42fd3..ff47cc0255e9 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -56,6 +56,7 @@ go_test( embed = [":streamingest"], deps = [ "//pkg/base", + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -69,9 +70,11 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", "//pkg/sql/sem/tree", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/distsqlutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index 7eda9458777d..f05e3c7b83a8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -63,8 +63,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { post := execinfrapb.PostProcessSpec{} var spec execinfrapb.StreamIngestionDataSpec - pa1 := streamingccl.PartitionAddress("s3://my_streams/stream/partition1") - pa2 := streamingccl.PartitionAddress("s3://my_streams/stream/partition2") + pa1 := streamingccl.PartitionAddress("partition1") + pa2 := streamingccl.PartitionAddress("partition2") v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} @@ -79,11 +79,11 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { { name: "same-resolved-ts-across-partitions", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, }, @@ -92,9 +92,9 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // emitted a resolved ts. name: "no-checkpoints", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), + streamingccl.MakeKVEvent(sampleKV, pa2), }}, }, { @@ -102,39 +102,39 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // emitted a resolved ts. 
name: "no-checkpoint-from-one-partition", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{}}, }, { name: "one-partition-ahead-of-the-other", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1}, }, { name: "some-interleaved-timestamps", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 3}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 5}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 3}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 5}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, }, { name: "some-interleaved-logical-timestamps", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}, pa1), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 1}), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 1}, pa2), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 2}, pa2), }}, expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 4}, }, @@ -143,9 +143,9 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { // lower than its start time. 
name: "checkpoint-lower-than-start-ts", events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 4}, pa1), }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1, Logical: 2}, pa2), }}, frontierStartTime: hlc.Timestamp{WallTime: 1, Logical: 3}, }, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 05b5786100c9..bb312222a296 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/bulk" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" @@ -66,14 +67,7 @@ type streamIngestionProcessor struct { ingestionErr error // eventCh is the merged event channel of all of the partition event streams. - eventCh chan partitionEvent -} - -// partitionEvent augments a normal event with the partition it came from. -type partitionEvent struct { - streamingccl.Event - - partition streamingccl.PartitionAddress + eventCh chan streamingccl.Event } var _ execinfra.Processor = &streamIngestionProcessor{} @@ -93,6 +87,20 @@ func newStreamIngestionDataProcessor( return nil, err } + // Check if there are any interceptor methods that need to be registered with + // the stream client. + // These methods are invoked on every emitted Event. + if knobs, ok := flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs.(*sql. + StreamIngestionTestingKnobs); ok { + if knobs.Interceptors != nil { + if interceptable, ok := streamClient.(streamclient.InterceptableStreamClient); ok { + for _, interceptor := range knobs.Interceptors { + interceptable.RegisterInterception(interceptor) + } + } + } + } + sip := &streamIngestionProcessor{ flowCtx: flowCtx, spec: spec, @@ -211,8 +219,8 @@ func (sip *streamIngestionProcessor) flush() error { // channel. func merge( ctx context.Context, partitionStreams map[streamingccl.PartitionAddress]chan streamingccl.Event, -) chan partitionEvent { - merged := make(chan partitionEvent) +) chan streamingccl.Event { + merged := make(chan streamingccl.Event) var wg sync.WaitGroup wg.Add(len(partitionStreams)) @@ -221,13 +229,8 @@ func merge( go func(partition streamingccl.PartitionAddress, eventCh <-chan streamingccl.Event) { defer wg.Done() for event := range eventCh { - pe := partitionEvent{ - Event: event, - partition: partition, - } - select { - case merged <- pe: + case merged <- event: case <-ctx.Done(): // TODO: Add ctx.Err() to an error channel once ConsumePartition // supports an error ch. @@ -283,7 +286,7 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpan, erro // Each partition is represented by a span defined by the // partition address. 
- spanStartKey := roachpb.Key(event.partition) + spanStartKey := roachpb.Key(*event.GetPartitionAddress()) return &jobspb.ResolvedSpan{ Span: roachpb.Span{Key: spanStartKey, EndKey: spanStartKey.Next()}, Timestamp: resolvedTime, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 314d0aaa942d..1d07f011c7f8 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -16,15 +16,19 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -38,12 +42,6 @@ import ( "github.com/stretchr/testify/require" ) -type interceptableStreamClient interface { - streamclient.Client - - RegisterInterception(func(event streamingccl.Event)) -} - // mockStreamClient will return the slice of events associated to the stream // partition being consumed. Stream partitions are identified by unique // partition addresses. @@ -93,17 +91,17 @@ func TestStreamIngestionProcessor(t *testing.T) { defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() - // Inject a mock client. 
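The mockStreamClient used by this test replays a canned slice of events per partition address. For orientation, a client with that shape can be as small as the sketch below; it is illustrative rather than the implementation used here, and it only exercises the GetTopology and ConsumePartition methods shown in client_test.go above:

package streamingest_test // hypothetical placement, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/errors"
)

// cannedEventClient returns a fixed slice of events for each partition address.
type cannedEventClient struct {
	partitionEvents map[streamingccl.PartitionAddress][]streamingccl.Event
}

// GetTopology returns one partition per canned slice.
func (c *cannedEventClient) GetTopology(
	_ streamingccl.StreamAddress,
) (streamingccl.Topology, error) {
	topo := streamingccl.Topology{}
	for pa := range c.partitionEvents {
		topo.Partitions = append(topo.Partitions, pa)
	}
	return topo, nil
}

// ConsumePartition replays the canned events on a buffered channel and closes it.
func (c *cannedEventClient) ConsumePartition(
	_ context.Context, pa streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
	events, ok := c.partitionEvents[pa]
	if !ok {
		return nil, errors.Newf("no events registered for partition %s", pa)
	}
	ch := make(chan streamingccl.Event, len(events))
	for _, ev := range events {
		ch <- ev
	}
	close(ch)
	return ch, nil
}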
v := roachpb.MakeValueFromString("value_1") v.Timestamp = hlc.Timestamp{WallTime: 1} sampleKV := roachpb.KeyValue{Key: roachpb.Key("key_1"), Value: v} + unusedPartitionAddress := streamingccl.PartitionAddress("") events := []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeKVEvent(sampleKV), - streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 1}, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeKVEvent(sampleKV, unusedPartitionAddress), + streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 4}, unusedPartitionAddress), } pa1 := streamingccl.PartitionAddress("partition1") pa2 := streamingccl.PartitionAddress("partition2") @@ -112,8 +110,9 @@ func TestStreamIngestionProcessor(t *testing.T) { } startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", startTime, - nil /* interceptors */, mockClient) + partitionAddresses := []streamingccl.PartitionAddress{"partition1", "partition2"} + out, err := runStreamIngestionProcessor(ctx, t, kvDB, "some://stream", partitionAddresses, + startTime, nil /* interceptEvents */, mockClient) require.NoError(t, err) // Compare the set of results since the ordering is not guaranteed. @@ -142,6 +141,90 @@ func TestStreamIngestionProcessor(t *testing.T) { require.Equal(t, expectedRows, actualRows) } +func getPartitionSpanToTableID( + t *testing.T, partitionAddresses []streamingccl.PartitionAddress, +) map[string]int { + pSpanToTableID := make(map[string]int) + + // Aggregate the table IDs which should have been ingested. + for _, pa := range partitionAddresses { + pKey := roachpb.Key(pa) + pSpan := roachpb.Span{pKey, pKey.Next()} + paURL, err := pa.URL() + require.NoError(t, err) + id, err := strconv.Atoi(paURL.Host) + require.NoError(t, err) + pSpanToTableID[pSpan.String()] = id + } + return pSpanToTableID +} + +// assertEqualKVs iterates over the store in `tc` and compares the MVCC KVs +// against the in-memory copy of events stored in the `streamValidator`. This +// ensures that the stream ingestion processor ingested at least as much data as +// was streamed up until partitionTimestamp. +func assertEqualKVs( + t *testing.T, + tc *testcluster.TestCluster, + streamValidator cdctest.StreamClientValidatorWrapper, + tableID int, + partitionTimestamp hlc.Timestamp, +) { + key := keys.TODOSQLCodec.TablePrefix(uint32(tableID)) + + // Iterate over the store. + store := tc.GetFirstStoreFromServer(t, 0) + it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + LowerBound: key, + UpperBound: key.PrefixEnd(), + }) + defer it.Close() + var prevKey roachpb.Key + var valueTimestampTuples []roachpb.KeyValue + var err error + for it.SeekGE(storage.MVCCKey{}); ; it.Next() { + if ok, err := it.Valid(); !ok { + if err != nil { + t.Fatal(err) + } + break + } + + // We only want to process MVCC KVs with a ts less than or equal to the max + // resolved ts for this partition. 
+ if partitionTimestamp.Less(it.Key().Timestamp) { + continue + } + + newKey := (prevKey != nil && !it.Key().Key.Equal(prevKey)) || prevKey == nil + prevKey = it.Key().Key + + if newKey { + // All value ts should have been drained at this point, otherwise there is + // a mismatch between the streamed and ingested data. + require.Equal(t, 0, len(valueTimestampTuples)) + valueTimestampTuples, err = streamValidator.GetValuesForKeyBelowTimestamp( + string(it.Key().Key), partitionTimestamp) + require.NoError(t, err) + } + + require.Greater(t, len(valueTimestampTuples), 0) + // Since the iterator goes from latest to older versions, we compare + // starting from the end of the slice that is sorted by timestamp. + latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] + require.Equal(t, roachpb.KeyValue{ + Key: it.Key().Key, + Value: roachpb.Value{ + RawBytes: it.Value(), + Timestamp: it.Key().Timestamp, + }, + }, latestVersionInChain) + // Truncate the latest version which we just checked against in preparation + // for the next iteration. + valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] + } +} + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. func TestRandomClientGeneration(t *testing.T) { @@ -151,13 +234,14 @@ func TestRandomClientGeneration(t *testing.T) { ctx := context.Background() makeTestStreamURI := func( - tableID string, - valueRange, kvsPerResolved int, - kvFrequency time.Duration, + valueRange, kvsPerResolved, numPartitions int, + kvFrequency time.Duration, dupProbability float64, ) string { - return "test://" + tableID + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + "&KV_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + - "&KVS_PER_RESOLVED=" + strconv.Itoa(kvsPerResolved) + "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + + "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + + "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) } tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) @@ -166,30 +250,37 @@ func TestRandomClientGeneration(t *testing.T) { conn := tc.Conns[0] sqlDB := sqlutils.MakeSQLRunner(conn) - // Create the expected table for the random stream to ingest into. - sqlDB.Exec(t, streamclient.RandomStreamSchema) - tableID := sqlDB.QueryStr(t, `SELECT id FROM system.namespace WHERE name = 'test'`)[0][0] - // TODO: Consider testing variations on these parameters. valueRange := 100 kvsPerResolved := 1_000 kvFrequency := 50 * time.Nanosecond - streamAddr := makeTestStreamURI(tableID, valueRange, kvsPerResolved, kvFrequency) + numPartitions := 4 + dupProbability := 0.2 + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, + dupProbability) + + // The random client returns system and table data partitions. + streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(streamAddr)) + require.NoError(t, err) + topo, err := streamClient.GetTopology(streamingccl.StreamAddress(streamAddr)) + require.NoError(t, err) + // One system and two table data partitions. + require.Equal(t, numPartitions, len(topo.Partitions)) startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. 
- cancelAfterCheckpoints := makeCheckpointEventCounter(1_000, cancel) - out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, startTime, - cancelAfterCheckpoints, nil /* mockClient */) + cancelAfterCheckpoints := makeCheckpointEventCounter(1000, cancel) + streamValidator := cdctest.NewStreamClientValidatorWrapper() + validator := registerValidator(streamValidator.GetValidator()) + out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, topo.Partitions, + startTime, []func(streamingccl.Event){cancelAfterCheckpoints, validator}, nil /* mockClient */) require.NoError(t, err) - p1Key := roachpb.Key("partition1") - p2Key := roachpb.Key("partition2") - p1Span := roachpb.Span{Key: p1Key, EndKey: p1Key.Next()} - p2Span := roachpb.Span{Key: p2Key, EndKey: p2Key.Next()} + partitionSpanToTableID := getPartitionSpanToTableID(t, topo.Partitions) numResolvedEvents := 0 + maxResolvedTimestampPerPartition := make(map[string]hlc.Timestamp) for { row, meta := out.Next() if meta != nil { @@ -209,20 +300,38 @@ func TestRandomClientGeneration(t *testing.T) { var resolvedSpan jobspb.ResolvedSpan require.NoError(t, protoutil.Unmarshal([]byte(*protoBytes), &resolvedSpan)) - if resolvedSpan.Span.String() != p1Span.String() && resolvedSpan.Span.String() != p2Span.String() { - t.Fatalf("expected resolved span %v to be either %v or %v", resolvedSpan.Span, p1Span, p2Span) + if _, ok := partitionSpanToTableID[resolvedSpan.Span.String()]; !ok { + t.Fatalf("expected resolved span %v to be either in one of the supplied partition"+ + " addresses %v", resolvedSpan.Span, topo.Partitions) } // All resolved timestamp events should be greater than the start time. require.Less(t, startTime.WallTime, resolvedSpan.Timestamp.WallTime) + + // Track the max resolved timestamp per partition. + if ts, ok := maxResolvedTimestampPerPartition[resolvedSpan.Span.String()]; !ok || + ts.Less(resolvedSpan.Timestamp) { + maxResolvedTimestampPerPartition[resolvedSpan.Span.String()] = resolvedSpan.Timestamp + } numResolvedEvents++ } - // Check that some rows have been ingested and that we've emitted some resolved events. - numRows, err := strconv.Atoi(sqlDB.QueryStr(t, `SELECT count(*) FROM defaultdb.test`)[0][0]) - require.NoError(t, err) - require.Greater(t, numRows, 0, "at least 1 row ingested expected") + // Ensure that no errors were reported to the validator. + for _, failure := range streamValidator.GetValidator().Failures() { + t.Error(failure) + } + + for pSpan, id := range partitionSpanToTableID { + numRows, err := strconv.Atoi(sqlDB.QueryStr(t, fmt.Sprintf( + `SELECT count(*) FROM defaultdb.%s%d`, streamclient.IngestionTablePrefix, id))[0][0]) + require.NoError(t, err) + require.Greater(t, numRows, 0, "at least 1 row ingested expected") + // Scan the store for KVs ingested by this partition, and compare the MVCC + // KVs against the KVEvents streamed up to the max ingested timestamp for + // the partition. 
+ assertEqualKVs(t, tc, streamValidator, id, maxResolvedTimestampPerPartition[pSpan]) + } require.Greater(t, numResolvedEvents, 0, "at least 1 resolved event expected") } @@ -231,8 +340,9 @@ func runStreamIngestionProcessor( t *testing.T, kvDB *kv.DB, streamAddr string, + partitionAddresses []streamingccl.PartitionAddress, startTime hlc.Timestamp, - interceptEvents func(streamingccl.Event), + interceptEvents []func(streamingccl.Event), mockClient streamclient.Client, ) (*distsqlutils.RowBuffer, error) { st := cluster.MakeTestingClusterSettings() @@ -249,6 +359,8 @@ func runStreamIngestionProcessor( }, EvalCtx: &evalCtx, } + flowCtx.Cfg.TestingKnobs.StreamIngestionTestingKnobs = &sql.StreamIngestionTestingKnobs{ + Interceptors: interceptEvents} out := &distsqlutils.RowBuffer{} post := execinfrapb.PostProcessSpec{} @@ -256,7 +368,7 @@ func runStreamIngestionProcessor( var spec execinfrapb.StreamIngestionDataSpec spec.StreamAddress = streamingccl.StreamAddress(streamAddr) - spec.PartitionAddresses = []streamingccl.PartitionAddress{"partition1", "partition2"} + spec.PartitionAddresses = partitionAddresses spec.StartTime = startTime processorID := int32(0) proc, err := newStreamIngestionDataProcessor(&flowCtx, processorID, spec, &post, out) @@ -270,15 +382,6 @@ func runStreamIngestionProcessor( sip.client = mockClient } - if interceptableClient, ok := sip.client.(interceptableStreamClient); ok { - interceptableClient.RegisterInterception(interceptEvents) - // TODO: Inject an interceptor here that keeps track of generated events so - // we can compare. - } else if interceptEvents != nil { - t.Fatalf("interceptor specified, but client %T does not implement interceptableStreamClient", - sip.client) - } - sip.Run(ctx) // Ensure that all the outputs are properly closed. @@ -288,6 +391,29 @@ func runStreamIngestionProcessor( return out, err } +func registerValidator(validator cdctest.Validator) func(event streamingccl.Event) { + return func(event streamingccl.Event) { + switch event.Type() { + case streamingccl.CheckpointEvent: + pa := *event.GetPartitionAddress() + resolvedTS := *event.GetResolved() + err := validator.NoteResolved(string(pa), resolvedTS) + if err != nil { + panic(err.Error()) + } + case streamingccl.KVEvent: + pa := *event.GetPartitionAddress() + kv := *event.GetKV() + + err := validator.NoteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), + kv.Value.Timestamp) + if err != nil { + panic(err.Error()) + } + } + } +} + // makeCheckpointEventCounter runs f after seeing `threshold` number of // checkpoint events. 
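The counter's body sits outside this hunk's context lines; a closure along the following lines would satisfy the comment. This is a guess at the shape, not the committed implementation:

package streamingest_test // hypothetical placement, for illustration only

import "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"

// checkpointCounter invokes onThreshold once it has observed `threshold`
// checkpoint events; other event types are ignored. This sketch is not
// synchronized, so it assumes interceptors for different partitions do not run
// concurrently.
func checkpointCounter(threshold int, onThreshold func()) func(streamingccl.Event) {
	count := 0
	return func(event streamingccl.Event) {
		if event.Type() != streamingccl.CheckpointEvent {
			return
		}
		count++
		if count == threshold {
			onThreshold()
		}
	}
}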
func makeCheckpointEventCounter(threshold int, f func()) func(streamingccl.Event) { diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 2c5d82cc2a77..234abad345a6 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -225,6 +225,7 @@ go_library( deps = [ "//pkg/base", "//pkg/build", + "//pkg/ccl/streamingccl", "//pkg/clusterversion", "//pkg/config", "//pkg/config/zonepb", diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 20bf1e0412a3..aa234da8b6de 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/apd/v2" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" @@ -958,6 +959,16 @@ var _ base.ModuleTestingKnobs = &BackupRestoreTestingKnobs{} // ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. func (*BackupRestoreTestingKnobs) ModuleTestingKnobs() {} +// StreamIngestionTestingKnobs contains knobs for stream ingestion behavior. +type StreamIngestionTestingKnobs struct { + Interceptors []func(event streamingccl.Event) +} + +var _ base.ModuleTestingKnobs = &StreamIngestionTestingKnobs{} + +// ModuleTestingKnobs implements the base.ModuleTestingKnobs interface. +func (*StreamIngestionTestingKnobs) ModuleTestingKnobs() {} + func shouldDistributeGivenRecAndMode( rec distRecommendation, mode sessiondata.DistSQLExecMode, ) bool { diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index c5b840724e71..152787ad40ab 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -239,6 +239,9 @@ type TestingKnobs struct { // BackupRestoreTestingKnobs are backup and restore specific testing knobs. BackupRestoreTestingKnobs base.ModuleTestingKnobs + + // BackupRestoreTestingKnobs are stream ingestion specific testing knobs. + StreamIngestionTestingKnobs base.ModuleTestingKnobs } // MetadataTestLevel represents the types of queries where metadata test
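Because the knob is stored behind base.ModuleTestingKnobs to avoid a dependency cycle, the processor recovers the concrete type with a type assertion before registering interceptors. The round trip, as a compact sketch (the helper below is illustrative; the same pattern appears in runStreamIngestionProcessor and newStreamIngestionDataProcessor above):

package streamingest_test // hypothetical placement, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
)

// interceptorsFromConfig installs an interceptor into the DistSQL testing
// knobs and shows how a processor would read it back out.
func interceptorsFromConfig(intercept func(streamingccl.Event)) []func(streamingccl.Event) {
	// Test side: the typed knob is stored behind base.ModuleTestingKnobs.
	cfg := execinfra.ServerConfig{}
	cfg.TestingKnobs.StreamIngestionTestingKnobs = &sql.StreamIngestionTestingKnobs{
		Interceptors: []func(streamingccl.Event){intercept},
	}

	// Processor side: recover the concrete type before registering the
	// interceptors with an InterceptableStreamClient.
	if knobs, ok := cfg.TestingKnobs.StreamIngestionTestingKnobs.(*sql.StreamIngestionTestingKnobs); ok {
		return knobs.Interceptors
	}
	return nil
}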