release-23.1: changefeedccl: webhook sink rewrite #100639
Merged
Conversation
The existing webhook sink roachtest code only worked when run locally. This change mirrors the cdc-webhook-sink-test-server code to set up a server that other roachtest nodes can connect to. Release note: None
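For illustration only (this is not the actual cdc-webhook-sink-test-server code, and the real server also has to deal with TLS certificates and changefeed-specific responses), a minimal sketch of an HTTP endpoint that listens on all interfaces so that other nodes can reach it; the port and handler path are hypothetical:

```go
package main

import (
	"io"
	"log"
	"net/http"
)

func main() {
	// Record any changefeed POSTs; a real test server would validate
	// and store the rows for later assertions.
	http.HandleFunc("/webhook", func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		log.Printf("received %d bytes", len(body))
	})
	// Binding to ":9707" (rather than "localhost:9707") is what lets
	// other roachtest nodes connect.
	log.Fatal(http.ListenAndServe(":9707", nil))
}
```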
Resolves #84676

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR implements the Webhook sink as part of a more general `batchingSink` framework that can be used to make adding new sinks an easier process, and makes it far more performant than it was previously. A followup PR will be made to use the `batchingSink` for the pubsub client, which also suffers performance issues.

---

Sink-specific code is encapsulated in a `SinkClient` interface:

```go
type SinkClient interface {
	MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
	MakeBatchBuffer() BatchBuffer
	Flush(context.Context, SinkPayload) error
	Close() error
}

type BatchBuffer interface {
	Append(key []byte, value []byte, topic string)
	ShouldFlush() bool
	Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the batch is ready to be flushed, the buffer can be `Close()`'d to do any final formatting of the bufferable data (ex: wrapping it in a JSON object with extra metadata) and to obtain a final `SinkPayload` that is ready to be passed to `SinkClient.Flush`. The `SinkClient` has a separate `MakeResolvedPayload` since the sink may require resolved events to be formatted differently from a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the sink endpoint, and may be called multiple times with the same payload due to retries. Any formatting work should therefore be done in the buffer's `Close` and stored in the resulting `SinkPayload`, so that retried `Flush` calls do not repeat it.

---

The `batchingSink` handles all the logic to take a `SinkClient` and form a full `Sink` implementation.

```go
type batchingSink struct {
	client            SinkClient
	ioWorkers         int
	minFlushFrequency time.Duration
	retryOpts         retry.Options
	eventCh           chan interface{}
	pacer             *admission.Pacer
	...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured `minFlushFrequency`
- Flushing deliberately and being able to block until the flush has completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe, so the `safeSink` wrapper is not required for users of this sink.

Events sent through the goroutines would normally need to exist on the heap, but to avoid excessive garbage collection of hundreds of thousands of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the `EmitRow` caller to the batching worker) and the `sinkBatchBuffer{<data about the batch>}` events (sent from the batching worker to the IO routine in the next section) are allocated on object pools.

---

For a sink like Cloudstorage, where there are large batches, doing the above and flushing the batch payloads one by one on a separate routine is good enough. Unfortunately, the Webhook sink can be used with no batching at all by users who want the lowest latency while still having good throughput, which means we need to be able to have multiple requests in flight. The difficulty here is that if a batch with keys [a1,b1] is in flight, a batch with keys [b2,c1] needs to block until [a1,b1] completes, as b2 cannot be sent and risk arriving at the destination prior to b1.

Flushing out payloads in a way that both maintains key-ordering guarantees and runs in parallel is done by a separate `parallel_io` struct.
```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan *ioResult
	...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
}

type ioResult struct {
	request IORequest
	err     error
}
```

It involves one goroutine to manage the key-ordering guarantees and a configurable number of IO worker goroutines that simply call `ioHandler` on an `IORequest`. `IORequest`s declare the keys they shouldn't conflict on by providing an `intsets.Fast` struct, which allows for the efficient Union/Intersects/Difference operations `parallelIO` needs to maintain its ordering guarantees. The request and its error (if one occurred despite the retries) are returned on `resultCh`.

---

The webhook sink is therefore formed by:
1. `EmitRow` is called, creating `kvEvents` that are sent to a batching worker.
2. The batching worker takes events and appends them to a batch.
3. Once the batch is full, it is encoded into an HTTP request.
4. The request object is then sharded across a set of IO workers to be sent out in parallel with other non-key-conflicting requests.

With this setup, looking at the CPU flamegraph at high throughputs, most of the `batchingSink`/`parallelIO` work didn't really show up; the work was largely just step 3, where taking a list of messages and calling `json.Marshal` on it took almost 10% of the time, specifically a call to `json.Compress`. Since this isn't needed, and all we're doing is putting a list of already-formatted JSON messages into a surrounding JSON array and small object, I also swapped `json.Marshal` for manually stitching the characters together into a buffer.

---

Since Matt's talked about a new significance being placed on feature-flagging new work to avoid the need for technical advisories, I placed this new implementation under the `changefeed.new_webhook_sink_enabled` setting and defaulted it to be disabled.

---

Release note (performance improvement): the webhook sink is now able to handle a drastically higher maximum throughput by enabling the "changefeed.new_webhook_sink_enabled" cluster setting.
blathers-crl bot force-pushed the blathers/backport-release-23.1-99086 branch from 415c8fb to 0a08140 on April 4, 2023 20:27
blathers-crl bot requested review from smg260 and renatolabs and removed request for a team on April 4, 2023 20:27
blathers-crl bot force-pushed the blathers/backport-release-23.1-99086 branch 2 times, most recently from 0942512 to f09cb0e on April 4, 2023 20:27
Thanks for opening a backport. Please check the backport criteria before merging: if some of the basic criteria cannot be satisfied, ensure that the exceptional criteria are satisfied within. Add a brief release justification to the body of your PR to justify this backport.
blathers-crl bot requested review from shermanCRL and removed request for a team on April 4, 2023 20:27
blathers-crl bot added the blathers-backport (This is a backport that Blathers created automatically.) and O-robot (Originated from a bot.) labels on Apr 4, 2023
miretskiy approved these changes on Apr 4, 2023
this setting seems to be missing docs at https://www.cockroachlabs.com/docs/v23.1/cluster-settings
Backport 2/2 commits from #99086 on behalf of @samiskin.
/cc @cockroachdb/release
Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356
This PR implements the Webhook sink as part of a more general `batchingSink` framework that can be used to make adding new sinks an easier process, and makes it far more performant than it was previously. A followup PR will be made to use the `batchingSink` for the pubsub client, which also suffers performance issues.

Sink-specific code is encapsulated in a `SinkClient` interface (see the interface definition in the commit message above); an illustrative toy implementation follows.
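To make the interface concrete, here is a hypothetical toy client, not taken from this PR, that buffers newline-delimited values and posts them over HTTP; it assumes the `SinkClient`/`BatchBuffer`/`SinkPayload` definitions shown in the commit message above:

```go
import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// toyClient is a hypothetical SinkClient that posts each batch to a URL.
type toyClient struct{ url string }

// toyBuffer accumulates newline-delimited rows until a size threshold.
type toyBuffer struct {
	buf  bytes.Buffer
	rows int
}

func (c *toyClient) MakeBatchBuffer() BatchBuffer { return &toyBuffer{} }

// Resolved timestamps are passed through as their own payload.
func (c *toyClient) MakeResolvedPayload(body []byte, topic string) (SinkPayload, error) {
	return body, nil
}

func (c *toyClient) Close() error { return nil }

func (b *toyBuffer) Append(key []byte, value []byte, topic string) {
	b.buf.Write(value)
	b.buf.WriteByte('\n')
	b.rows++
}

func (b *toyBuffer) ShouldFlush() bool { return b.rows >= 100 }

// Close does the one-time formatting so that retried Flushes can
// reuse the finished payload without rebuilding it.
func (b *toyBuffer) Close() (SinkPayload, error) { return b.buf.Bytes(), nil }

// Flush is pure IO: it may run several times with the same payload.
func (c *toyClient) Flush(ctx context.Context, p SinkPayload) error {
	req, err := http.NewRequestWithContext(
		ctx, http.MethodPost, c.url, bytes.NewReader(p.([]byte)))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("sink returned %s", resp.Status)
	}
	return nil
}
```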
Once the batch is ready to be flushed, the buffer can be `Close()`'d to do any final formatting of the bufferable data (ex: wrapping it in a JSON object with extra metadata) and to obtain a final `SinkPayload` that is ready to be passed to `SinkClient.Flush`. The `SinkClient` has a separate `MakeResolvedPayload` since the sink may require resolved events to be formatted differently from a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the sink endpoint, and may be called multiple times with the same payload due to retries. Any formatting work should therefore be done in the buffer's `Close` and stored in the resulting `SinkPayload`, so that retried `Flush` calls do not repeat it.
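As a sketch of that contract (hypothetical glue code; the loop shape follows cockroach's `retry` package convention, but this is not the PR's code), the payload is built once in `Close` and only the IO is repeated:

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// flushWithRetries finalizes the buffer once and then retries only the
// IO, which is why formatting belongs in Close rather than in Flush.
func flushWithRetries(
	ctx context.Context, client SinkClient, buf BatchBuffer, opts retry.Options,
) error {
	payload, err := buf.Close() // one-time formatting work
	if err != nil {
		return err
	}
	var flushErr error
	for r := retry.StartWithCtx(ctx, opts); r.Next(); {
		if flushErr = client.Flush(ctx, payload); flushErr == nil {
			return nil
		}
	}
	return flushErr
}
```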
The `batchingSink` handles all the logic to take a `SinkClient` and form a full `Sink` implementation (see the struct definition in the commit message above). It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured `minFlushFrequency`
- Flushing deliberately and being able to block until the flush has completed
- Logging all the various sink metrics
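A minimal sketch of what that single goroutine's loop can look like (a simplification with hypothetical names like `payloadCh` and `kvEvent`; this is not the actual `batchingSink` code):

```go
func (s *batchingSink) runBatchingWorker(ctx context.Context) {
	var buffer BatchBuffer
	timer := time.NewTimer(s.minFlushFrequency)
	defer timer.Stop()

	// flush finalizes the current buffer and hands the payload off
	// to the IO workers; error handling is elided in this sketch.
	flush := func() {
		if buffer == nil {
			return
		}
		if payload, err := buffer.Close(); err == nil {
			s.payloadCh <- payload // payloadCh is hypothetical
		}
		buffer = nil
	}

	for {
		select {
		case e := <-s.eventCh:
			if buffer == nil {
				buffer = s.client.MakeBatchBuffer()
				timer.Reset(s.minFlushFrequency)
			}
			ev := e.(kvEvent) // kvEvent stands in for the pooled event struct
			buffer.Append(ev.key, ev.value, ev.topic)
			if buffer.ShouldFlush() {
				flush()
			}
		case <-timer.C:
			flush()
		case <-ctx.Done():
			return
		}
	}
}
```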
`EmitRow` calls are thread-safe, so the `safeSink` wrapper is not required for users of this sink.

Events sent through the goroutines would normally need to exist on the heap, but to avoid excessive garbage collection of hundreds of thousands of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the `EmitRow` caller to the batching worker) and the `sinkBatchBuffer{<data about the batch>}` events (sent from the batching worker to the IO routine in the next section) are allocated on object pools (a generic illustration of the pattern follows this passage).

For a sink like Cloudstorage, where there are large batches, doing the above and flushing the batch payloads one by one on a separate routine is good enough. Unfortunately, the Webhook sink can be used with no batching at all by users who want the lowest latency while still having good throughput, which means we need to be able to have multiple requests in flight. The difficulty here is that if a batch with keys [a1,b1] is in flight, a batch with keys [b2,c1] needs to block until [a1,b1] completes, as b2 cannot be sent and risk arriving at the destination prior to b1.
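As a generic illustration of the object-pool point above (the pattern only; the PR pools its own `kvEvent`/`sinkBatchBuffer` structs with its own helpers):

```go
import "sync"

// kvEvent stands in for the tiny per-row struct sent between goroutines.
type kvEvent struct {
	key, value []byte
	topic      string
}

// kvEventPool recycles events so hundreds of thousands of tiny structs
// don't each become separate garbage for the GC to chase.
var kvEventPool = sync.Pool{
	New: func() interface{} { return new(kvEvent) },
}

func newKVEvent(key, value []byte, topic string) *kvEvent {
	e := kvEventPool.Get().(*kvEvent)
	e.key, e.value, e.topic = key, value, topic
	return e
}

func freeKVEvent(e *kvEvent) {
	*e = kvEvent{} // drop references so pooled memory doesn't pin buffers
	kvEventPool.Put(e)
}
```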
Flushing out payloads in a way that both maintains key-ordering guarantees and runs in parallel is done by a separate `parallel_io` struct (see its definition in the commit message above). It involves one goroutine to manage the key-ordering guarantees and a configurable number of IO worker goroutines that simply call `ioHandler` on an `IORequest`.

`IORequest`s declare the keys they shouldn't conflict on by providing an `intsets.Fast` struct, which allows for the efficient Union/Intersects/Difference operations `parallelIO` needs to maintain its ordering guarantees (a sketch of the conflict check follows the pipeline list below). Requests are received as `IORequest`s and the response is also returned as an `IORequest`; this way the `parallelIO` struct does not have to do any heap allocations to communicate, as its user can manage creating and freeing these objects in pools. The only heap allocations that occur are part of the intset operations, as it uses a linked list internally.

The webhook sink is therefore formed by:
1. `EmitRow` is called, creating `kvEvents` that are sent to the batching worker.
2. The batching worker takes events and appends them to a batch.
3. Once the batch is full, it is encoded into an HTTP request.
4. The request object is then sharded across a set of IO workers to be sent out in parallel with other non-key-conflicting requests.
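To illustrate the conflict check described above (a hedged sketch of the idea, not the actual `parallel_io` scheduling code):

```go
import "github.com/cockroachdb/cockroach/pkg/util/intsets"

// tryEmit sends a request only if none of its keys overlap with keys
// already in flight; otherwise it queues behind the conflicting work.
func tryEmit(req IORequest, inFlight *intsets.Fast, pending *[]IORequest) bool {
	if inFlight.Intersects(req.Keys()) {
		*pending = append(*pending, req)
		return false
	}
	inFlight.UnionWith(req.Keys())
	return true
}
```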
With this setup, looking at the CPU flamegraph at high throughputs, most of the `batchingSink`/`parallelIO` work didn't really show up; the work was largely just step 3, where taking a list of messages and calling `json.Marshal` on it took almost 10% of the time, specifically a call to `json.Compress`. Since this isn't needed, and all we're doing is putting a list of already-formatted JSON messages into a surrounding JSON array and small object, I also swapped `json.Marshal` for manually stitching the characters together into a buffer (a simplified sketch follows below).

In the following flamegraph of a node at around 35% CPU usage, only 5.56% of the total cputime in the graph (three small chunks between the parallelEventConsumer chunk and the kvevent chunk) is taken up by the parallelIO and batchingSink workers. This is with a batch size of 100.
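A simplified sketch of that manual stitching (the surrounding `payload`/`length` field names are assumptions for illustration, not necessarily the exact wire format):

```go
import (
	"bytes"
	"strconv"
)

// stitchJSONBatch wraps already-encoded JSON messages in a surrounding
// array and small object without re-marshalling every message byte.
func stitchJSONBatch(msgs [][]byte) []byte {
	var buf bytes.Buffer
	buf.WriteString(`{"payload":[`)
	for i, m := range msgs {
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.Write(m) // each message is already valid JSON
	}
	buf.WriteString(`],"length":`)
	buf.WriteString(strconv.Itoa(len(msgs)))
	buf.WriteByte('}')
	return buf.Bytes()
}
```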
The max CPU usage here was around 37%, with a max throughput of 135k for a single node (the other nodes had run out of data at this point). Since the majority of the flamegraph shows time spent in the event-processing code, I'm going to assume this will be handled by the pacer and won't be much of an issue.
In the above flamegraph, `runtime.gcDrain` does show up using 10.55% CPU, but when trying the cloudstorage sink it had around the same value, so I'm guessing this means there isn't an extra GC-thrashing issue. I believe the only non-pool allocations that occur are the intsets.

Since Matt's talked about a new significance being placed on feature-flagging new work to avoid the need for technical advisories, I placed this new implementation under the `changefeed.new_webhook_sink_enabled` setting and defaulted it to be disabled.
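For anyone trying it out: since this is a cluster setting, it would be enabled with `SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;` (standard CockroachDB cluster-setting syntax; the statement is illustrative rather than taken from this PR).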
Right now it's `sink_webhook_v2`, just to keep `sink_webhook.go` unchanged so that this review is easier to do. I may move `sink_webhook` to `deprecated_sink_webhook` and `sink_webhook_v2` to just be `sink_webhook` prior to merging.

Release note (performance improvement): the webhook sink is now able to handle a drastically higher maximum throughput by enabling the "changefeed.new_webhook_sink_enabled" cluster setting.
Release justification: feature-flagged and default off, while being a substantial improvement.