Skip to content

Commit

Permalink
changefeedccl: pubsub sink refactor to use batching sink
Browse files Browse the repository at this point in the history
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to
cockroachdb#99086 which moves the Pubsub sink
to the batching sink framework.

The changes involve:
1. Moves the Pubsub code to match the `SinkClient` interface, moving to using
the lower level v1 pubsub API that lets us publish batches manually
3. Removing the extra call to json.Marshal
4. Moving to using the `pstest` package for validating results in unit tests
5. Adding topic handling to the batching sink, where batches are created
per-topic
6. Added a pubsub_sink_config since it can now handle Retry and Flush config
settings

Release note (performance improvement): pubsub sink changefeeds can now support
higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster
setting.
  • Loading branch information
samiskin committed Apr 4, 2023
1 parent c71b0ed commit e713202
Show file tree
Hide file tree
Showing 11 changed files with 730 additions and 269 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"sink_external_connection.go",
"sink_kafka.go",
"sink_pubsub.go",
"sink_pubsub_v2.go",
"sink_sql.go",
"sink_webhook.go",
"sink_webhook_v2.go",
Expand Down Expand Up @@ -163,6 +164,8 @@ go_library(
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
"@org_golang_google_api//option",
"@org_golang_google_grpc//codes",
Expand Down Expand Up @@ -313,6 +316,13 @@ go_test(
"@com_github_shopify_sarama//:sarama",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@com_google_cloud_go_pubsub//pstest",
"@org_golang_google_api//option",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_x_text//collate",
],
)
Expand Down
85 changes: 62 additions & 23 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ import (
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
MakeBatchBuffer() BatchBuffer
// Batches can only hold messages for one unique topic
MakeBatchBuffer(topic string) BatchBuffer
Flush(context.Context, SinkPayload) error
Close() error
}

// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.
type BatchBuffer interface {
Append(key []byte, value []byte, topic string)
Append(key []byte, value []byte)
ShouldFlush() bool

// Once all data has been Append'ed, Close can be called to return a finalized
Expand Down Expand Up @@ -90,9 +91,9 @@ type flushReq struct {
}

type rowEvent struct {
key []byte
val []byte
topic string
key []byte
val []byte
topicDescriptor TopicDescriptor

alloc kvevent.Alloc
mvcc hlc.Timestamp
Expand Down Expand Up @@ -176,7 +177,7 @@ func (s *batchingSink) EmitRow(
payload := newRowEvent()
payload.key = key
payload.val = value
payload.topic = "" // unimplemented for now
payload.topicDescriptor = topic
payload.mvcc = mvcc
payload.alloc = alloc

Expand Down Expand Up @@ -277,7 +278,7 @@ func (sb *sinkBatch) Append(e *rowEvent) {
sb.bufferTime = timeutil.Now()
}

sb.buffer.Append(e.key, e.val, e.topic)
sb.buffer.Append(e.key, e.val)

sb.keys.Add(hashToInt(sb.hasher, e.key))
sb.numMessages += 1
Expand All @@ -296,17 +297,22 @@ func (s *batchingSink) handleError(err error) {
}
}

func (s *batchingSink) newBatchBuffer() *sinkBatch {
func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch {
batch := newSinkBatch()
batch.buffer = s.client.MakeBatchBuffer()
batch.buffer = s.client.MakeBatchBuffer(topic)
batch.hasher = s.hasher
return batch
}

// runBatchingWorker combines 1 or more row events into batches, sending the IO
// requests out either once the batch is full or a flush request arrives.
func (s *batchingSink) runBatchingWorker(ctx context.Context) {
batchBuffer := s.newBatchBuffer()
// topicBatches stores per-topic sinkBatches which are flushed individually
// when one reaches its size limit, but are all flushed together if the
// frequency timer triggers. Messages for different topics cannot be allowed
// to be batched together as the data may need to end up at a specific
// endpoint for that topic.
topicBatches := make(map[string]*sinkBatch)

// Once finalized, batches are sent to a parallelIO struct which handles
// performing multiple Flushes in parallel while maintaining Keys() ordering.
Expand Down Expand Up @@ -347,14 +353,14 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
freeSinkBatchEvent(batch)
}

tryFlushBatch := func() error {
if batchBuffer.isEmpty() {
tryFlushBatch := func(topic string) error {
batchBuffer, ok := topicBatches[topic]
if !ok || batchBuffer.isEmpty() {
return nil
}
toFlush := batchBuffer
batchBuffer = s.newBatchBuffer()
topicBatches[topic] = s.newBatchBuffer(topic)

if err := toFlush.FinalizePayload(); err != nil {
if err := batchBuffer.FinalizePayload(); err != nil {
return err
}

Expand All @@ -364,7 +370,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
select {
case <-ctx.Done():
return ctx.Err()
case ioEmitter.requestCh <- toFlush:
case ioEmitter.requestCh <- batchBuffer:
case result := <-ioEmitter.resultCh:
handleResult(result)
continue
Expand All @@ -376,8 +382,22 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
return nil
}

flushAll := func() error {
for topic := range topicBatches {
if err := tryFlushBatch(topic); err != nil {
return err
}
}
return nil
}

// flushTimer is used to ensure messages do not remain batched longer than a
// given timeout. Every minFlushFrequency seconds after the first event for
// any topic has arrived, batches for all topics are flushed out immediately
// and the timer once again waits for the first message to arrive.
flushTimer := s.ts.NewTimer()
defer flushTimer.Stop()
isTimerPending := false

for {
select {
Expand All @@ -396,11 +416,29 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

inflight += 1

// If we're about to append to an empty batch, start the timer to
// guarantee the messages do not stay buffered longer than the
// configured frequency.
if batchBuffer.isEmpty() && s.minFlushFrequency > 0 {
var topic string
var err error
if s.topicNamer != nil {
topic, err = s.topicNamer.Name(r.topicDescriptor)
if err != nil {
s.handleError(err)
continue
}
}

// If the timer isn't pending then this message is the first message to
// arrive either ever or since the timer last triggered a flush,
// therefore we're going from 0 messages batched to 1, and should
// restart the timer.
if !isTimerPending && s.minFlushFrequency > 0 {
flushTimer.Reset(s.minFlushFrequency)
isTimerPending = true
}

batchBuffer, ok := topicBatches[topic]
if !ok {
batchBuffer = s.newBatchBuffer(topic)
topicBatches[topic] = batchBuffer
}

batchBuffer.Append(r)
Expand All @@ -414,7 +452,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

if batchBuffer.buffer.ShouldFlush() {
s.metrics.recordSizeBasedFlush()
if err := tryFlushBatch(); err != nil {
if err := tryFlushBatch(topic); err != nil {
s.handleError(err)
}
}
Expand All @@ -423,7 +461,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
close(r.waiter)
} else {
sinkFlushWaiter = r.waiter
if err := tryFlushBatch(); err != nil {
if err := flushAll(); err != nil {
s.handleError(err)
}
}
Expand All @@ -434,7 +472,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
handleResult(result)
case <-flushTimer.Ch():
flushTimer.MarkRead()
if err := tryFlushBatch(); err != nil {
isTimerPending = false
if err := flushAll(); err != nil {
s.handleError(err)
}
case <-ctx.Done():
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ const (

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`
OptPubsubSinkConfig = `pubsub_sink_config`
OptWebhookSinkConfig = `webhook_sink_config`

// OptSink allows users to alter the Sink URI of an existing changefeed.
Expand Down Expand Up @@ -333,6 +334,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptProtectDataFromGCOnPause: flagOption,
OptExpirePTSAfter: durationOption.thatCanBeZero(),
OptKafkaSinkConfig: jsonOption,
OptPubsubSinkConfig: jsonOption,
OptWebhookSinkConfig: jsonOption,
OptWebhookAuthHeader: stringOption,
OptWebhookClientTimeout: durationOption,
Expand Down Expand Up @@ -369,7 +371,7 @@ var CloudStorageValidOptions = makeStringSet(OptCompression)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet()
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)

// ExternalConnectionValidOptions is options exclusive to the external
// connection sink.
Expand Down Expand Up @@ -888,6 +890,12 @@ func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptKafkaSinkConfig)
}

// GetPubsubConfigJSON returns arbitrary json to be interpreted
// by the pubsub sink.
func (s StatementOptions) GetPubsubConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptPubsubSinkConfig)
}

// GetResolvedTimestampInterval gets the best-effort interval at which resolved timestamps
// should be emitted. Nil or 0 means emit as often as possible. False means do not emit at all.
// Returns an error for negative or invalid duration value.
Expand Down
42 changes: 38 additions & 4 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ var NewWebhookSinkEnabled = settings.RegisterBoolSetting(
util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
)

// NewPubsubSinkEnabled determines whether or not the refactored Webhook sink
// or the deprecated sink should be used.
var NewPubsubSinkEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.new_pubsub_sink_enabled",
"if enabled, this setting enables a new implementation of the pubsub sink"+
" that allows for a higher throughput",
util.ConstantWithMetamorphicTestBool("changefeed.new_pubsub_sink_enabled", false),
)

func getSink(
ctx context.Context,
serverCfg *execinfra.ServerConfig,
Expand Down Expand Up @@ -239,8 +249,16 @@ func getSink(
})
}
case isPubsubSink(u):
// TODO: add metrics to pubsubsink
return MakePubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered))
var testingKnobs *TestingKnobs
if knobs, ok := serverCfg.TestingKnobs.Changefeed.(*TestingKnobs); ok {
testingKnobs = knobs
}
if NewPubsubSinkEnabled.Get(&serverCfg.Settings.SV) {
return makePubsubSink(ctx, u, encodingOpts, opts.GetPubsubConfigJSON(), AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered),
numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{}, metricsBuilder, testingKnobs)
} else {
return makeDeprecatedPubsubSink(ctx, u, encodingOpts, AllTargets(feedCfg), opts.IsSet(changefeedbase.OptUnordered), metricsBuilder, testingKnobs)
}
case isCloudStorageSink(u):
return validateOptionsAndMakeSink(changefeedbase.CloudStorageValidOptions, func() (Sink, error) {
// Placeholder id for canary sink
Expand Down Expand Up @@ -737,11 +755,11 @@ func defaultRetryConfig() retry.Options {
}

func getSinkConfigFromJson(
jsonStr changefeedbase.SinkSpecificJSONConfig,
jsonStr changefeedbase.SinkSpecificJSONConfig, baseConfig sinkJSONConfig,
) (batchCfg sinkBatchConfig, retryCfg retry.Options, err error) {
retryCfg = defaultRetryConfig()

var cfg = sinkJSONConfig{}
var cfg = baseConfig
cfg.Retry.Max = jsonMaxRetries(retryCfg.MaxRetries)
cfg.Retry.Backoff = jsonDuration(retryCfg.InitialBackoff)
if jsonStr != `` {
Expand Down Expand Up @@ -854,6 +872,22 @@ func nilPacerFactory() *admission.Pacer {
return nil
}

func shouldFlushBatch(bytes int, messages int, config sinkBatchConfig) bool {
switch {
// all zero values is interpreted as flush every time
case config.Messages == 0 && config.Bytes == 0 && config.Frequency == 0:
return true
// messages threshold has been reached
case config.Messages > 0 && messages >= config.Messages:
return true
// bytes threshold has been reached
case config.Bytes > 0 && bytes >= config.Bytes:
return true
default:
return false
}
}

func sinkSupportsConcurrentEmits(sink EventSink) bool {
_, ok := sink.(*batchingSink)
return ok
Expand Down
Loading

0 comments on commit e713202

Please sign in to comment.