Merge #33647
33647: changefeed: add experimental support for cloud storage sinks r=mrtracy a=danhhz

The data files are named `<timestamp>_<topic>_<schema_id>_<uniquer>.<ext>`.

`<timestamp>` is truncated to some bucket size, which is specified by the
required sink param `bucket_size`. The bucket size is a tradeoff between the
number of files and the end-to-end latency of data being resolved.
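
To make the truncation concrete, here is a minimal Go sketch (not the sink's
actual code; the one-minute bucket size, timestamp format, and values are all
illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical bucket_size of 1m: every event timestamp is truncated
	// down to its bucket boundary, so all events in the same minute share
	// a filename prefix. A larger bucket means fewer files, but data can
	// only be resolved once its whole bucket is complete.
	bucketSize := time.Minute
	eventTime := time.Date(2019, 1, 15, 10, 32, 45, 0, time.UTC)
	bucket := eventTime.Truncate(bucketSize)
	fmt.Println(bucket.Format("20060102150405")) // prints 20190115103200
}
```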

`<topic>` corresponds to one SQL table.

`<schema_id>` changes whenever the SQL table schema changes, which allows us
to guarantee to users that _all entries in a given file have the same
schema_.

`<uniquer>` is used to keep nodes in a cluster from overwriting each other's
data and should be ignored by external users.

`<ext>` implies the format of the file: currently the only option is
`ndjson`, which means a text file conforming to the "Newline Delimited JSON"
spec.
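
As a purely hypothetical illustration (filenames and values invented for this
example), a listing of sink output for tables `foo` and `bar` might look like:

```
20190115103000_bar_1_3qk2xtj4.ndjson
20190115103000_foo_1_3qk2xtj4.ndjson
20190115103100_foo_2_3qk2xtj4.ndjson
```

where each `.ndjson` file contains one JSON object per line, e.g.
`{"a": 1, "b": "a"}`, and `foo`'s bump from schema id 1 to 2 would indicate a
schema change between buckets.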

Each record in the data files is a value; keys are not included. As a result,
the `envelope` option must be set to `row`, which is the default. Within a
file, records are not guaranteed to be sorted by timestamp, and a duplicate of
some record might exist in a different file or even in the same file.

The resolved timestamp files are named `<timestamp>.RESOLVED`. This is
carefully done so that we can offer the following external guarantee: at any
given time, if the files are iterated in lexicographic filename order,
then encountering any filename containing `RESOLVED` means that everything
before it is finalized (and thus can be ingested into some other system and
deleted, included in Hive queries, etc.). A typical user of cloudStorageSink
would periodically do exactly this, as sketched below.
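
A minimal sketch of such a consumer in Go, assuming the sink writes to a local
directory; the `changefeed-output` path and the ingest step are hypothetical:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
)

func main() {
	// List the sink's output files and walk them in lexicographic
	// order; everything before a RESOLVED filename is finalized.
	entries, err := os.ReadDir("changefeed-output") // hypothetical path
	if err != nil {
		panic(err)
	}
	var names []string
	for _, e := range entries {
		names = append(names, e.Name())
	}
	sort.Strings(names)

	for _, name := range names {
		if strings.Contains(name, "RESOLVED") {
			// Everything iterated so far is finalized: safe to ingest
			// into another system and delete. Stop at the first
			// RESOLVED marker and pick up from here next period.
			break
		}
		fmt.Println("ingest:", filepath.Join("changefeed-output", name))
	}
}
```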

Still TODO: writing out data schemas, Avro support, and bounding memory usage.
Eliminating duplicates would be great, but may not be immediately practical.

Partially completes #28675

Release note (enterprise change): `CHANGEFEED`s now experimentally
support writing to cloud storage, for easy use with analytics databases.

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
craig[bot] and danhhz committed Jan 15, 2019
2 parents 8465311 + 670a546 commit 0c7e491
Showing 10 changed files with 764 additions and 132 deletions.
31 changes: 20 additions & 11 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -172,6 +172,7 @@ func kvsToRows(
 func emitEntries(
     settings *cluster.Settings,
     details jobspb.ChangefeedDetails,
+    watchedSpans []roachpb.Span,
     encoder Encoder,
     sink Sink,
     inputFn func(context.Context) ([]emitEntry, error),
@@ -182,15 +183,16 @@
     emitRowFn := func(ctx context.Context, row emitRow) error {
         var keyCopy, valueCopy []byte

-        encodedKey, err := encoder.EncodeKey(row.tableDesc, row.datums)
-        if err != nil {
-            return err
+        if envelopeType(details.Opts[optEnvelope]) != optEnvelopeValueOnly {
+            encodedKey, err := encoder.EncodeKey(row.tableDesc, row.datums)
+            if err != nil {
+                return err
+            }
+            scratch, keyCopy = scratch.Copy(encodedKey, 0 /* extraCap */)
         }
-        scratch, keyCopy = scratch.Copy(encodedKey, 0 /* extraCap */)

-        if !row.deleted && envelopeType(details.Opts[optEnvelope]) == optEnvelopeRow {
-            var encodedValue []byte
-            encodedValue, err = encoder.EncodeValue(row.tableDesc, row.datums, row.timestamp)
+        if !row.deleted && envelopeType(details.Opts[optEnvelope]) != optEnvelopeKeyOnly {
+            encodedValue, err := encoder.EncodeValue(row.tableDesc, row.datums, row.timestamp)
             if err != nil {
                 return err
             }
@@ -202,7 +204,9 @@
             return err
         }
     }
-    if err := sink.EmitRow(ctx, row.tableDesc.Name, keyCopy, valueCopy); err != nil {
+    if err := sink.EmitRow(
+        ctx, row.tableDesc, keyCopy, valueCopy, row.timestamp,
+    ); err != nil {
         return err
     }
     if log.V(3) {
@@ -211,8 +215,12 @@
         return nil
     }

+    // This SpanFrontier only tracks the spans being watched on this node.
+    // (There is a different SpanFrontier elsewhere for the entire changefeed.)
+    watchedSF := makeSpanFrontier(watchedSpans...)
+
     var lastFlush time.Time
-    // TODO(dan): We could keep these in a spanFrontier to eliminate dups.
+    // TODO(dan): We could keep these in `watchedSF` to eliminate dups.
     var resolvedSpans []jobspb.ResolvedSpan

     return func(ctx context.Context) ([]jobspb.ResolvedSpan, error) {
@@ -238,6 +246,7 @@
             }
         }
         if input.resolved != nil {
+            _ = watchedSF.Forward(input.resolved.Span, input.resolved.Timestamp)
             resolvedSpans = append(resolvedSpans, *input.resolved)
         }
     }
@@ -264,7 +273,7 @@
         // otherwise, we could lose buffered messages and violate the
         // at-least-once guarantee. This is also true for checkpointing the
         // resolved spans in the job progress.
-        if err := sink.Flush(ctx); err != nil {
+        if err := sink.Flush(ctx, watchedSF.Frontier()); err != nil {
             return nil, err
         }
         lastFlush = timeutil.Now()
@@ -329,7 +338,7 @@ func emitResolvedTimestamp(
     payload = append([]byte(nil), payload...)
     // TODO(dan): Emit more fine-grained (table level) resolved
     // timestamps.
-    if err := sink.EmitResolvedTimestamp(ctx, payload); err != nil {
+    if err := sink.EmitResolvedTimestamp(ctx, payload, resolved); err != nil {
         return err
     }
     if log.V(2) {
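
For readers unfamiliar with the `SpanFrontier` threaded through `emitEntries`
above: it tracks the highest resolved timestamp forwarded for each watched
span, and `Frontier()` reports the minimum across spans, i.e. the timestamp up
to which everything on this node is resolved. A toy sketch of the idea (the
real structure handles arbitrary, overlapping key spans):

```go
package main

import "fmt"

// miniFrontier is a toy stand-in for the SpanFrontier in the diff above:
// it records the highest timestamp forwarded for each span and reports
// the minimum across spans as the frontier.
type miniFrontier struct {
	spans map[string]int64 // span name -> highest resolved timestamp
}

func makeMiniFrontier(spans ...string) *miniFrontier {
	f := &miniFrontier{spans: make(map[string]int64)}
	for _, s := range spans {
		f.spans[s] = 0
	}
	return f
}

// Forward records that span is resolved up to ts, keeping the max seen.
func (f *miniFrontier) Forward(span string, ts int64) {
	if ts > f.spans[span] {
		f.spans[span] = ts
	}
}

// Frontier returns the minimum resolved timestamp across all spans.
func (f *miniFrontier) Frontier() int64 {
	min := int64(-1)
	for _, ts := range f.spans {
		if min == -1 || ts < min {
			min = ts
		}
	}
	return min
}

func main() {
	f := makeMiniFrontier("a-b", "b-c")
	f.Forward("a-b", 10)
	f.Forward("b-c", 7)
	fmt.Println(f.Frontier()) // 7: span b-c holds the frontier back
}
```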
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -119,7 +119,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {

     var err error
     if ca.sink, err = getSink(
-        ca.spec.Feed.SinkURI, ca.spec.Feed.Opts, ca.spec.Feed.Targets,
+        ca.spec.Feed.SinkURI, ca.spec.Feed.Opts, ca.spec.Feed.Targets, ca.flowCtx.Settings,
     ); err != nil {
         // Early abort in the case that there is an error creating the sink.
         ca.MoveToDraining(err)
@@ -160,7 +160,8 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
     if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
         knobs = *cfKnobs
     }
-    ca.tickFn = emitEntries(ca.flowCtx.Settings, ca.spec.Feed, ca.encoder, ca.sink, rowsFn, knobs, metrics)
+    ca.tickFn = emitEntries(
+        ca.flowCtx.Settings, ca.spec.Feed, spans, ca.encoder, ca.sink, rowsFn, knobs, metrics)

     // Give errCh enough buffer both possible errors from supporting goroutines,
     // but only the first one is ever used.
@@ -393,7 +394,7 @@ func (cf *changeFrontier) Start(ctx context.Context) context.Context {

     var err error
     if cf.sink, err = getSink(
-        cf.spec.Feed.SinkURI, cf.spec.Feed.Opts, cf.spec.Feed.Targets,
+        cf.spec.Feed.SinkURI, cf.spec.Feed.Opts, cf.spec.Feed.Targets, cf.flowCtx.Settings,
     ); err != nil {
         cf.MoveToDraining(err)
         return ctx
25 changes: 18 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -48,15 +48,17 @@ const (
     optResolvedTimestamps = `resolved`
     optUpdatedTimestamps  = `updated`

-    optEnvelopeKeyOnly envelopeType = `key_only`
-    optEnvelopeRow     envelopeType = `row`
-    optEnvelopeDiff    envelopeType = `diff`
+    optEnvelopeDiff      envelopeType = `diff`
+    optEnvelopeKeyOnly   envelopeType = `key_only`
+    optEnvelopeRow       envelopeType = `row`
+    optEnvelopeValueOnly envelopeType = `value_only`

     optFormatJSON formatType = `json`
     optFormatAvro formatType = `experimental_avro`

-    sinkParamTopicPrefix = `topic_prefix`
+    sinkParamBucketSize  = `bucket_size`
     sinkParamSchemaTopic = `schema_topic`
+    sinkParamTopicPrefix = `topic_prefix`
     sinkSchemeBuffer          = ``
     sinkSchemeExperimentalSQL = `experimental-sql`
     sinkSchemeKafka           = `kafka`
@@ -142,6 +144,8 @@ func changefeedPlanHook(
         return err
     }

+    jobDescription := changefeedJobDescription(changefeedStmt, sinkURI, opts)
+
     statementTime := p.ExecCfg().Clock.Now()
     var initialHighWater hlc.Timestamp
     if cursor, ok := opts[optCursor]; ok {
@@ -205,19 +209,24 @@
         return distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
     }

+    settings := p.ExecCfg().Settings
     if err := utilccl.CheckEnterpriseEnabled(
-        p.ExecCfg().Settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
+        settings, p.ExecCfg().ClusterID(), p.ExecCfg().Organization(), "CHANGEFEED",
     ); err != nil {
         return err
     }

+    if details, err = validateDetails(details); err != nil {
+        return err
+    }
+
     // In the case where a user is executing a CREATE CHANGEFEED and is still
     // waiting for the statement to return, we take the opportunity to ensure
     // that the user has not made any obvious errors when specifying the sink in
     // the CREATE CHANGEFEED statement. To do this, we create a "canary" sink,
     // which will be immediately closed, only to check for errors.
     {
-        canarySink, err := getSink(sinkURI, opts, targets)
+        canarySink, err := getSink(details.SinkURI, details.Opts, details.Targets, settings)
         if err != nil {
             // In this context, we don't want to retry even retryable errors from the
             // sync. Unwrap any retryable errors encountered.
@@ -236,7 +245,7 @@
     // hooked up to resultsCh to avoid a bunch of extra plumbing.
     startedCh := make(chan tree.Datums)
     job, errCh, err := p.ExecCfg().JobRegistry.StartJob(ctx, startedCh, jobs.Record{
-        Description: changefeedJobDescription(changefeedStmt, sinkURI, opts),
+        Description: jobDescription,
         Username:    p.User(),
         DescriptorIDs: func() (sqlDescIDs []sqlbase.ID) {
             for _, desc := range targetDescs {
@@ -310,6 +319,8 @@ func validateDetails(details jobspb.ChangefeedDetails) (jobspb.ChangefeedDetails
         details.Opts[optEnvelope] = string(optEnvelopeRow)
     case optEnvelopeKeyOnly:
         details.Opts[optEnvelope] = string(optEnvelopeKeyOnly)
+    case optEnvelopeValueOnly:
+        details.Opts[optEnvelope] = string(optEnvelopeValueOnly)
     case optEnvelopeDiff:
         return jobspb.ChangefeedDetails{}, errors.Errorf(
             `%s=%s is not yet supported`, optEnvelope, optEnvelopeDiff)
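
The "canary" sink created in `changefeedPlanHook` above exists only to fail
fast on sink misconfiguration while the user is still waiting on the `CREATE
CHANGEFEED` statement. In isolation the pattern looks roughly like this
sketch, where `Sink` and `newSink` are simplified stand-ins for the real
interface and `getSink`:

```go
package main

import "fmt"

// Sink is a simplified stand-in for the changefeed Sink interface.
type Sink interface {
	Close() error
}

type noopSink struct{}

func (noopSink) Close() error { return nil }

// newSink is a hypothetical stand-in for getSink: it fails fast on an
// obviously bad URI instead of letting a job start and then error.
func newSink(uri string) (Sink, error) {
	if uri == "" {
		return nil, fmt.Errorf("no sink URI provided")
	}
	return noopSink{}, nil
}

// validateSinkURI builds a "canary" sink purely to surface configuration
// errors while the statement is still running, then closes it
// immediately; the real sinks are built later by the job's processors.
func validateSinkURI(uri string) error {
	canary, err := newSink(uri)
	if err != nil {
		return err
	}
	return canary.Close()
}

func main() {
	fmt.Println(validateSinkURI(""))                    // no sink URI provided
	fmt.Println(validateSinkURI("kafka://broker:9092")) // <nil>
}
```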
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -101,6 +101,11 @@ func TestChangefeedEnvelope(t *testing.T) {
             defer foo.Close(t)
             assertPayloads(t, foo, []string{`foo: [1]->`})
         })
+        t.Run(`envelope=value_only`, func(t *testing.T) {
+            foo := f.Feed(t, `CREATE CHANGEFEED FOR foo WITH envelope='value_only'`)
+            defer foo.Close(t)
+            assertPayloads(t, foo, []string{`foo: ->{"a": 1, "b": "a"}`})
+        })
     }

     t.Run(`sinkless`, sinklessTest(testFn))
@@ -1382,6 +1387,18 @@ func TestChangefeedErrors(t *testing.T) {
         t, `schema_topic is not yet supported`,
         `CREATE CHANGEFEED FOR foo INTO $1`, `kafka://nope/?schema_topic=foo`,
     )
+
+    // The cloudStorageSink is particular about the options it will work with.
+    sqlDB.ExpectErr(
+        t, `this sink is incompatible with format=experimental_avro`,
+        `CREATE CHANGEFEED FOR foo INTO $1 WITH format='experimental_avro'`,
+        `experimental-nodelocal:///bar?bucket_size=0ns`,
+    )
+    sqlDB.ExpectErr(
+        t, `this sink is incompatible with envelope=key_only`,
+        `CREATE CHANGEFEED FOR foo INTO $1 WITH envelope='key_only'`,
+        `experimental-nodelocal:///bar?bucket_size=0ns`,
+    )
 }

 func TestChangefeedPermissions(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
@@ -180,7 +180,7 @@ func newConfluentAvroEncoder(opts map[string]string) (*confluentAvroEncoder, err
     // TODO(dan): Figure out what updated and resolved timestamps should
     // look like with avro.
     for _, opt := range []string{optUpdatedTimestamps, optResolvedTimestamps} {
-        if _, ok := opts[optUpdatedTimestamps]; ok {
+        if _, ok := opts[opt]; ok {
             return nil, errors.Errorf(
                 `%s=%s is not yet compatible with %s`, optFormat, optFormatAvro, opt)
         }