diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go
index 77c7ed060a07..bef55c6c0302 100644
--- a/pkg/ccl/changefeedccl/cdctest/validator.go
+++ b/pkg/ccl/changefeedccl/cdctest/validator.go
@@ -33,65 +33,13 @@ type Validator interface {
 	Failures() []string
 }
 
-// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
-// used by stream ingestion to check for correctness.
-type StreamClientValidatorWrapper interface {
-	GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
-	GetValidator() Validator
-}
-
-type streamValidator struct {
+// StreamValidator wraps a Validator and exposes additional methods for
+// introspection.
+type StreamValidator interface {
 	Validator
-}
-
-var _ StreamClientValidatorWrapper = &streamValidator{}
-
-// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
-// to validate the events emitted by the cluster to cluster streaming client.
-// The wrapper currently only "wraps" an orderValidator, but can be built out
-// to utilize other Validator's.
-// The wrapper also allows querying the orderValidator to retrieve streamed
-// events from an in-memory store.
-func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
-	ov := NewOrderValidator("unusedC2C")
-	return &streamValidator{
-		ov,
-	}
-}
-
-// GetValidator implements the StreamClientValidatorWrapper interface.
-func (sv *streamValidator) GetValidator() Validator {
-	return sv.Validator
-}
-
-// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
-// interface.
-// It returns the streamed KV updates for `key` with a ts less than equal to
-// `timestamp`.
-func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
-	key string, timestamp hlc.Timestamp,
-) ([]roachpb.KeyValue, error) {
-	orderValidator, ok := sv.GetValidator().(*orderValidator)
-	if !ok {
-		return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
-	}
-	timestampValueTuples := orderValidator.keyTimestampAndValues[key]
-	timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
-		return timestamp.Less(timestampValueTuples[i].ts)
-	})
-	var kv []roachpb.KeyValue
-	for _, tsValue := range timestampValueTuples[:timestampsIdx] {
-		byteRep := []byte(key)
-		kv = append(kv, roachpb.KeyValue{
-			Key: byteRep,
-			Value: roachpb.Value{
-				RawBytes:  []byte(tsValue.value),
-				Timestamp: tsValue.ts,
-			},
-		})
-	}
-
-	return kv, nil
+	// GetValuesForKeyBelowTimestamp returns the streamed KV updates for `key`
+	// with a ts less than or equal to `timestamp`.
+	GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
 }
 
 type timestampValue struct {
@@ -109,6 +57,7 @@ type orderValidator struct {
 }
 
 var _ Validator = &orderValidator{}
+var _ StreamValidator = &orderValidator{}
 
 // NewOrderValidator returns a Validator that checks the row and resolved
 // timestamp ordering guarantees. It also asserts that keys have an affinity to
@@ -128,6 +77,40 @@ func NewOrderValidator(topic string) Validator {
 	}
 }
 
+// NewStreamOrderValidator wraps an orderValidator as described above, and
+// exposes additional methods for introspection.
+func NewStreamOrderValidator() StreamValidator { + return &orderValidator{ + topic: "unused", + partitionForKey: make(map[string]string), + keyTimestampAndValues: make(map[string][]timestampValue), + resolved: make(map[string]hlc.Timestamp), + } +} + +// GetValuesForKeyBelowTimestamp implements the StreamValidator interface. +func (v *orderValidator) GetValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + timestampValueTuples := v.keyTimestampAndValues[key] + timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool { + return timestamp.Less(timestampValueTuples[i].ts) + }) + var kv []roachpb.KeyValue + for _, tsValue := range timestampValueTuples[:timestampsIdx] { + byteRep := []byte(key) + kv = append(kv, roachpb.KeyValue{ + Key: byteRep, + Value: roachpb.Value{ + RawBytes: []byte(tsValue.value), + Timestamp: tsValue.ts, + }, + }) + } + + return kv, nil +} + // NoteRow implements the Validator interface. func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error { if prev, ok := v.partitionForKey[key]; ok && prev != partition { diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 0b24249f1236..f94d1b63917b 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -16,7 +16,6 @@ go_library( "//pkg/sql", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/descpb", - "//pkg/sql/catalog/systemschema", "//pkg/sql/catalog/tabledesc", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 4012c97a6a47..6813e5a4afd0 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -54,6 +53,10 @@ const ( // DupProbability controls the probability with which we emit duplicate KV // events. DupProbability = "DUP_PROBABILITY" + // TenantID specifies the ID of the tenant we are ingesting data into. This + // allows the client to prefix the generated KVs with the appropriate tenant + // prefix. + TenantID = "TENANT_ID" // IngestionDatabaseID is the ID used in the generated table descriptor. 
IngestionDatabaseID = 50 /* defaultDB */ // IngestionTablePrefix is the prefix of the table name used in the generated @@ -79,6 +82,7 @@ type randomStreamConfig struct { kvsPerCheckpoint int numPartitions int dupProbability float64 + tenantID roachpb.TenantID } func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { @@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { kvsPerCheckpoint: 100, numPartitions: 1, dupProbability: 0.5, + tenantID: roachpb.SystemTenantID, } var err error @@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) { return c, err } } + + if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" { + id, err := strconv.Atoi(tenantIDStr) + if err != nil { + return c, err + } + c.tenantID = roachpb.MakeTenantID(uint64(id)) + } return c, nil } @@ -147,6 +160,7 @@ type randomStreamClient struct { // interceptors can be registered to peek at every event generated by this // client. interceptors []func(streamingccl.Event, streamingccl.PartitionAddress) + tableID int } } @@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) { return nil, err } - return &randomStreamClient{ - config: streamConfig, - }, nil + client := randomStreamClient{config: streamConfig} + client.mu.Lock() + defer client.mu.Unlock() + client.mu.tableID = 52 + return &client, nil } -var testTableID = 52 - -func getNextTableID() int { - ret := testTableID - testTableID++ +func (m *randomStreamClient) getNextTableID() int { + m.mu.Lock() + defer m.mu.Unlock() + ret := m.mu.tableID + m.mu.tableID++ return ret } @@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology( // Allocate table IDs and return one per partition address in the topology. for i := 0; i < m.config.numPartitions; i++ { - tableID := descpb.ID(getNextTableID()) + tableID := descpb.ID(m.getNextTableID()) partitionURI := url.URL{ Scheme: TestScheme, Host: strconv.Itoa(int(tableID)), @@ -207,7 +223,7 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( IngestionDatabaseID, tableID, fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), - systemschema.JobsTable.GetPrivileges(), + &descpb.PrivilegeDescriptor{}, ) if err != nil { return nil, nil, err @@ -215,20 +231,24 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID( // Generate namespace entry. key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name) + k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec)) var value roachpb.Value value.SetInt(int64(testTable.GetID())) + value.InitChecksum(k) namespaceKV := roachpb.KeyValue{ - Key: key.Key(keys.TODOSQLCodec), + Key: k, Value: value, } // Generate descriptor entry. 
descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID()) + descKey = rekey(m.config.tenantID, descKey) descDesc := testTable.DescriptorProto() var descValue roachpb.Value if err := descValue.SetProto(descDesc); err != nil { panic(err) } + descValue.InitChecksum(descKey) descKV := roachpb.KeyValue{ Key: descKey, Value: descValue, @@ -280,6 +300,7 @@ func (m *randomStreamClient) ConsumePartition( resolvedTime := timeutil.Now() hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()} event = streamingccl.MakeCheckpointEvent(hlcResolvedTime) + dupKVEvent = nil numKVEventsSinceLastResolved = 0 } else { @@ -328,6 +349,19 @@ func (m *randomStreamClient) ConsumePartition( return eventCh, nil } +func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key { + // Strip old prefix. + tenantPrefix := keys.MakeTenantPrefix(tenantID) + noTenantPrefix, _, err := keys.DecodeTenantPrefix(k) + if err != nil { + panic(err) + } + + // Prepend tenant prefix. + rekeyedKey := append(tenantPrefix, noTenantPrefix...) + return rekeyedKey +} + func (m *randomStreamClient) makeRandomKey( r *rand.Rand, tableDesc *tabledesc.Mutable, ) roachpb.KeyValue { @@ -338,6 +372,8 @@ func (m *randomStreamClient) makeRandomKey( } k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID)) + k = rekey(m.config.tenantID, k) + // Create a value holding a random integer. valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange))) valueBuf, err := rowenc.EncodeTableValue( diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 9844eaa1ed24..04ca6fde0896 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -8,10 +8,12 @@ go_library( "stream_ingestion_planning.go", "stream_ingestion_processor.go", "stream_ingestion_processor_planning.go", + "stream_ingestion_test_utils.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest", visibility = ["//visibility:public"], deps = [ + "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -38,6 +40,7 @@ go_library( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/span", + "//pkg/util/syncutil", "//pkg/util/timeutil", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", @@ -55,11 +58,12 @@ go_test( "stream_ingestion_frontier_processor_test.go", "stream_ingestion_job_test.go", "stream_ingestion_processor_test.go", + "stream_ingestion_test.go", ], embed = [":streamingest"], deps = [ "//pkg/base", - "//pkg/ccl/changefeedccl/cdctest", + "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/storageccl", "//pkg/ccl/streamingccl", "//pkg/ccl/streamingccl/streamclient", @@ -68,6 +72,7 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", @@ -80,7 +85,9 @@ go_test( "//pkg/storage", "//pkg/testutils", "//pkg/testutils/distsqlutils", + "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util/hlc", @@ -88,6 +95,7 @@ go_test( "//pkg/util/log", "//pkg/util/protoutil", "//pkg/util/randutil", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 
b9dbfdf6da13..a586a8a8e7ee 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -48,6 +49,10 @@ func ingest( // KVs. We can skip to ingesting after this resolved ts. Plumb the // initialHighwatermark to the ingestion processor spec based on what we read // from the job progress. + var initialHighWater hlc.Timestamp + if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { + initialHighWater = *h + } evalCtx := execCtx.ExtendedEvalContext() dsp := execCtx.DistSQLPlanner() @@ -59,14 +64,12 @@ func ingest( // Construct stream ingestion processor specs. streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, nodes, jobID) + streamAddress, topology, nodes, initialHighWater) if err != nil { return err } // Plan and run the DistSQL flow. - // TODO: Updates from this flow need to feed back into the job to update the - // progress. err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs, streamIngestionFrontierSpec) if err != nil { @@ -109,6 +112,14 @@ func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx int } resolvedTime = *highWatermark } + + // TODO(adityamaru): If the job progress was not set then we should + // probably ClearRange. Take this into account when writing the ClearRange + // OnFailOrCancel(). + if resolvedTime.IsEmpty() { + return nil + } + var b kv.Batch b.AddRawRequest(&roachpb.RevertRangeRequest{ RequestHeader: roachpb.RequestHeader{ diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1444cee45d4f..9560d657859c 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -// TestStreamIngestionJobRollBack tests that the job rollsback the data to the +// TestStreamIngestionJobRollBack tests that the job rolls back the data to the // start time if there are no progress updates. This test should be expanded // after the job's progress field is updated as the job runs. func TestStreamIngestionJobRollBack(t *testing.T) { diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index dc4b2bff4afe..2aaed0cf2b88 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -80,7 +80,7 @@ func ingestionPlanHook( prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant) streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: streamingccl.StreamAddress(from[0]), - Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()}, + Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, // TODO: Figure out what the initial ts should be. 
StartTime: hlc.Timestamp{}, } @@ -110,7 +110,10 @@ func ingestionPlanHook( return err } - return sj.Start(ctx) + if err := sj.Start(ctx); err != nil { + return err + } + return sj.AwaitCompletion(ctx) } return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index 7f75cd543136..77130f01a52f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -30,7 +30,7 @@ func distStreamIngestionPlanSpecs( streamAddress streamingccl.StreamAddress, topology streamingccl.Topology, nodes []roachpb.NodeID, - jobID int64, + initialHighWater hlc.Timestamp, ) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { // For each stream partition in the topology, assign it to a node. @@ -63,8 +63,8 @@ func distStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. - // TODO: set HighWaterAtStart once the job progress logic has been hooked up. - streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{TrackedSpans: trackedSpans} + streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ + HighWaterAtStart: initialHighWater, TrackedSpans: trackedSpans} return streamIngestionSpecs, streamIngestionFrontierSpec, nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index f3648d724373..d96cb96b85c1 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -16,7 +16,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -37,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -196,7 +196,7 @@ func getPartitionSpanToTableID( func assertEqualKVs( t *testing.T, tc *testcluster.TestCluster, - streamValidator cdctest.StreamClientValidatorWrapper, + streamValidator *streamClientValidator, tableID int, partitionTimestamp hlc.Timestamp, ) { @@ -233,7 +233,7 @@ func assertEqualKVs( // All value ts should have been drained at this point, otherwise there is // a mismatch between the streamed and ingested data. 
require.Equal(t, 0, len(valueTimestampTuples)) - valueTimestampTuples, err = streamValidator.GetValuesForKeyBelowTimestamp( + valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( string(it.Key().Key), partitionTimestamp) require.NoError(t, err) } @@ -255,6 +255,19 @@ func assertEqualKVs( } } +func makeTestStreamURI( + valueRange, kvsPerResolved, numPartitions, tenantID int, + kvFrequency time.Duration, + dupProbability float64, +) string { + return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + + "&EVENT_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + + "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + + "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + + "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) + + "&TENANT_ID=" + strconv.Itoa(tenantID) +} + // TestRandomClientGeneration tests the ingestion processor against a random // stream workload. func TestRandomClientGeneration(t *testing.T) { @@ -263,17 +276,6 @@ func TestRandomClientGeneration(t *testing.T) { ctx := context.Background() - makeTestStreamURI := func( - valueRange, kvsPerResolved, numPartitions int, - kvFrequency time.Duration, dupProbability float64, - ) string { - return "test:///" + "?VALUE_RANGE=" + strconv.Itoa(valueRange) + - "&EVENT_FREQUENCY=" + strconv.Itoa(int(kvFrequency)) + - "&KVS_PER_CHECKPOINT=" + strconv.Itoa(kvsPerResolved) + - "&NUM_PARTITIONS=" + strconv.Itoa(numPartitions) + - "&DUP_PROBABILITY=" + strconv.FormatFloat(dupProbability, 'f', -1, 32) - } - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) kvDB := tc.Server(0).DB() @@ -286,8 +288,8 @@ func TestRandomClientGeneration(t *testing.T) { kvFrequency := 50 * time.Nanosecond numPartitions := 4 dupProbability := 0.2 - streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, kvFrequency, - dupProbability) + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, + int(roachpb.SystemTenantID.ToUint64()), kvFrequency, dupProbability) // The random client returns system and table data partitions. streamClient, err := streamclient.NewStreamClient(streamingccl.StreamAddress(streamAddr)) @@ -301,9 +303,10 @@ func TestRandomClientGeneration(t *testing.T) { ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. - cancelAfterCheckpoints := makeCheckpointEventCounter(1000, cancel) - streamValidator := cdctest.NewStreamClientValidatorWrapper() - validator := registerValidator(streamValidator.GetValidator()) + mu := syncutil.Mutex{} + cancelAfterCheckpoints := makeCheckpointEventCounter(&mu, 1000, cancel) + streamValidator := newStreamClientValidator() + validator := registerValidatorWithClient(streamValidator) out, err := runStreamIngestionProcessor(ctx, t, kvDB, streamAddr, topo.Partitions, startTime, []func(streamingccl.Event, streamingccl.PartitionAddress){cancelAfterCheckpoints, validator}, nil /* mockClient */) @@ -350,7 +353,7 @@ func TestRandomClientGeneration(t *testing.T) { } // Ensure that no errors were reported to the validator. 
- for _, failure := range streamValidator.GetValidator().Failures() { + for _, failure := range streamValidator.Failures() { t.Error(failure) } @@ -424,21 +427,21 @@ func runStreamIngestionProcessor( return out, err } -func registerValidator( - validator cdctest.Validator, +func registerValidatorWithClient( + validator *streamClientValidator, ) func(event streamingccl.Event, pa streamingccl.PartitionAddress) { return func(event streamingccl.Event, pa streamingccl.PartitionAddress) { switch event.Type() { case streamingccl.CheckpointEvent: resolvedTS := *event.GetResolved() - err := validator.NoteResolved(string(pa), resolvedTS) + err := validator.noteResolved(string(pa), resolvedTS) if err != nil { panic(err.Error()) } case streamingccl.KVEvent: kv := *event.GetKV() - err := validator.NoteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), + err := validator.noteRow(string(pa), string(kv.Key), string(kv.Value.RawBytes), kv.Value.Timestamp) if err != nil { panic(err.Error()) @@ -450,14 +453,18 @@ func registerValidator( // makeCheckpointEventCounter runs f after seeing `threshold` number of // checkpoint events. func makeCheckpointEventCounter( - threshold int, f func(), + mu *syncutil.Mutex, threshold int, f func(), ) func(streamingccl.Event, streamingccl.PartitionAddress) { + mu.Lock() + defer mu.Unlock() numCheckpointEventsGenerated := 0 return func(event streamingccl.Event, _ streamingccl.PartitionAddress) { + mu.Lock() + defer mu.Unlock() switch event.Type() { case streamingccl.CheckpointEvent: numCheckpointEventsGenerated++ - if numCheckpointEventsGenerated > threshold { + if numCheckpointEventsGenerated == threshold { f() } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go new file mode 100644 index 000000000000..90cca8516301 --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test.go @@ -0,0 +1,245 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at
+//
+// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package streamingest
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
+	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+// TestStreamIngestionJobWithRandomClient creates a stream ingestion job that is
+// fed KVs from the random stream client. After receiving a certain number of
+// resolved timestamp events, the test cancels the job to tear down the flow and
+// roll back to the latest resolved frontier timestamp.
+// The test scans the KV store to compare all MVCC KVs against the relevant
+// streamed KV Events, thereby ensuring that we end up in a consistent state.
+func TestStreamIngestionJobWithRandomClient(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + skip.UnderRaceWithIssue(t, 60710) + + ctx := context.Background() + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond) + + cancelJobCh := make(chan struct{}) + threshold := 10 + mu := syncutil.Mutex{} + cancelJobAfterCheckpoints := makeCheckpointEventCounter(&mu, threshold, func() { + cancelJobCh <- struct{}{} + }) + streamValidator := newStreamClientValidator() + registerValidator := registerValidatorWithClient(streamValidator) + knobs := base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{StreamIngestionTestingKnobs: &sql.StreamIngestionTestingKnobs{ + Interceptors: []func(event streamingccl.Event, pa streamingccl.PartitionAddress){cancelJobAfterCheckpoints, + registerValidator}, + }, + }, + } + serverArgs := base.TestServerArgs{} + serverArgs.Knobs = knobs + + var allowResponse chan struct{} + params := base.TestClusterArgs{ServerArgs: serverArgs} + params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse), + } + + numNodes := 3 + tc := testcluster.StartTestCluster(t, numNodes, params) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + conn := tc.Conns[0] + + tenantID := 10 + valueRange := 100 + kvsPerResolved := 200 + kvFrequency := 50 * time.Nanosecond + numPartitions := 2 + dupProbability := 0.2 + streamAddr := makeTestStreamURI(valueRange, kvsPerResolved, numPartitions, tenantID, kvFrequency, + dupProbability) + + // Start the ingestion stream and wait for at least one AddSSTable to ensure the job is running. + allowResponse = make(chan struct{}) + errCh := make(chan error) + defer close(errCh) + _, err := conn.Exec(`SET CLUSTER SETTING bulkio.stream_ingestion.minimum_flush_interval= '0.0005ms'`) + require.NoError(t, err) + query := fmt.Sprintf(`RESTORE TENANT 10 FROM REPLICATION STREAM FROM '%s'`, streamAddr) + go func() { + _, err := conn.Exec(query) + errCh <- err + }() + select { + case allowResponse <- struct{}{}: + case err := <-errCh: + t.Fatalf("%s: query returned before expected: %s", err, query) + } + close(allowResponse) + + var streamJobID string + testutils.SucceedsSoon(t, func() error { + row := conn.QueryRow("SELECT id FROM system.jobs ORDER BY created DESC LIMIT 1") + return row.Scan(&streamJobID) + }) + + // Wait for the job to signal that it is ready to be canceled. + <-cancelJobCh + close(cancelJobCh) + + // Canceling the job should shutdown the ingestion processors via a context + // cancellation, and subsequently rollback data above our frontier + // timestamp. + // TODO(adityamaru): Change this to cutover once we have cutover logic in + // place. + _, err = conn.Exec(`CANCEL JOB $1`, streamJobID) + require.NoError(t, err) + // We expect the statement to fail. + if err := <-errCh; err == nil { + t.Fatal(err) + } + + // Wait for the ingestion job to have been canceled. 
+ testutils.SucceedsSoon(t, func() error { + var status string + sqlDB.QueryRow(t, `SELECT status FROM system.jobs WHERE id = $1`, streamJobID).Scan(&status) + if jobs.Status(status) != jobs.StatusCanceled { + return errors.New("job not in canceled state") + } + return nil + }) + + progress := &jobspb.Progress{} + var streamProgress []byte + sqlDB.QueryRow( + t, `SELECT progress FROM system.jobs WHERE id=$1`, streamJobID, + ).Scan(&streamProgress) + + if err := protoutil.Unmarshal(streamProgress, progress); err != nil { + t.Fatal("cannot unmarshal job progress from system.jobs") + } + highWaterTimestamp := progress.GetHighWater() + if highWaterTimestamp == nil { + t.Fatal(errors.New("expected the highWaterTimestamp written to progress to be non-nil")) + } + ts := *highWaterTimestamp + require.True(t, !ts.IsEmpty()) + + // Check the validator for any failures. + for _, err := range streamValidator.failures() { + t.Fatal(err) + } + + tenantPrefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(uint64(tenantID))) + maxIngestedTS := assertExactlyEqualKVs(t, tc, streamValidator, ts, tenantPrefix) + + //Sanity check that the max ts in the store is less than the ts stored in the + //job progress. + require.True(t, maxIngestedTS.LessEq(ts)) +} + +// assertExactlyEqualKVs runs an incremental iterator on the underlying store. +// At every key the method polls the `streamValidator` to return the KVEvents +// for that particular key, and have a timestamp less than equal to the +// `frontierTimestamp`. The key and value must be identical between the two. +func assertExactlyEqualKVs( + t *testing.T, + tc *testcluster.TestCluster, + streamValidator *streamClientValidator, + frontierTimestamp hlc.Timestamp, + tenantPrefix roachpb.Key, +) hlc.Timestamp { + // Iterate over the store. + store := tc.GetFirstStoreFromServer(t, 0) + it := store.Engine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{ + LowerBound: tenantPrefix, + UpperBound: tenantPrefix.PrefixEnd(), + }) + defer it.Close() + var prevKey roachpb.Key + var valueTimestampTuples []roachpb.KeyValue + var err error + var maxKVTimestampSeen hlc.Timestamp + var matchingKVs int + for it.SeekGE(storage.MVCCKey{}); ; it.Next() { + if ok, err := it.Valid(); !ok { + if err != nil { + t.Fatal(err) + } + break + } + if maxKVTimestampSeen.Less(it.Key().Timestamp) { + maxKVTimestampSeen = it.Key().Timestamp + } + newKey := (prevKey != nil && !it.Key().Key.Equal(prevKey)) || prevKey == nil + prevKey = it.Key().Key + + if newKey { + // All value ts should have been drained at this point, otherwise there is + // a mismatch between the streamed and ingested data. + require.Equal(t, 0, len(valueTimestampTuples)) + valueTimestampTuples, err = streamValidator.getValuesForKeyBelowTimestamp( + string(it.Key().Key), frontierTimestamp) + require.NoError(t, err) + } + + // If there are no values stored in the validator to match against the + // current key then we skip to the next key. + if len(valueTimestampTuples) == 0 { + continue + } + + require.Greater(t, len(valueTimestampTuples), 0) + // Since the iterator goes from latest to older versions, we compare + // starting from the end of the slice that is sorted by timestamp. 
+ latestVersionInChain := valueTimestampTuples[len(valueTimestampTuples)-1] + require.Equal(t, roachpb.KeyValue{ + Key: it.Key().Key, + Value: roachpb.Value{ + RawBytes: it.Value(), + Timestamp: it.Key().Timestamp, + }, + }, latestVersionInChain) + matchingKVs++ + // Truncate the latest version which we just checked against in preparation + // for the next iteration. + valueTimestampTuples = valueTimestampTuples[0 : len(valueTimestampTuples)-1] + } + // Sanity check that we have compared a non-zero number of KVs. + require.Greater(t, matchingKVs, 0) + return maxKVTimestampSeen +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go new file mode 100644 index 000000000000..0caa7966fc1e --- /dev/null +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_test_utils.go @@ -0,0 +1,65 @@ +// Copyright 2020 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" +) + +// streamClientValidatorWrapper wraps a Validator and exposes additional methods +// used by stream ingestion to check for correctness. +type streamClientValidator struct { + cdctest.StreamValidator + + mu syncutil.Mutex +} + +// newStreamClientValidator returns a wrapped Validator, that can be used +// to validate the events emitted by the cluster to cluster streaming client. +// The wrapper currently only "wraps" an orderValidator, but can be built out +// to utilize other Validator's. +// The wrapper also allows querying the orderValidator to retrieve streamed +// events from an in-memory store. +func newStreamClientValidator() *streamClientValidator { + ov := cdctest.NewStreamOrderValidator() + return &streamClientValidator{ + StreamValidator: ov, + } +} + +func (sv *streamClientValidator) noteRow( + partition string, key, value string, updated hlc.Timestamp, +) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.NoteRow(partition, key, value, updated) +} + +func (sv *streamClientValidator) noteResolved(partition string, resolved hlc.Timestamp) error { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.NoteResolved(partition, resolved) +} + +func (sv *streamClientValidator) failures() []string { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.Failures() +} + +func (sv *streamClientValidator) getValuesForKeyBelowTimestamp( + key string, timestamp hlc.Timestamp, +) ([]roachpb.KeyValue, error) { + sv.mu.Lock() + defer sv.mu.Unlock() + return sv.GetValuesForKeyBelowTimestamp(key, timestamp) +} diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index e5a483903139..f5b31c1db79a 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -208,6 +208,7 @@ type proposerRaft interface { func (b *propBuf) Init(p proposer) { b.p = p b.full.L = p.rlocker() + b.liBase = p.leaseAppliedIndex() } // Len returns the number of proposals currently in the buffer. 
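Note on the new test helper above: streamClientValidator in stream_ingestion_test_utils.go exists because the random stream client fires its interceptors from multiple goroutines, so every call into the embedded cdctest.StreamValidator is funneled through a mutex. A minimal, self-contained sketch of that wrap-an-interface-with-a-mutex pattern follows (toy Recorder interface and stdlib sync instead of util/syncutil; these names are illustrative and not part of the patch):

package main

import (
	"fmt"
	"sync"
)

// Recorder stands in for cdctest.StreamValidator: an interface whose
// implementation is not safe for concurrent use on its own.
type Recorder interface {
	Note(key string)
	Keys() []string
}

// sliceRecorder is a trivial, non-thread-safe implementation.
type sliceRecorder struct{ keys []string }

func (r *sliceRecorder) Note(key string) { r.keys = append(r.keys, key) }
func (r *sliceRecorder) Keys() []string  { return r.keys }

// lockedRecorder embeds the interface and guards every call with a mutex,
// mirroring how streamClientValidator wraps cdctest.StreamValidator so that
// interceptors running on different goroutines can share one instance.
type lockedRecorder struct {
	Recorder
	mu sync.Mutex
}

func (l *lockedRecorder) note(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.Recorder.Note(key)
}

func (l *lockedRecorder) keys() []string {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.Recorder.Keys()
}

func main() {
	r := &lockedRecorder{Recorder: &sliceRecorder{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.note(fmt.Sprintf("key-%d", i))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.keys())) // 4
}

Embedding keeps the validator's read-only surface available, while the lowercase, mutex-guarded wrappers are the only entry points the interceptors call.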
diff --git a/pkg/sql/app_stats.go b/pkg/sql/app_stats.go index 0e192675a05f..c2e9acaf9ea6 100644 --- a/pkg/sql/app_stats.go +++ b/pkg/sql/app_stats.go @@ -454,7 +454,7 @@ func (a *appStats) recordTransaction( if collectedExecStats { s.mu.data.ExecStats.Count++ s.mu.data.ExecStats.NetworkBytes.Record(s.mu.data.ExecStats.Count, float64(execStats.NetworkBytesSent)) - s.mu.data.ExecStats.NetworkBytes.Record(s.mu.data.ExecStats.Count, float64(execStats.NetworkBytesSent)) + s.mu.data.ExecStats.MaxMemUsage.Record(s.mu.data.ExecStats.Count, float64(execStats.MaxMemUsage)) s.mu.data.ExecStats.ContentionTime.Record(s.mu.data.ExecStats.Count, execStats.ContentionTime.Seconds()) } } diff --git a/pkg/sql/instrumentation_test.go b/pkg/sql/instrumentation_test.go index e3482eefad85..568aab4a95f4 100644 --- a/pkg/sql/instrumentation_test.go +++ b/pkg/sql/instrumentation_test.go @@ -80,6 +80,7 @@ func TestSampledStatsCollection(t *testing.T) { require.Equal(t, int64(2), stats.mu.data.Count, "expected to have collected two sets of general stats") require.Equal(t, int64(1), stats.mu.data.ExecStatCollectionCount, "expected to have collected exactly one set of execution stats") require.Greater(t, stats.mu.data.RowsRead.Mean, float64(0), "expected statement to have read at least one row") + require.Greater(t, stats.mu.data.MaxMemUsage.Mean, float64(0), "expected statement to have used RAM") }) t.Run("ExplicitTxn", func(t *testing.T) { @@ -107,12 +108,14 @@ func TestSampledStatsCollection(t *testing.T) { require.Equal(t, int64(2), aggStats.mu.data.Count, "expected to have collected two sets of general stats") require.Equal(t, int64(1), aggStats.mu.data.ExecStatCollectionCount, "expected to have collected exactly one set of execution stats") require.Greater(t, aggStats.mu.data.RowsRead.Mean, float64(0), "expected statement to have read at least one row") + require.Greater(t, aggStats.mu.data.MaxMemUsage.Mean, float64(0), "expected statement to have used RAM") selectStats.mu.Lock() defer selectStats.mu.Unlock() require.Equal(t, int64(2), selectStats.mu.data.Count, "expected to have collected two sets of general stats") require.Equal(t, int64(1), selectStats.mu.data.ExecStatCollectionCount, "expected to have collected exactly one set of execution stats") require.Greater(t, selectStats.mu.data.RowsRead.Mean, float64(0), "expected statement to have read at least one row") + require.Greater(t, selectStats.mu.data.MaxMemUsage.Mean, float64(0), "expected statement to have used RAM") key := util.MakeFNV64() key.Add(uint64(aggID)) @@ -134,5 +137,6 @@ func TestSampledStatsCollection(t *testing.T) { txStats.mu.data.RowsRead.Mean, "expected txn to report having read the sum of rows read in both its statements", ) + require.Greater(t, txStats.mu.data.ExecStats.MaxMemUsage.Mean, float64(0), "expected MaxMemUsage to be set on the txn") }) } diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 8e0b644b50fa..02e9219a602a 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -1730,6 +1730,14 @@ func TestParse(t *testing.T) { {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM 'bar'`}, {`RESTORE TENANT 123 FROM REPLICATION STREAM FROM $1`}, + // Currently, we only support TENANT as a target. We have grammar rules for + // all targets supported by RESTORE but these will error out during the + // planning phase. 
+		{`RESTORE TABLE foo FROM REPLICATION STREAM FROM 'bar'`},
+		{`RESTORE TABLE foo FROM REPLICATION STREAM FROM $1`},
+		{`RESTORE DATABASE foodb FROM REPLICATION STREAM FROM 'baz'`},
+		{`RESTORE DATABASE foodb FROM REPLICATION STREAM FROM $1`},
+
 		{`BACKUP TABLE foo TO 'bar' WITH revision_history, detached`},
 		{`RESTORE TABLE foo FROM 'bar' WITH skip_missing_foreign_keys, skip_missing_sequences, detached`},
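A closing note on the cutoff semantics used throughout the validator plumbing: GetValuesForKeyBelowTimestamp (and the assertions built on it in the tests above) relies on sort.Search returning the index of the first entry whose timestamp is strictly greater than the target, so the prefix up to that index is exactly the set of updates with ts <= target. A self-contained restatement of that logic, with plain ints standing in for hlc.Timestamp (illustrative only, not part of the patch):

package main

import (
	"fmt"
	"sort"
)

// update stands in for the validator's timestampValue tuple.
type update struct {
	ts    int
	value string
}

// valuesAtOrBelow mirrors the cutoff in GetValuesForKeyBelowTimestamp:
// sort.Search finds the first entry with ts strictly greater than target,
// so updates[:idx] holds every update with ts <= target.
func valuesAtOrBelow(updates []update, target int) []update {
	idx := sort.Search(len(updates), func(i int) bool {
		return target < updates[i].ts // same shape as timestamp.Less(tuple.ts)
	})
	return updates[:idx]
}

func main() {
	updates := []update{{1, "a"}, {3, "b"}, {5, "c"}, {7, "d"}}
	fmt.Println(valuesAtOrBelow(updates, 5)) // [{1 a} {3 b} {5 c}]
	fmt.Println(valuesAtOrBelow(updates, 0)) // []
}

A target below every recorded update therefore yields an empty slice rather than an error, which is what assertExactlyEqualKVs depends on when it skips keys with no recorded values.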