Skip to content

Commit

Permalink
cdc: refactor to replace map of changefeed targets with array
Browse files Browse the repository at this point in the history
Necessary intermediate step towards changefeeds on things other than full tables.
Previously, changefeed targets were serialized as a map of TableID->StatementTimeName.
This keeps that for now for compatibility but adds in a list of target specifications
that's more extendable, and changes the API for everything that will eventually need
to care (for example, an encoder will need to know that it's encoding a column family).

Tested manually that an old-style payload still works, and some existing unit tests
create an old-style ChangefeedDetails.

Release note: None
  • Loading branch information
HonoreDB authored and RajivTS committed Mar 6, 2022
1 parent 6f53d3b commit 7612dfe
Show file tree
Hide file tree
Showing 21 changed files with 207 additions and 115 deletions.
25 changes: 18 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,17 @@ func alterChangefeedPlanHook(
targetDescs = append(targetDescs, descs...)
}

newTargets, err := getTargets(ctx, p, targetDescs, details.Opts)
newTargets, newTables, err := getTargetsAndTables(ctx, p, targetDescs, details.Opts)
if err != nil {
return err
}
// add old targets
for id, target := range details.Targets {
newTargets[id] = target
for id, table := range details.Tables {
newTables[id] = table
}
details.Targets = newTargets
details.Tables = newTables
details.TargetSpecifications = append(details.TargetSpecifications, newTargets...)

}

if opts.DropTargets != nil {
Expand All @@ -129,12 +131,21 @@ func alterChangefeedPlanHook(
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return err
}
delete(details.Targets, table.GetID())
delete(details.Tables, table.GetID())
}
}

newTargetSpecifications := make([]jobspb.ChangefeedTargetSpecification, len(details.TargetSpecifications)-len(opts.DropTargets))
for _, ts := range details.TargetSpecifications {
if _, stillThere := details.Tables[ts.TableID]; stillThere {
newTargetSpecifications = append(newTargetSpecifications, ts)
}
}
details.TargetSpecifications = newTargetSpecifications

}

if len(details.Targets) == 0 {
if len(details.Tables) == 0 {
return errors.Errorf("cannot drop all targets for changefeed job %d", jobID)
}

Expand All @@ -152,7 +163,7 @@ func alterChangefeedPlanHook(
}

var targets tree.TargetList
for _, target := range details.Targets {
for _, target := range details.Tables {
targetName := tree.MakeTableNameFromPrefix(tree.ObjectNamePrefix{}, tree.Name(target.StatementTimeName))
targets.Tables = append(targets.Tables, &targetName)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ func createBenchmarkChangefeed(
tableDesc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), keys.SystemSQLCodec, database, table)
spans := []roachpb.Span{tableDesc.PrimaryIndexSpan(keys.SystemSQLCodec)}
details := jobspb.ChangefeedDetails{
Targets: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTarget{
Tables: jobspb.ChangefeedTargets{tableDesc.GetID(): jobspb.ChangefeedTargetTable{
StatementTimeName: tableDesc.GetName(),
}},
Opts: map[string]string{
changefeedbase.OptEnvelope: string(changefeedbase.OptEnvelopeRow),
},
}
initialHighWater := hlc.Timestamp{}
encoder, err := makeJSONEncoder(details.Opts, details.Targets)
encoder, err := makeJSONEncoder(details.Opts, AllTargets(details))
if err != nil {
return nil, nil, err
}
Expand All @@ -228,7 +228,7 @@ func createBenchmarkChangefeed(
Clock: feedClock,
Gossip: gossip.MakeOptionalGossip(s.GossipI().(*gossip.Gossip)),
Spans: spans,
Targets: details.Targets,
Targets: AllTargets(details),
Writer: buf,
Metrics: &metrics.KVFeedMetrics,
MM: mm,
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func createProtectedTimestampRecord(
ctx context.Context,
codec keys.SQLCodec,
jobID jobspb.JobID,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
resolved hlc.Timestamp,
progress *jobspb.ChangefeedProgress,
) *ptpb.Record {
Expand All @@ -69,19 +69,21 @@ func createProtectedTimestampRecord(
jobsprotectedts.Jobs, targetToProtect)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
func makeTargetToProtect(targets []jobspb.ChangefeedTargetSpecification) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
for _, t := range targets {
tablesToProtect = append(tablesToProtect, t.TableID)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
func makeSpansToProtect(
codec keys.SQLCodec, targets []jobspb.ChangefeedTargetSpecification,
) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
Expand All @@ -93,8 +95,8 @@ func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) [
EndKey: tablePrefix.PrefixEnd(),
})
}
for t := range targets {
addTablePrefix(uint32(t))
for _, t := range targets {
addTablePrefix(uint32(t.TableID))
}
addTablePrefix(keys.DescriptorTableID)
return spansToProtect
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func distChangefeedFlow(
spansTS = spansTS.Next()
}
var err error
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, details.Targets, spansTS)
trackedSpans, err = fetchSpansForTargets(ctx, execCfg, AllTargets(details), spansTS)
if err != nil {
return err
}
Expand All @@ -121,7 +121,7 @@ func distChangefeedFlow(
func fetchSpansForTargets(
ctx context.Context,
execCfg *sql.ExecutorConfig,
targets jobspb.ChangefeedTargets,
targets []jobspb.ChangefeedTargetSpecification,
ts hlc.Timestamp,
) ([]roachpb.Span, error) {
var spans []roachpb.Span
Expand All @@ -133,10 +133,10 @@ func fetchSpansForTargets(
return err
}
// Note that all targets are currently guaranteed to be tables.
for tableID := range targets {
for _, table := range targets {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidLeased = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, tableID, flags)
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, table.TableID, flags)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func newChangeAggregatorProcessor(
}

var err error
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, ca.spec.Feed.Targets); err != nil {
if ca.encoder, err = getEncoder(ca.spec.Feed.Opts, AllTargets(ca.spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -345,7 +345,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
if schemaChangePolicy == changefeedbase.OptSchemaChangePolicyIgnore {
sf = schemafeed.DoNothingSchemaFeed
} else {
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, ca.spec.Feed.Targets,
sf = schemafeed.New(ctx, cfg, schemaChangeEvents, AllTargets(ca.spec.Feed),
initialHighWater, &ca.metrics.SchemaFeedMetrics)
}

Expand All @@ -358,7 +358,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
Gossip: cfg.Gossip,
Spans: spans,
BackfillCheckpoint: ca.spec.Checkpoint.Spans,
Targets: ca.spec.Feed.Targets,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
OnBackfillCallback: ca.sliMetrics.getBackfillCallback(),
MM: ca.kvFeedMemMon,
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func newChangeFrontierProcessor(
cf.freqEmitResolved = emitNoResolved
}

if cf.encoder, err = getEncoder(spec.Feed.Opts, spec.Feed.Targets); err != nil {
if cf.encoder, err = getEncoder(spec.Feed.Opts, AllTargets(spec.Feed)); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1454,7 +1454,7 @@ func (cf *changeFrontier) manageProtectedTimestamps(

recordID := progress.ProtectedTimestampRecord
if recordID == uuid.Nil {
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, cf.spec.Feed.Targets, highWater, progress)
ptr := createProtectedTimestampRecord(ctx, cf.flowCtx.Codec(), cf.spec.JobID, AllTargets(cf.spec.Feed), highWater, progress)
if err := pts.Protect(ctx, txn, ptr); err != nil {
return err
}
Expand Down
66 changes: 49 additions & 17 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,17 @@ func changefeedPlanHook(
return err
}

targets, err := getTargets(ctx, p, targetDescs, opts)
targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, opts)
if err != nil {
return err
}

details := jobspb.ChangefeedDetails{
Targets: targets,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
Tables: tables,
Opts: opts,
SinkURI: sinkURI,
StatementTime: statementTime,
TargetSpecifications: targets,
}
progress := jobspb.Progress{
Progress: &jobspb.Progress_HighWater{},
Expand Down Expand Up @@ -243,7 +244,7 @@ func changefeedPlanHook(
return err
}

if _, err := getEncoder(details.Opts, details.Targets); err != nil {
if _, err := getEncoder(details.Opts, AllTargets(details)); err != nil {
return err
}

Expand All @@ -269,7 +270,7 @@ func changefeedPlanHook(
}
telemetry.Count(`changefeed.create.sink.` + telemetrySink)
telemetry.Count(`changefeed.create.format.` + details.Opts[changefeedbase.OptFormat])
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(targets)))
telemetry.CountBucketed(`changefeed.create.num_tables`, int64(len(tables)))

if scope, ok := opts[changefeedbase.OptMetricsScope]; ok {
if err := utilccl.CheckEnterpriseEnabled(
Expand Down Expand Up @@ -332,7 +333,7 @@ func changefeedPlanHook(
var protectedTimestampID uuid.UUID
codec := p.ExecCfg().Codec
if shouldProtectTimestamps(codec) {
ptr = createProtectedTimestampRecord(ctx, codec, jobID, details.Targets, statementTime, progress.GetChangefeed())
ptr = createProtectedTimestampRecord(ctx, codec, jobID, AllTargets(details), statementTime, progress.GetChangefeed())
protectedTimestampID = ptr.ID.GetUUID()
}

Expand Down Expand Up @@ -470,35 +471,41 @@ func getTableDescriptors(
return targetDescs, err
}

func getTargets(
func getTargetsAndTables(
ctx context.Context,
p sql.PlanHookState,
targetDescs []catalog.Descriptor,
opts map[string]string,
) (jobspb.ChangefeedTargets, error) {
targets := make(jobspb.ChangefeedTargets, len(targetDescs))
) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) {
tables := make(jobspb.ChangefeedTargets, len(targetDescs))
targets := make([]jobspb.ChangefeedTargetSpecification, len(targetDescs))
for _, desc := range targetDescs {
if table, isTable := desc.(catalog.TableDescriptor); isTable {
if err := p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil {
return nil, err
return nil, nil, err
}
_, qualified := opts[changefeedbase.OptFullTableName]
name, err := getChangefeedTargetName(ctx, table, p.ExecCfg(), p.ExtendedEvalContext().Txn, qualified)
if err != nil {
return nil, err
return nil, nil, err
}
targets[table.GetID()] = jobspb.ChangefeedTarget{
tables[table.GetID()] = jobspb.ChangefeedTargetTable{
StatementTimeName: name,
}
ts := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: table.GetID(),
}
targets = append(targets, ts)
if err := changefeedbase.ValidateTable(targets, table); err != nil {
return nil, err
return nil, nil, err
}
for _, warning := range changefeedbase.WarningsForTable(targets, table, opts) {
for _, warning := range changefeedbase.WarningsForTable(tables, table, opts) {
p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning))
}
}
}
return targets, nil
return targets, tables, nil
}

func validateSink(
Expand Down Expand Up @@ -953,3 +960,28 @@ func getChangefeedTargetName(
}
return desc.GetName(), nil
}

// AllTargets gets all the targets listed in a ChangefeedDetails,
// from the statement time name map in old protos
// or the TargetSpecifications in new ones.
func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetSpecification) {
//TODO: Use a version gate for this once we have CDC version gates
if len(cd.TargetSpecifications) > 0 {
for _, ts := range cd.TargetSpecifications {
if ts.TableID > 0 {
ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName
targets = append(targets, ts)
}
}
} else {
for id, t := range cd.Tables {
ct := jobspb.ChangefeedTargetSpecification{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: id,
StatementTimeName: t.StatementTimeName,
}
targets = append(targets, ct)
}
}
return
}
16 changes: 13 additions & 3 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ import (
)

// ValidateTable validates that a table descriptor can be watched by a CHANGEFEED.
func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDescriptor) error {
t, ok := targets[tableDesc.GetID()]
if !ok {
func ValidateTable(
targets []jobspb.ChangefeedTargetSpecification, tableDesc catalog.TableDescriptor,
) error {
var t jobspb.ChangefeedTargetSpecification
var found bool
for _, cts := range targets {
if cts.TableID == tableDesc.GetID() {
t = cts
found = true
break
}
}
if !found {
return errors.Errorf(`unwatched table: %s`, tableDesc.GetName())
}

Expand Down
Loading

0 comments on commit 7612dfe

Please sign in to comment.