Skip to content

Commit

Permalink
changefeedccl: pubsub sink refactor to use batching sink
Browse files Browse the repository at this point in the history
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to
cockroachdb#99086 which moves the Pubsub sink
to the batching sink framework.

The changes involve:
1. Moves the Pubsub code to match the `SinkClient` interface, moving to using
the lower level v1 pubsub API that lets us publish batches manually
3. Removing the extra call to json.Marshal
4. Moving to using the `pstest` package for validating results in unit tests
5. Adding topic handling to the batching sink, where batches are created
per-topic
6. Added a pubsub_sink_config since it can now handle Retry and Flush config
settings

Release note (performance improvement): pubsub sink changefeeds can now support
higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster
setting.
  • Loading branch information
samiskin committed Apr 6, 2023
1 parent 7d21940 commit 699b7f4
Show file tree
Hide file tree
Showing 12 changed files with 815 additions and 232 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"sink_external_connection.go",
"sink_kafka.go",
"sink_pubsub.go",
"sink_pubsub_v2.go",
"sink_sql.go",
"sink_webhook.go",
"sink_webhook_v2.go",
Expand Down Expand Up @@ -163,6 +164,8 @@ go_library(
"@com_github_shopify_sarama//:sarama",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
"@org_golang_google_api//option",
"@org_golang_google_grpc//codes",
Expand Down Expand Up @@ -292,6 +295,7 @@ go_test(
"//pkg/util/log/eventpb",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randident",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/span",
Expand All @@ -313,6 +317,12 @@ go_test(
"@com_github_shopify_sarama//:sarama",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@com_google_cloud_go_pubsub//pstest",
"@org_golang_google_api//option",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure",
"@org_golang_x_text//collate",
],
)
Expand Down
85 changes: 62 additions & 23 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ import (
// into batches as they arrive and once ready are flushed out.
type SinkClient interface {
MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
MakeBatchBuffer() BatchBuffer
// Batches can only hold messages for one unique topic
MakeBatchBuffer(topic string) BatchBuffer
Flush(context.Context, SinkPayload) error
Close() error
}

// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.
type BatchBuffer interface {
Append(key []byte, value []byte, topic string)
Append(key []byte, value []byte)
ShouldFlush() bool

// Once all data has been Append'ed, Close can be called to return a finalized
Expand Down Expand Up @@ -90,9 +91,9 @@ type flushReq struct {
}

type rowEvent struct {
key []byte
val []byte
topic string
key []byte
val []byte
topicDescriptor TopicDescriptor

alloc kvevent.Alloc
mvcc hlc.Timestamp
Expand Down Expand Up @@ -176,7 +177,7 @@ func (s *batchingSink) EmitRow(
payload := newRowEvent()
payload.key = key
payload.val = value
payload.topic = "" // unimplemented for now
payload.topicDescriptor = topic
payload.mvcc = mvcc
payload.alloc = alloc

Expand Down Expand Up @@ -277,7 +278,7 @@ func (sb *sinkBatch) Append(e *rowEvent) {
sb.bufferTime = timeutil.Now()
}

sb.buffer.Append(e.key, e.val, e.topic)
sb.buffer.Append(e.key, e.val)

sb.keys.Add(hashToInt(sb.hasher, e.key))
sb.numMessages += 1
Expand All @@ -296,17 +297,22 @@ func (s *batchingSink) handleError(err error) {
}
}

func (s *batchingSink) newBatchBuffer() *sinkBatch {
func (s *batchingSink) newBatchBuffer(topic string) *sinkBatch {
batch := newSinkBatch()
batch.buffer = s.client.MakeBatchBuffer()
batch.buffer = s.client.MakeBatchBuffer(topic)
batch.hasher = s.hasher
return batch
}

// runBatchingWorker combines 1 or more row events into batches, sending the IO
// requests out either once the batch is full or a flush request arrives.
func (s *batchingSink) runBatchingWorker(ctx context.Context) {
batchBuffer := s.newBatchBuffer()
// topicBatches stores per-topic sinkBatches which are flushed individually
// when one reaches its size limit, but are all flushed together if the
// frequency timer triggers. Messages for different topics cannot be allowed
// to be batched together as the data may need to end up at a specific
// endpoint for that topic.
topicBatches := make(map[string]*sinkBatch)

// Once finalized, batches are sent to a parallelIO struct which handles
// performing multiple Flushes in parallel while maintaining Keys() ordering.
Expand Down Expand Up @@ -347,14 +353,14 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
freeSinkBatchEvent(batch)
}

tryFlushBatch := func() error {
if batchBuffer.isEmpty() {
tryFlushBatch := func(topic string) error {
batchBuffer, ok := topicBatches[topic]
if !ok || batchBuffer.isEmpty() {
return nil
}
toFlush := batchBuffer
batchBuffer = s.newBatchBuffer()
topicBatches[topic] = s.newBatchBuffer(topic)

if err := toFlush.FinalizePayload(); err != nil {
if err := batchBuffer.FinalizePayload(); err != nil {
return err
}

Expand All @@ -364,7 +370,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
select {
case <-ctx.Done():
return ctx.Err()
case ioEmitter.requestCh <- toFlush:
case ioEmitter.requestCh <- batchBuffer:
case result := <-ioEmitter.resultCh:
handleResult(result)
continue
Expand All @@ -376,8 +382,22 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
return nil
}

flushAll := func() error {
for topic := range topicBatches {
if err := tryFlushBatch(topic); err != nil {
return err
}
}
return nil
}

// flushTimer is used to ensure messages do not remain batched longer than a
// given timeout. Every minFlushFrequency seconds after the first event for
// any topic has arrived, batches for all topics are flushed out immediately
// and the timer once again waits for the first message to arrive.
flushTimer := s.ts.NewTimer()
defer flushTimer.Stop()
isTimerPending := false

for {
select {
Expand All @@ -396,11 +416,29 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

inflight += 1

// If we're about to append to an empty batch, start the timer to
// guarantee the messages do not stay buffered longer than the
// configured frequency.
if batchBuffer.isEmpty() && s.minFlushFrequency > 0 {
var topic string
var err error
if s.topicNamer != nil {
topic, err = s.topicNamer.Name(r.topicDescriptor)
if err != nil {
s.handleError(err)
continue
}
}

// If the timer isn't pending then this message is the first message to
// arrive either ever or since the timer last triggered a flush,
// therefore we're going from 0 messages batched to 1, and should
// restart the timer.
if !isTimerPending && s.minFlushFrequency > 0 {
flushTimer.Reset(s.minFlushFrequency)
isTimerPending = true
}

batchBuffer, ok := topicBatches[topic]
if !ok {
batchBuffer = s.newBatchBuffer(topic)
topicBatches[topic] = batchBuffer
}

batchBuffer.Append(r)
Expand All @@ -414,7 +452,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {

if batchBuffer.buffer.ShouldFlush() {
s.metrics.recordSizeBasedFlush()
if err := tryFlushBatch(); err != nil {
if err := tryFlushBatch(topic); err != nil {
s.handleError(err)
}
}
Expand All @@ -423,7 +461,7 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
close(r.waiter)
} else {
sinkFlushWaiter = r.waiter
if err := tryFlushBatch(); err != nil {
if err := flushAll(); err != nil {
s.handleError(err)
}
}
Expand All @@ -434,7 +472,8 @@ func (s *batchingSink) runBatchingWorker(ctx context.Context) {
handleResult(result)
case <-flushTimer.Ch():
flushTimer.MarkRead()
if err := tryFlushBatch(); err != nil {
isTimerPending = false
if err := flushAll(); err != nil {
s.handleError(err)
}
case <-ctx.Done():
Expand Down
41 changes: 41 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randident"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -8366,3 +8367,43 @@ func TestChangefeedExecLocality(t *testing.T) {
test(t, "x", "x=0", []bool{true, true, false, false})
test(t, "y", "y=1", []bool{false, true, false, true})
}

func TestChangefeedTopicNames(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
rand, _ := randutil.NewTestRand()
cfg := randident.DefaultNameGeneratorConfig()
cfg.Noise = true
cfg.Finalize()
ng := randident.NewNameGenerator(&cfg, rand, "table")

names, _ := ng.GenerateMultiple(context.Background(), 100, make(map[string]struct{}))

var escapedNames []string
for _, name := range names {
escapedNames = append(escapedNames, strings.ReplaceAll(name, `"`, `""`))
}

sqlDB := sqlutils.MakeSQLRunner(s.DB)
for _, name := range escapedNames {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE "%s" (a INT PRIMARY KEY);`, name))
sqlDB.Exec(t, fmt.Sprintf(`INSERT INTO "%s" VALUES (1);`, name))
}

var quotedNames []string
for _, name := range escapedNames {
quotedNames = append(quotedNames, "\""+name+"\"")
}
createStmt := fmt.Sprintf(`CREATE CHANGEFEED FOR %s`, strings.Join(quotedNames, ", "))
foo := feed(t, f, createStmt)
defer closeFeed(t, foo)

var expected []string
for _, name := range names {
expected = append(expected, fmt.Sprintf(`%s: [1]->{"after": {"a": 1}}`, name))
}
assertPayloads(t, foo, expected)
}

cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
10 changes: 9 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ const (

// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
OptKafkaSinkConfig = `kafka_sink_config`
OptPubsubSinkConfig = `pubsub_sink_config`
OptWebhookSinkConfig = `webhook_sink_config`

// OptSink allows users to alter the Sink URI of an existing changefeed.
Expand Down Expand Up @@ -333,6 +334,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
OptProtectDataFromGCOnPause: flagOption,
OptExpirePTSAfter: durationOption.thatCanBeZero(),
OptKafkaSinkConfig: jsonOption,
OptPubsubSinkConfig: jsonOption,
OptWebhookSinkConfig: jsonOption,
OptWebhookAuthHeader: stringOption,
OptWebhookClientTimeout: durationOption,
Expand Down Expand Up @@ -369,7 +371,7 @@ var CloudStorageValidOptions = makeStringSet(OptCompression)
var WebhookValidOptions = makeStringSet(OptWebhookAuthHeader, OptWebhookClientTimeout, OptWebhookSinkConfig)

// PubsubValidOptions is options exclusive to pubsub sink
var PubsubValidOptions = makeStringSet()
var PubsubValidOptions = makeStringSet(OptPubsubSinkConfig)

// ExternalConnectionValidOptions is options exclusive to the external
// connection sink.
Expand Down Expand Up @@ -888,6 +890,12 @@ func (s StatementOptions) GetKafkaConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptKafkaSinkConfig)
}

// GetPubsubConfigJSON returns arbitrary json to be interpreted
// by the pubsub sink.
func (s StatementOptions) GetPubsubConfigJSON() SinkSpecificJSONConfig {
return s.getJSONValue(OptPubsubSinkConfig)
}

// GetResolvedTimestampInterval gets the best-effort interval at which resolved timestamps
// should be emitted. Nil or 0 means emit as often as possible. False means do not emit at all.
// Returns an error for negative or invalid duration value.
Expand Down
Loading

0 comments on commit 699b7f4

Please sign in to comment.