changefeedccl: webhook sink rewrite for increased throughput
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR addresses two existing issues:
1. The unbatched webhook sink's maximum throughput is extremely low:
   600-800 messages per second on a 3-node n1-standard-16 cluster.
   Batching increases throughput, but it remains far below other
   sinks (batching every 500 messages can result in a 20k mps
   throughput while kafka on the same cluster and data does 212k).
2. Adding new sinks is difficult, as each would have to re-implement
   its own parallelism/batching/retrying logic.

---

There are two main reasons for 1, the sink being so much slower than
other sinks:
a) While the actual sending of HTTP requests is spread across multiple
  worker goroutines, taking requests and batching them together is done
  *synchronously* in a single batching worker.  Once that batch is full,
  the messages within it are sharded out to the workers, each forming
  their own arbitrarily smaller batches, and the batching worker then
  blocks until all of those requests have completed.  This means that in
  the default batch-size-1 case the sink is essentially single-threaded,
  forming a batch and blocking on it each time.
b) HTTP requests are all sent to the same host and are subject to the
  transport's default MaxIdleConnsPerHost setting, which is 2.  This
  means that at most 2 underlying TCP connections can be reused between
  requests, and the rest have to open entirely new ones, using up
  ports.

Fixing a) without b) results in a non-batched throughput of 1200-1800
with huge variation, and also produces "cannot assign requested address"
errors.
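
For illustration, the general shape of the fix to a) looks something
like the sketch below (not the PR's actual code; startBatchingWorkers,
flushBatch, and batchSize are illustrative names): each worker forms and
flushes its own batches, so no single goroutine serializes the pipeline.

package sketch

import "context"

// startBatchingWorkers is a minimal sketch of the pattern behind fix a): each
// worker owns its own batch and flushes independently, so no central batching
// worker blocks on the slowest flush.  flushBatch stands in for the sink's
// emit-and-retry path; none of these names come from the PR itself.
func startBatchingWorkers(
	ctx context.Context,
	numWorkers, batchSize int,
	in <-chan []byte,
	flushBatch func(context.Context, [][]byte) error,
) {
	for i := 0; i < numWorkers; i++ {
		go func() {
			batch := make([][]byte, 0, batchSize)
			flush := func() {
				if len(batch) == 0 {
					return
				}
				// Error handling and retries are omitted for brevity.
				_ = flushBatch(ctx, batch)
				batch = make([][]byte, 0, batchSize)
			}
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-in:
					if !ok {
						flush()
						return
					}
					batch = append(batch, msg)
					if len(batch) >= batchSize {
						flush()
					}
				}
			}
		}()
	}
}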

Fixing b) alone by simply setting MaxIdleConnsPerHost to 100, the
default MaxIdleConns value, immediately increases the throughput of a
non-batched webhook sink from 800 to 48k.
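
The change behind b) boils down to something like the following sketch
(assuming the sink constructs its own http.Client; this is not the PR's
exact code, only the field values mirror the numbers above):

package sketch

import "net/http"

// newWebhookClient is a minimal sketch of fix b): raise MaxIdleConnsPerHost so
// requests, which all target the single webhook host, can reuse idle TCP
// connections instead of opening a new one each time.
func newWebhookClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			// The net/http default (http.DefaultMaxIdleConnsPerHost) is 2,
			// which caps connection reuse for the lone sink host and leads
			// to port exhaustion under load.
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 100,
		},
	}
}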

Fixing the two together, as is done in this PR, increases the non-batched
throughput to 229k messages per second.  Without batching this
unfortunately uses a large amount of CPU (70% in the test), so batching
is still recommended (with a batch size of 500, throughput increased to
333k while CPU usage was at 25%).

For context on the throughput numbers, on the same data and cluster a
default Kafka sink puts out 214k messages per second at 21% CPU while a
default cloudstorage sink does 468k at 27% CPU.

---

The reason for 2 is that the parallelism/batching/retry/metrics logic
for the webhook and pubsub sinks is intertwined with their respective
sink-specific logic.

This PR splits out the sink-specific logic into a `ThinSink` interface
whose only responsibility is to encode batches of messages into payloads
to be sent and to emit those payloads, and moves the
parallelism/batching/retry/metrics logic into a `sinkProcessor`.

This should allow for other simple sinks that fit a "just emit a batch
of messages somewhere" pattern to be added more easily with no
duplicated work.
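
As a rough sketch of the intended shape (the actual method names and
signatures in sink_processor.go may differ):

package sketch

import "context"

// ThinSink is a sketch of the interface described above: it only knows how to
// turn a batch of messages into a sink-specific payload and emit it.  Batching,
// parallelism, retries, and metrics all live in the sinkProcessor wrapping it.
type ThinSink interface {
	// EncodeBatch combines a batch of messages into a single payload.
	EncodeBatch(msgs [][]byte) ([]byte, error)
	// EmitPayload sends one encoded payload to the downstream endpoint.
	EmitPayload(ctx context.Context, payload []byte) error
	// Close releases any resources held by the sink.
	Close() error
}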

Note that this PR only implements sinkProcessor to the extent needed by
the webhook sink.  A followup PR will move the Pubsub sink into this
framework and will bring further changes to sinkProcessor to support
multiple topics.

For testing, this PR still relies on the existing webhook sink tests.
Once sinkProcessor is extended to support pubsub as well, the tests will
be refactored into sinkProcessor-specific tests and thin-sink-specific
tests.

---

It is recommended that reviewers start by reading the sink_processor.go
file, followed by the sink_webhook.go file.

Release note (performance improvement): The Changefeed webhook sink has
been rewritten to support substantially larger throughputs, of a similar
magnitude to the kafka sink.
samiskin committed Dec 5, 2022
1 parent 8a5cb51 commit b3d235e
Showing 6 changed files with 688 additions and 563 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
         "sink_external_connection.go",
         "sink_kafka.go",
         "sink_kafka_connection.go",
+        "sink_processor.go",
         "sink_pubsub.go",
         "sink_sql.go",
         "sink_webhook.go",
11 changes: 11 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -279,3 +279,14 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting(
 	"determines whether changefeed event processing integrates with elastic CPU control",
 	true,
 )
+
+// WebhookSinkWorkers specifies the number of workers to use to handle batching
+// and emitting to a webhook sink.
+var WebhookSinkWorkers = settings.RegisterIntSetting(
+	settings.TenantWritable,
+	"changefeed.webhook_sink_workers",
+	"the number of workers to use when emitting events to the sink: "+
+		"0 assigns a reasonable default, >0 assigns the setting value",
+	0,
+	settings.NonNegativeInt,
+)
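
The "0 assigns a reasonable default" behaviour implies a resolution step
roughly like the sketch below; the exact logic lives inside the webhook
sink and may differ, with defaultWorkerCount being the existing helper
the old call site in sink.go passed directly.

// resolveWorkerCount is a sketch of how a zero setting value would presumably
// fall back to a default; the PR's actual resolution logic may differ.
func resolveWorkerCount(configured int) int {
	if configured > 0 {
		return configured
	}
	return defaultWorkerCount() // existing helper, see the old call site in sink.go below
}
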
4 changes: 3 additions & 1 deletion pkg/ccl/changefeedccl/sink.go
@@ -174,9 +174,11 @@ func getSink(
 		if err != nil {
 			return nil, err
 		}
+
+		numWorkers := changefeedbase.WebhookSinkWorkers.Get(&serverCfg.Settings.SV)
 		return validateOptionsAndMakeSink(changefeedbase.WebhookValidOptions, func() (Sink, error) {
 			return makeWebhookSink(ctx, sinkURL{URL: u}, encodingOpts, webhookOpts,
-				defaultWorkerCount(), timeutil.DefaultTimeSource{}, metricsBuilder)
+				int(numWorkers), timeutil.DefaultTimeSource{}, metricsBuilder)
 		})
 	case isPubsubSink(u):
 		// TODO: add metrics to pubsubsink