Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
96567: changefeedccl: add unordered flag r=[miretskiy] a=HonoreDB

This PR adds the WITH unordered changefeed option, which relaxes our constraints on configuration meant to preserve end-to-end ordering guarantees. Followup PRs will use this in different ways, but this one just removes the requirement to specify a region in gcpubsub.

Release note (enterprise change): Changefeeds with the WITH unordered flag may use multiregion Google Cloud pubsub topics.

Addresses cockroachdb#80884. This is also motivated by cockroachdb#54461.

Co-authored-by: Aaron Zinger <zinger@cockroachlabs.com>
  • Loading branch information
craig[bot] and HonoreDB committed Feb 13, 2023
2 parents 18c5a38 + a79eaf3 commit cdaf027
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4536,6 +4536,10 @@ func TestChangefeedErrors(t *testing.T) {
`kafka://nope`,
)

// Unordered flag required for some options, disallowed for others.
sqlDB.ExpectErr(t, `resolved timestamps cannot be guaranteed to be correct in unordered mode`, `CREATE CHANGEFEED FOR foo WITH resolved, unordered`)
sqlDB.ExpectErr(t, `Use of gcpubsub without specifying a region requires the WITH unordered option.`, `CREATE CHANGEFEED FOR foo INTO "gcpubsub://foo"`)

// The topics option should not be exposed to users since it is used
// internally to display topics in the show changefeed jobs query
sqlDB.ExpectErr(
Expand Down
30 changes: 29 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
OptWebhookClientTimeout = `webhook_client_timeout`
OptOnError = `on_error`
OptMetricsScope = `metrics_label`
OptUnordered = `unordered`
OptVirtualColumns = `virtual_columns`

OptVirtualColumnsOmitted VirtualColumnVisibility = `omitted`
Expand Down Expand Up @@ -325,6 +326,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptWebhookClientTimeout: durationOption,
OptOnError: enum("pause", "fail"),
OptMetricsScope: stringOption,
OptUnordered: flagOption,
OptVirtualColumns: enum("omitted", "null"),
}

Expand All @@ -336,7 +338,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
OptMVCCTimestamps, OptDiff, OptSplitColumnFamilies,
OptSchemaChangeEvents, OptSchemaChangePolicy,
OptProtectDataFromGCOnPause, OptOnError,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly,
OptInitialScan, OptNoInitialScan, OptInitialScanOnly, OptUnordered,
OptMinCheckpointFrequency, OptMetricsScope, OptVirtualColumns, Topics)

// SQLValidOptions is options exclusive to SQL sink
Expand Down Expand Up @@ -436,6 +438,25 @@ var AlterChangefeedTargetOptions = map[string]OptionPermittedValues{
OptNoInitialScan: flagOption,
}

type incompatibleOptions struct {
opt1 string
opt2 string
reason string
}

func makeInvertedIndex(pairs []incompatibleOptions) map[string][]incompatibleOptions {
m := make(map[string][]incompatibleOptions, len(pairs)*2)
for _, p := range pairs {
m[p.opt1] = append(m[p.opt1], p)
m[p.opt2] = append(m[p.opt2], p)
}
return m
}

var incompatibleOptionsMap = makeInvertedIndex([]incompatibleOptions{
{opt1: OptUnordered, opt2: OptResolvedTimestamps, reason: `resolved timestamps cannot be guaranteed to be correct in unordered mode`},
})

// MakeStatementOptions wraps and canonicalizes the options we get
// from TypeAsStringOpts or the job record.
func MakeStatementOptions(opts map[string]string) StatementOptions {
Expand Down Expand Up @@ -982,6 +1003,13 @@ func (s StatementOptions) ValidateForCreateChangefeed(isPredicateChangefeed bool
}
}
}
for o := range s.m {
for _, pair := range incompatibleOptionsMap[o] {
if s.IsSet(pair.opt1) && s.IsSet(pair.opt2) {
return errors.Newf(`%s is not usable with %s because %s`, pair.opt1, pair.opt2, pair.reason)
}
}
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func getSink(
})
case isPubsubSink(u):
// TODO: add metrics to pubsubsink
return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg))
return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered))
case isCloudStorageSink(u):
return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) {
// Placeholder id for canary sink
Expand Down
19 changes: 15 additions & 4 deletions pkg/ccl/changefeedccl/sink_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const credentialsParam = "CREDENTIALS"
const GcpScheme = "gcpubsub"
const gcpScope = "https://www.googleapis.com/auth/pubsub"
const cloudPlatformScope = "https://www.googleapis.com/auth/cloud-platform"
const globalGCPEndpoint = "pubsub.googleapis.com:443"

// TODO: make numOfWorkers configurable
const numOfWorkers = 128
Expand Down Expand Up @@ -78,7 +79,7 @@ type gcpPubsubClient struct {
client *pubsub.Client
ctx context.Context
projectID string
region string
endpoint string
topicNamer *TopicNamer
url sinkURL

Expand Down Expand Up @@ -186,6 +187,7 @@ func MakePubsubSink(
u *url.URL,
encodingOpts changefeedbase.EncodingOptions,
targets changefeedbase.Targets,
unordered bool,
) (Sink, error) {

pubsubURL := sinkURL{URL: u, q: u.Query()}
Expand Down Expand Up @@ -226,8 +228,17 @@ func MakePubsubSink(
return nil, errors.New("missing project name")
}
region := pubsubURL.consumeParam(regionParam)
var endpoint string
if region == "" {
return nil, errors.New("region query parameter not found")
if unordered {
endpoint = globalGCPEndpoint
} else {
return nil, errors.WithHintf(errors.New("region query parameter not found"),
"Use of gcpubsub without specifying a region requires the WITH %s option.",
changefeedbase.OptUnordered)
}
} else {
endpoint = gcpEndpointForRegion(region)
}
tn, err := MakeTopicNamer(targets, WithSingleName(pubsubTopicName))
if err != nil {
Expand All @@ -237,7 +248,7 @@ func MakePubsubSink(
topicNamer: tn,
ctx: ctx,
projectID: projectID,
region: gcpEndpointForRegion(region),
endpoint: endpoint,
url: pubsubURL,
}
p.client = g
Expand Down Expand Up @@ -512,7 +523,7 @@ func (p *gcpPubsubClient) init() error {
p.ctx,
p.projectID,
creds,
option.WithEndpoint(p.region),
option.WithEndpoint(p.endpoint),
)

if err != nil {
Expand Down

0 comments on commit cdaf027

Please sign in to comment.