diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index be0f447f039d..7baba9cd3605 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -102,15 +102,17 @@ func alterChangefeedPlanHook( targetDescs = append(targetDescs, descs...) } - newTargets, err := getTargets(ctx, p, targetDescs, details.Opts) + newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts) if err != nil { return err } // add old targets - for id, target := range details.Targets { - newTargets[id] = target + for id, table := range details.Tables { + newTables[id] = table } - details.Targets = newTargets + details.Tables = newTables + details.TargetSpecifications = append(details.TargetSpecifications, newTargets...) + } if opts.DropTargets != nil { @@ -129,12 +131,21 @@ func alterChangefeedPlanHook( if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { return err } - delete(details.Targets, table.GetID()) + delete(details.Tables, table.GetID()) } } + + newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets)) + for _, ts := range details.TargetSpecifications { + if _, stillThere := details.Tables[ts.TableID]; stillThere { + newTargetSpecifications = append(newTargetSpecifications, ts) + } + } + details.TargetSpecifications = newTargetSpecifications + } - if len(details.Targets) == 0 { + if len(details.Tables) == 0 { return errors.Errorf("cannot drop all targets for changefeed job %d", jobID) } @@ -152,7 +163,7 @@ func alterChangefeedPlanHook( } var targets tree.TargetList - for _, target := range details.Targets { + for _, target := range details.Tables { targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName)) targets.Tables = append(targets.Tables, &targetName) } diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 910853144374..90af0d8cae48 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -196,7 +196,7 @@ func createBenchmarkChangefeed( tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table) spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)} details := jobspb.ChangefeedDetails{ - Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{ + Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{ StatementTimeName: tableDesc.GetName(), }}, Opts: map[string]string{ @@ -204,7 +204,7 @@ func createBenchmarkChangefeed( }, } initialHighWater := hlc.Timestamp{} - encoder, err := makeJSONEncoder(details.Opts, details.Targets) + encoder, err := makeJSONEncoder(details.Opts, AllTargets(details)) if err != nil { return nil, nil, err } @@ -228,7 +228,7 @@ func createBenchmarkChangefeed( Clock: feedClock, Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)), Spans: spans, - Targets: details.Targets, + Targets: AllTargets(details), Writer: buf, Metrics: &metrics.KVFeedMetrics, MM: mm, diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index a4acc8557a39..a9a4bb4aebc2 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -55,7 +55,7 @@ func createProtectedTimestampRecord( ctx context.Context, codec keys.SQLCodec, jobID jobspb.JobID, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, resolved hlc.Timestamp, progress *jobspb.ChangefeedProgress, ) *ptpb.Record { @@ -69,19 +69,21 @@ func createProtectedTimestampRecord( jobsprotectedts.Jobs, targetToProtect) } -func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target { +func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target { // NB: We add 1 because we're also going to protect system.descriptors. // We protect system.descriptors because a changefeed needs all of the history // of table descriptors to version data. tablesToProtect := make(descpb.IDs, 0, len(targets)+1) - for t := range targets { - tablesToProtect = append(tablesToProtect, t) + for _, t := range targets { + tablesToProtect = append(tablesToProtect, t.TableID) } tablesToProtect = append(tablesToProtect, keys.DescriptorTableID) return ptpb.MakeSchemaObjectsTarget(tablesToProtect) } -func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span { +func makeSpansToProtect( + codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification, +) []roachpb.Span { // NB: We add 1 because we're also going to protect system.descriptors. // We protect system.descriptors because a changefeed needs all of the history // of table descriptors to version data. @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [ EndKey: tablePrefix.PrefixEnd(), }) } - for t := range targets { - addTablePrefix(uint32(t)) + for _, t := range targets { + addTablePrefix(uint32(t.TableID)) } addTablePrefix(keys.DescriptorTableID) return spansToProtect diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 2a0142dac991..abec5d01fc2d 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -104,7 +104,7 @@ func distChangefeedFlow( spansTS = spansTS.Next() } var err error - trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS) + trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS) if err != nil { return err } @@ -121,7 +121,7 @@ func distChangefeedFlow( func fetchSpansForTargets( ctx context.Context, execCfg *sql.ExecutorConfig, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, ts hlc.Timestamp, ) ([]roachpb.Span, error) { var spans []roachpb.Span @@ -133,10 +133,10 @@ func fetchSpansForTargets( return err } // Note that all targets are currently guaranteed to be tables. - for tableID := range targets { + for _, table := range targets { flags := tree.ObjectLookupFlagsWithRequired() flags.AvoidLeased = true - tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags) + tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 539d9131a32c..e78fcaa85df1 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor( } var err error - if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil { + if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil { return nil, err } @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg( if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore { sf = schemafeed.DoNothingSchemaFeed } else { - sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets, + sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed), initialHighWater, &ca.metrics.SchemaFeedMetrics) } @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg( Gossip: cfg.Gossip, Spans: spans, BackfillCheckpoint: ca.spec.Checkpoint.Spans, - Targets: ca.spec.Feed.Targets, + Targets: AllTargets(ca.spec.Feed), Metrics: &ca.metrics.KVFeedMetrics, OnBackfillCallback: ca.sliMetrics.getBackfillCallback(), MM: ca.kvFeedMemMon, @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor( cf.freqEmitResolved = emitNoResolved } - if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil { + if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil { return nil, err } @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps( recordID := progress.ProtectedTimestampRecord if recordID == uuid.Nil { - ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress) + ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress) if err := pts.Protect(ctx, txn, ptr); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index b88eca4886c3..745d350deaa5 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -181,16 +181,17 @@ func changefeedPlanHook( return err } - targets, err := getTargets(ctx, p, targetDescs, opts) + targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts) if err != nil { return err } details := jobspb.ChangefeedDetails{ - Targets: targets, - Opts: opts, - SinkURI: sinkURI, - StatementTime: statementTime, + Tables: tables, + Opts: opts, + SinkURI: sinkURI, + StatementTime: statementTime, + TargetSpecifications: targets, } progress := jobspb.Progress{ Progress: &jobspb.Progress_HighWater{}, @@ -243,7 +244,7 @@ func changefeedPlanHook( return err } - if _, err := getEncoder(details.Opts, details.Targets); err != nil { + if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil { return err } @@ -269,7 +270,7 @@ func changefeedPlanHook( } telemetry.Count(`changefeed.create.sink.` + telemetrySink) telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat]) - telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets))) + telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables))) if scope, ok := opts[changefeedbase.OptMetricsScope]; ok { if err := utilccl.CheckEnterpriseEnabled( @@ -332,7 +333,7 @@ func changefeedPlanHook( var protectedTimestampID uuid.UUID codec := p.ExecCfg().Codec if shouldProtectTimestamps(codec) { - ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed()) + ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed()) protectedTimestampID = ptr.ID.GetUUID() } @@ -470,35 +471,41 @@ func getTableDescriptors( return targetDescs, err } -func getTargets( +func getTargetsAndTables( ctx context.Context, p sql.PlanHookState, targetDescs []catalog.Descriptor, opts map[string]string, -) (jobspb.ChangefeedTargets, error) { - targets := make(jobspb.ChangefeedTargets, len(targetDescs)) +) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) { + tables := make(jobspb.ChangefeedTargets, len(targetDescs)) + targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs)) for _, desc := range targetDescs { if table, isTable := desc.(catalog.TableDescriptor); isTable { if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { - return nil, err + return nil, nil, err } _, qualified := opts[changefeedbase.OptFullTableName] name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified) if err != nil { - return nil, err + return nil, nil, err } - targets[table.GetID()] = jobspb.ChangefeedTarget{ + tables[table.GetID()] = jobspb.ChangefeedTargetTable{ StatementTimeName: name, } + ts := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: table.GetID(), + } + targets = append(targets, ts) if err := changefeedbase.ValidateTable(targets, table); err != nil { - return nil, err + return nil, nil, err } - for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) { + for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) { p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning)) } } } - return targets, nil + return targets, tables, nil } func validateSink( @@ -953,3 +960,28 @@ func getChangefeedTargetName( } return desc.GetName(), nil } + +// AllTargets gets all the targets listed in a ChangefeedDetails, +// from the statement time name map in old protos +// or the TargetSpecifications in new ones. +func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) { + //TODO: Use a version gate for this once we have CDC version gates + if len(cd.TargetSpecifications) > 0 { + for _, ts := range cd.TargetSpecifications { + if ts.TableID > 0 { + ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName + targets = append(targets, ts) + } + } + } else { + for id, t := range cd.Tables { + ct := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: id, + StatementTimeName: t.StatementTimeName, + } + targets = append(targets, ct) + } + } + return +} diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index 3027919bf8a0..056cab49ed66 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -15,9 +15,19 @@ import ( ) // ValidateTable validates that a table descriptor can be watched by a CHANGEFEED. -func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error { - t, ok := targets[tableDesc.GetID()] - if !ok { +func ValidateTable( + targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor, +) error { + var t jobspb.ChangefeedTargetSpecification + var found bool + for _, cts := range targets { + if cts.TableID == tableDesc.GetID() { + t = cts + found = true + break + } + } + if !found { return errors.Errorf(`unwatched table: %s`, tableDesc.GetName()) } diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index e619f98f575a..7aef94121f47 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -82,7 +82,9 @@ type Encoder interface { EncodeResolvedTimestamp(context.Context, string, hlc.Timestamp) ([]byte, error) } -func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encoder, error) { +func getEncoder( + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, +) (Encoder, error) { switch changefeedbase.FormatType(opts[changefeedbase.OptFormat]) { case ``, changefeedbase.OptFormatJSON: return makeJSONEncoder(opts, targets) @@ -102,7 +104,7 @@ func getEncoder(opts map[string]string, targets jobspb.ChangefeedTargets) (Encod type jsonEncoder struct { updatedField, mvccTimestampField, beforeField, wrapped, keyOnly, keyInValue, topicInValue bool - targets jobspb.ChangefeedTargets + targets []jobspb.ChangefeedTargetSpecification alloc tree.DatumAlloc buf bytes.Buffer virtualColumnVisibility string @@ -111,7 +113,7 @@ type jsonEncoder struct { var _ Encoder = &jsonEncoder{} func makeJSONEncoder( - opts map[string]string, targets jobspb.ChangefeedTargets, + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, ) (*jsonEncoder, error) { e := &jsonEncoder{ targets: targets, @@ -184,12 +186,13 @@ func (e *jsonEncoder) encodeKeyRaw(row encodeRow) ([]interface{}, error) { func (e *jsonEncoder) encodeTopicRaw(row encodeRow) (interface{}, error) { descID := row.tableDesc.GetID() // use the target list since row.tableDesc.GetName() will not have fully qualified names - topicName, ok := e.targets[descID] - if !ok { - return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list", - row.tableDesc.GetName(), descID) + for _, topic := range e.targets { + if topic.TableID == descID { + return topic.StatementTimeName, nil + } } - return topicName.StatementTimeName, nil + return nil, fmt.Errorf("table with name %s and descriptor ID %d not found in changefeed target list", + row.tableDesc.GetName(), descID) } // EncodeValue implements the Encoder interface. @@ -328,8 +331,8 @@ type confluentAvroEncoder struct { schemaRegistry schemaRegistry schemaPrefix string updatedField, beforeField, keyOnly bool - targets jobspb.ChangefeedTargets virtualColumnVisibility string + targets []jobspb.ChangefeedTargetSpecification keyCache *cache.UnorderedCache // [tableIDAndVersion]confluentRegisteredKeySchema valueCache *cache.UnorderedCache // [tableIDAndVersionPair]confluentRegisteredEnvelopeSchema @@ -367,7 +370,7 @@ var encoderCacheConfig = cache.Config{ } func newConfluentAvroEncoder( - opts map[string]string, targets jobspb.ChangefeedTargets, + opts map[string]string, targets []jobspb.ChangefeedTargetSpecification, ) (*confluentAvroEncoder, error) { e := &confluentAvroEncoder{ schemaPrefix: opts[changefeedbase.OptAvroSchemaPrefix], @@ -422,7 +425,13 @@ func newConfluentAvroEncoder( // Get the raw SQL-formatted string for a table name // and apply full_table_name and avro_schema_prefix options func (e *confluentAvroEncoder) rawTableName(desc catalog.TableDescriptor) string { - return e.schemaPrefix + e.targets[desc.GetID()].StatementTimeName + for _, spec := range e.targets { + if spec.TableID == desc.GetID() { + return e.schemaPrefix + spec.StatementTimeName + } + } + //TODO (zinger): error here + return desc.GetName() } // EncodeKey implements the Encoder interface. diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index c85c34bb6237..69b0fb8407b2 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -209,11 +209,12 @@ func TestEncoders(t *testing.T) { t.Fatalf(`unknown format: %s`, o[changefeedbase.OptFormat]) } - target := jobspb.ChangefeedTarget{ + target := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableDesc.GetID(), StatementTimeName: tableDesc.GetName(), } - targets := jobspb.ChangefeedTargets{} - targets[tableDesc.GetID()] = target + targets := []jobspb.ChangefeedTargetSpecification{target} e, err := getEncoder(o, targets) if len(expected.err) > 0 { @@ -360,11 +361,12 @@ func TestAvroEncoderWithTLS(t *testing.T) { return string(avroToJSON(t, reg, r)) } - target := jobspb.ChangefeedTarget{ + target := jobspb.ChangefeedTargetSpecification{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableDesc.GetID(), StatementTimeName: tableDesc.GetName(), } - targets := jobspb.ChangefeedTargets{} - targets[tableDesc.GetID()] = target + targets := []jobspb.ChangefeedTargetSpecification{target} e, err := getEncoder(opts, targets) require.NoError(t, err) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 96af723b2c4b..55751564d995 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -43,7 +43,7 @@ type Config struct { Gossip gossip.OptionalGossip Spans []roachpb.Span BackfillCheckpoint []roachpb.Span - Targets jobspb.ChangefeedTargets + Targets []jobspb.ChangefeedTargetSpecification Writer kvevent.Writer Metrics *kvevent.Metrics OnBackfillCallback func() func() diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 70fb4ee5f90b..2761e653b6f0 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -78,7 +78,7 @@ func New( ctx context.Context, cfg *execinfra.ServerConfig, events changefeedbase.SchemaChangeEventClass, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, initialHighwater hlc.Timestamp, metrics *Metrics, ) SchemaFeed { @@ -114,7 +114,7 @@ type schemaFeed struct { db *kv.DB clock *hlc.Clock settings *cluster.Settings - targets jobspb.ChangefeedTargets + targets []jobspb.ChangefeedTargetSpecification ie sqlutil.InternalExecutor metrics *Metrics @@ -278,10 +278,10 @@ func (tf *schemaFeed) primeInitialTableDescs(ctx context.Context) error { return err } // Note that all targets are currently guaranteed to be tables. - for tableID := range tf.targets { + for _, table := range tf.targets { flags := tree.ObjectLookupFlagsWithRequired() flags.AvoidLeased = true - tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags) + tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags) if err != nil { return err } @@ -636,8 +636,15 @@ func (tf *schemaFeed) fetchDescriptorVersions( if err != nil { return err } - - origName, isTable := tf.targets[descpb.ID(id)] + var origName string + var isTable bool + for _, cts := range tf.targets { + if cts.TableID == descpb.ID(id) { + origName = cts.StatementTimeName + isTable = true + break + } + } isType := tf.mu.typeDeps.containsType(descpb.ID(id)) // Check if the descriptor is an interesting table or type. if !(isTable || isType) { @@ -647,7 +654,7 @@ func (tf *schemaFeed) fetchDescriptorVersions( unsafeValue := it.UnsafeValue() if unsafeValue == nil { - name := origName.StatementTimeName + name := origName if name == "" { name = fmt.Sprintf("desc(%d)", id) } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index b84e7f3e7bc2..5aea47105877 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -109,7 +109,7 @@ func getSink( return makeNullSink(sinkURL{URL: u}, m) case u.Scheme == changefeedbase.SinkSchemeKafka: return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) { - return makeKafkaSink(ctx, sinkURL{URL: u}, feedCfg.Targets, feedCfg.Opts, m) + return makeKafkaSink(ctx, sinkURL{URL: u}, AllTargets(feedCfg), feedCfg.Opts, m) }) case isWebhookSink(u): return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) { @@ -119,7 +119,7 @@ func getSink( case isPubsubSink(u): // TODO: add metrics to pubsubsink return validateOptionsAndMakeSink(changefeedbase.PubsubValidOptions, func() (Sink, error) { - return MakePubsubSink(ctx, u, feedCfg.Opts, feedCfg.Targets) + return MakePubsubSink(ctx, u, feedCfg.Opts, AllTargets(feedCfg)) }) case isCloudStorageSink(u): return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) { @@ -130,7 +130,7 @@ func getSink( }) case u.Scheme == changefeedbase.SinkSchemeExperimentalSQL: return validateOptionsAndMakeSink(changefeedbase.SQLValidOptions, func() (Sink, error) { - return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, feedCfg.Targets, m) + return makeSQLSink(sinkURL{URL: u}, sqlSinkTableName, AllTargets(feedCfg), m) }) case u.Scheme == "": return nil, errors.Errorf(`no scheme found for sink URL %q`, feedCfg.SinkURI) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index a9a6b2a0350a..64ea1e83d756 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -145,7 +145,7 @@ func TestCloudStorageSink(t *testing.T) { changefeedbase.OptCompression: ``, // NB: overridden in single-node subtest. } ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} } - e, err := makeJSONEncoder(opts, jobspb.ChangefeedTargets{}) + e, err := makeJSONEncoder(opts, []jobspb.ChangefeedTargetSpecification{}) require.NoError(t, err) clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 0e346e5595fe..273578cfcba4 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -415,18 +415,18 @@ func (p *changefeedPartitioner) Partition( } func makeTopicsMap( - prefix string, name string, targets jobspb.ChangefeedTargets, + prefix string, name string, targets []jobspb.ChangefeedTargetSpecification, ) map[descpb.ID]string { topics := make(map[descpb.ID]string) useSingleName := name != "" if useSingleName { name = prefix + SQLNameToKafkaName(name) } - for id, t := range targets { + for _, t := range targets { if useSingleName { - topics[id] = name + topics[t.TableID] = name } else { - topics[id] = prefix + SQLNameToKafkaName(t.StatementTimeName) + topics[t.TableID] = prefix + SQLNameToKafkaName(t.StatementTimeName) } } return topics @@ -649,7 +649,7 @@ func buildKafkaConfig(u sinkURL, opts map[string]string) (*sarama.Config, error) func makeKafkaSink( ctx context.Context, u sinkURL, - targets jobspb.ChangefeedTargets, + targets []jobspb.ChangefeedTargetSpecification, opts map[string]string, m *sliMetrics, ) (Sink, error) { diff --git a/pkg/ccl/changefeedccl/sink_pubsub.go b/pkg/ccl/changefeedccl/sink_pubsub.go index 0fa5f912f41f..110e721b7abc 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub.go +++ b/pkg/ccl/changefeedccl/sink_pubsub.go @@ -139,7 +139,10 @@ func getGCPCredentials(ctx context.Context, u sinkURL) (*google.Credentials, err // MakePubsubSink returns the corresponding pubsub sink based on the url given func MakePubsubSink( - ctx context.Context, u *url.URL, opts map[string]string, targets jobspb.ChangefeedTargets, + ctx context.Context, + u *url.URL, + opts map[string]string, + targets []jobspb.ChangefeedTargetSpecification, ) (Sink, error) { pubsubURL := sinkURL{URL: u, q: u.Query()} @@ -294,19 +297,19 @@ func (p *gcpPubsubClient) getTopicClient(topicID descpb.ID) (*pubsub.Topic, erro } func (p *pubsubSink) getTopicsMap( - targets jobspb.ChangefeedTargets, pubsubTopicName string, + targets []jobspb.ChangefeedTargetSpecification, pubsubTopicName string, ) map[descpb.ID]*topicStruct { topics := make(map[descpb.ID]*topicStruct) //creates a topic for each target - for id, target := range targets { + for _, target := range targets { var topicName string if pubsubTopicName != "" { topicName = pubsubTopicName } else { topicName = target.StatementTimeName } - topics[id] = &topicStruct{topicName: topicName} + topics[target.TableID] = &topicStruct{topicName: topicName} } return topics } diff --git a/pkg/ccl/changefeedccl/sink_sql.go b/pkg/ccl/changefeedccl/sink_sql.go index e10fff92251e..9a54dbee9989 100644 --- a/pkg/ccl/changefeedccl/sink_sql.go +++ b/pkg/ccl/changefeedccl/sink_sql.go @@ -72,7 +72,7 @@ type sqlSink struct { const sqlSinkTableName = `sqlsink` func makeSQLSink( - u sinkURL, tableName string, targets jobspb.ChangefeedTargets, m *sliMetrics, + u sinkURL, tableName string, targets []jobspb.ChangefeedTargetSpecification, m *sliMetrics, ) (Sink, error) { // Swap the changefeed prefix for the sql connection one that sqlSink // expects. @@ -84,9 +84,9 @@ func makeSQLSink( topics := make(map[string]struct{}) targetNames := make(map[descpb.ID]string) - for id, t := range targets { + for _, t := range targets { topics[t.StatementTimeName] = struct{}{} - targetNames[id] = t.StatementTimeName + targetNames[t.TableID] = t.StatementTimeName } uri := u.String() diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index 9a441e685bc6..1c6eb2059e9e 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -203,10 +203,13 @@ func makeTestKafkaSink( } } -func makeChangefeedTargets(targetNames ...string) jobspb.ChangefeedTargets { - targets := make(jobspb.ChangefeedTargets, len(targetNames)) +func makeChangefeedTargets(targetNames ...string) []jobspb.ChangefeedTargetSpecification { + targets := make([]jobspb.ChangefeedTargetSpecification, len(targetNames)) for i, name := range targetNames { - targets[descpb.ID(i)] = jobspb.ChangefeedTarget{StatementTimeName: name} + targets[i] = jobspb.ChangefeedTargetSpecification{ + TableID: descpb.ID(i), + StatementTimeName: name, + } } return targets } @@ -402,9 +405,9 @@ func TestSQLSink(t *testing.T) { fooTopic := overrideTopic(`foo`) barTopic := overrideTopic(`bar`) - targets := jobspb.ChangefeedTargets{ - fooTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `foo`}, - barTopic.GetID(): jobspb.ChangefeedTarget{StatementTimeName: `bar`}, + targets := []jobspb.ChangefeedTargetSpecification{ + {TableID: fooTopic.GetID(), StatementTimeName: `foo`}, + {TableID: barTopic.GetID(), StatementTimeName: `bar`}, } const testTableName = `sink` sink, err := makeSQLSink(sinkURL{URL: &pgURL}, testTableName, targets, nil) diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 98a0a396f164..cbb9690edc38 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -99,7 +99,7 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe "sink %s expected to receive message %s", sinkDest.URL(), "{\"payload\":[{\"after\":null,\"key\":[1002],\"topic:\":\"foo\"}],\"length\":1}") - enc, err := makeJSONEncoder(getGenericWebhookSinkOptions(), jobspb.ChangefeedTargets{}) + enc, err := makeJSONEncoder(getGenericWebhookSinkOptions(), []jobspb.ChangefeedTargetSpecification{}) require.NoError(t, err) // test a resolved timestamp entry diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go index 5217583677d0..16503e269810 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_planning.go @@ -93,7 +93,7 @@ func streamKVs( } details := jobspb.ChangefeedDetails{ - Targets: nil, // Not interested in schema changes + Tables: nil, // Not interested in schema changes Opts: cfOpts, SinkURI: "", // TODO(yevgeniy): Support sinks StatementTime: statementTime, diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index cad1b66d0fa5..1f95670ecee6 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -692,31 +692,42 @@ message SchemaChangeGCProgress { bool ranges_unsplit_done = 4; } -message ChangefeedTarget { +message ChangefeedTargetTable { string statement_time_name = 1; +} + +message ChangefeedTargetSpecification { + enum TargetType { + // The primary index of the table with table_id descriptor id. + // Fail if there are ever multiple column families. + PRIMARY_FAMILY_ONLY = 0; + + // The primary index of the table with table_id descriptor id. + // Each column family gets its own record schema and events. + EACH_FAMILY = 1; + + // Column family family_name of table table_id. + COLUMN_FAMILY = 2; + + // Add TargetTypes for database, secondary index, etc. when implemented + + } + + TargetType type = 1; + uint32 table_id = 2 [(gogoproto.customname) = "TableID", + (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"]; + string family_name = 3; + string statement_time_name = 4; - // TODO(dan): Add partition name, ranges of primary keys. } message ChangefeedDetails { - // Targets contains the user-specified tables and databases to watch, mapping - // the descriptor id to the name at the time of changefeed creating. There is - // a 1:1 correspondence between unique targets in the original sql query and - // entries in this map. - // - // - A watched table is stored here under its table id - // - TODO(dan): A watched database is stored here under its database id - // - TODO(dan): A db.* expansion is treated identically to watching the - // database - // - // Note that the TODOs mean this field currently is guaranteed to only hold - // table ids and a cluster version check will be added when this changes. - // + // Targets contains the user-specified tables to watch, mapping + // the descriptor id to the name at the time of changefeed creation. // The names at resolution time are included so that table and database - // renames can be detected. They are also used to construct an error message - // if the descriptor id no longer exists when the jobs is unpaused (which can - // happen if it was dropped or truncated). - map targets = 6 [ + // renames can be tolerated and derived topic names remain immutable. + // + map tables = 6 [ (gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID", (gogoproto.casttype) = "ChangefeedTargets", (gogoproto.nullable) = false @@ -724,8 +735,10 @@ message ChangefeedDetails { string sink_uri = 3 [(gogoproto.customname) = "SinkURI"]; map opts = 4; util.hlc.Timestamp statement_time = 7 [(gogoproto.nullable) = false]; + repeated ChangefeedTargetSpecification target_specifications = 8 [(gogoproto.nullable) = false]; reserved 1, 2, 5; + reserved "targets"; } message ResolvedSpan { diff --git a/pkg/jobs/jobspb/wrap.go b/pkg/jobs/jobspb/wrap.go index 24bfea0f345f..2e977e22c1ab 100644 --- a/pkg/jobs/jobspb/wrap.go +++ b/pkg/jobs/jobspb/wrap.go @@ -305,7 +305,7 @@ func WrapPayloadDetails(details Details) interface { } // ChangefeedTargets is a set of id targets with metadata. -type ChangefeedTargets map[descpb.ID]ChangefeedTarget +type ChangefeedTargets map[descpb.ID]ChangefeedTargetTable // SchemaChangeDetailsFormatVersion is the format version for // SchemaChangeDetails.