changefeedccl: webhook sink rewrite for increased throughput #93032
Conversation
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @miretskiy, @samiskin, and @shermanCRL)
pkg/ccl/changefeedccl/sink_processor.go
line 297 at r1 (raw file):
if atomic.LoadInt32(&sp.closing) != 0 {
	sp.inFlight.Done()
	continue
I think we should return nil here.
Also, is there a good reason to have sp.closing not be a channel? It would be easier to reason about this code if we selected on a closed channel.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @jayshrivastava, @miretskiy, and @shermanCRL)
pkg/ccl/changefeedccl/sink_processor.go
line 297 at r1 (raw file):
Previously, jayshrivastava (Jayant Shrivastava) wrote…
I think we should return nil here.
Also, is there a good reason to have sp.closing not be a channel? It would be easier to reason about this code if we selected on a closed channel.
I deliberately made it so that workers always drain every message out rather than exiting in the middle, since it makes it much easier to use the WaitGroup for tracking without risking it being incorrectly non-zero. It's a tradeoff between "you have to make sure you never exit without reducing inFlight appropriately" and either using locks on every message or making things a lot more racy (which is what I dealt with in my event_processor PR; if folks are ok with this draining method I'll likely move that PR to this approach as well).
I like this "single flag for closing, just rely on the input channel closing to know when you're done" method a lot more; there isn't much to drain and the code feels a lot simpler to me.
Also I just realized I never added a buffer size to the input channels at all lol, so they'd drain immediately. Need to look at what perf is like when they're buffered.
Reviewed 4 of 6 files at r1.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @jayshrivastava, @samiskin, and @shermanCRL)
a discussion (no related file):
I have some concerns around the overall direction of the codebase.
A few things stand out as suboptimal and/or messy (not necessarily caused by this PR, but more of an evolutionary state).
- Our API is really messy. We create a "sink" (webhook) which isn't really a sink (because we actually create a ThinSink); some calls go directly to the "real" sink (resolved messages), some don't; the API is a multi-step process (add messages, then emit them). What is the message? Well, that depends on the actual sink (SinkMessage).
- Parallelism is becoming more and more complex. We essentially have duplicated code between parallel processors (in event_processors) and here. Messages come into event_processor, they get split into N workers, then those N workers feed data into a single sink, then that sink may again split messages into M workers -- likely the same as N but could be different. Logic to manage flushing, ordering, etc. gets duplicated.
It feels to me that we should spend some time clarifying/cleaning this up. How? I'm not 100% sure on this tbh. Perhaps batching can be lifted all the way to event processing?
Perhaps we should split the sink API. As implemented by this PR, the responsibility of webhookSink is rather limited. It sort of produces an opaque thing (a JSON array), and is then responsible for emitting that thing.
Perhaps the split might involve the introduction of a buffer abstraction. Say something like:
type sinkBuffer interface {
	Append(...MessagePayload)
}
So, in a sense, the webhook sink is a "sink buffer" that takes encoded messages and appends them somewhere (in the case of the webhook sink -- a JSON array).
Then, perhaps a sink is a combination of sinkBuffer + IO-related methods:
type Sink interface {
	sinkBuffer
	ShouldFlush() bool
	Flush()
}
These are just some ideas/musings about the state of affairs. I would love to see if we can iterate more on the design so that the API is more straightforward; ideally, data would flow one way: kv feed -> encoder -> buffer -> sink.
I'm sending my comments so far, though I'm sure there will be more.
pkg/ccl/changefeedccl/sink_processor.go
line 33 at r1 (raw file):
// EmitPayload potentially being retried multiple times with the same payload if
// errors are observed.
type ThinSink interface {
Besides the name being somewhat of a tongue twister, I'm just not a fan of this name. The interface name does not convey in any way what it does. And, of course, thin is in the eye of the beholder, and batching doesn't appear to be very thin to me.
Reading up until this point, it feels like this interface is responsible for batching... and therefore, perhaps Batcher might be a better name?
Most likely none of this needs to be exported?
Just putting this comment now -- while I read the rest of the code: is this really an interface? Do we plan on having different implementations?
Coming back to the above question, I'm not sure whether we will have different implementations (e.g. kafka, webhook, etc) -- but it does appear that it might make sense to have a "pass through" implementation that forgoes the overhead of workers, channels, etc. when batch size is 1?
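For illustration, here is a minimal sketch of what such a pass-through variant could look like. All type and method names below are stand-ins invented for the sketch (the PR's real ThinSink/SinkPayload shapes may differ), and the batch-of-one encode/emit flow is an assumption.
package sinksketch

// Stand-in types, only so the sketch is self-contained.
type SinkPayload interface{}

type thinSinkStub interface {
	EncodeBatch(messages [][]byte) (SinkPayload, error) // hypothetical method name
	EmitPayload(payload SinkPayload) error
	Close() error
}

// passthroughProcessor skips workers, channels, and batching entirely: each
// message is encoded as a batch of one and emitted synchronously on the
// caller's goroutine. Only worthwhile when the configured batch size is 1.
type passthroughProcessor struct {
	sink thinSinkStub
}

func (p *passthroughProcessor) EmitRow(msg []byte) error {
	payload, err := p.sink.EncodeBatch([][]byte{msg})
	if err != nil {
		return err
	}
	return p.sink.EmitPayload(payload)
}

func (p *passthroughProcessor) Close() error {
	return p.sink.Close()
}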
pkg/ccl/changefeedccl/sink_processor.go
line 37 at r1 (raw file):
EncodeResolvedMessage(ResolvedMessagePayload) (SinkPayload, error)
EmitPayload(SinkPayload) error
Close() error
exported interface methods should have a comment.
pkg/ccl/changefeedccl/sink_processor.go
line 73 at r1 (raw file):
// their worker as no-ops rather than terminating workers immediately, as this
// makes the race-condition-free code easier to reason about.
type sinkProcessor struct {
not sure about the name... We are not processing 'sink's...
messageBatcher? batcher? dispatcher? messageDispatcher?
pkg/ccl/changefeedccl/sink_processor.go
line 105 at r1 (raw file):
}

var _ Sink = (*sinkProcessor)(nil) // sinkProcessor should implement Sink
No need for the comment, I think... pretty idiomatic.
Also, it is strange that sinkProcessor is a sink, and that it also has a sink... which it uses for... something.
pkg/ccl/changefeedccl/sink_processor.go
line 119 at r1 (raw file):
//
// TODO(samiskin): Verify throughput when emits are slower to complete.
return 5 * system.NumCPU()
5 is a strange multiple... A few things need to be addressed here:
- I think runtime.GOMAXPROCS(0) is the preferred way.
- The number of workers can get large on a bigger node -- a 64 vCPU node would result in 320 workers. This is not a good default in those cases.
- I think there should be a cap -- similar to how defaultNumWorkers() works in event_processing.
- This multiple needs to be verified... How much worse is a multiple of 1? How about 1/2? 2? Please post these numbers and we can decide whether the performance gap is worth the CPU cost.
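A minimal sketch of a capped default along those lines; the cap value is a placeholder to be chosen by benchmarking, not a recommendation.
package sinksketch

import "runtime"

// defaultSinkWorkers caps the worker count so that very large nodes do not get
// hundreds of workers by default. GOMAXPROCS(0) also reflects an explicit
// GOMAXPROCS override, unlike a raw CPU count.
func defaultSinkWorkers() int {
	const maxDefaultWorkers = 32 // placeholder cap; pick by benchmarking
	n := runtime.GOMAXPROCS(0)
	if n > maxDefaultWorkers {
		return maxDefaultWorkers
	}
	return n
}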
pkg/ccl/changefeedccl/sink_processor.go
line 187 at r1 (raw file):
worker := sp.workerIndex(key)
sp.eventChans[worker] <- rowWorkerPayload{
Even though rowWorkerPayload is stack allocated, it nonetheless escapes to the heap because of the channels. We need to benchmark to see the impact of those small per-message allocations on the Go runtime GC -- I suspect it's non-negligible (given that this happens for each message).
It might be beneficial to have a payload pool for *rowWorkerPayloads.
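A rough harness for that benchmark (in a _test.go file) might look like the following; rowWorkerPayloadStub and its fields are invented for the sketch, and the real struct's contents will differ.
package sinksketch

import "testing"

// rowWorkerPayloadStub stands in for the PR's rowWorkerPayload.
type rowWorkerPayloadStub struct {
	key, val []byte
}

// BenchmarkPayloadChannelSend reports allocations per message when pushing
// payload values through a buffered channel with a single draining consumer.
func BenchmarkPayloadChannelSend(b *testing.B) {
	ch := make(chan rowWorkerPayloadStub, 256)
	done := make(chan struct{})
	go func() {
		for range ch {
			// Drain only; the cost being measured is on the producer side.
		}
		close(done)
	}()
	key, val := []byte("key"), []byte("value")
	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		ch <- rowWorkerPayloadStub{key: key, val: val}
	}
	close(ch)
	<-done
}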
pkg/ccl/changefeedccl/sink_processor.go
line 187 at r1 (raw file):
worker := sp.workerIndex(key)
sp.eventChans[worker] <- rowWorkerPayload{
What are the guarantees around this call never blocking?
I am concerned that some worker encounters an error and sets closing to 1 -- but we're not paying attention to that, and here we block. Using a channel is better imo.
Select on emitting this message, or on "termCh" being closed (in which case you return the saved error).
pkg/ccl/changefeedccl/sink_processor.go
line 210 at r1 (raw file):
}

func (sp *sinkProcessor) EmitResolvedTimestamp(
nit: add function comment (here and throughout).. XYZ implements Sink.
pkg/ccl/changefeedccl/sink_processor.go
line 247 at r1 (raw file):
// TODO(samiskin): Investigate different numbers for this buffer size, this
// number is just re-using the value from sink_cloudstorage
batchCh := make(chan batchWorkerMessage, 256)
Reusing numbers is fine -- though, I think sink_cloudstorage justified the choice ...
I would like to understand if having a buffer size > 0 is a requirement (correctness wise).
Also, it is probably a good idea to have the size be metamorphic (again, to catch correctness bugs).
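Something along these lines might work for the metamorphic size; the helper name and signature below are recalled from memory, so treat them as an assumption to be checked against the actual metamorphic-constant utilities.
package sinksketch

import "github.com/cockroachdb/cockroach/pkg/util"

// batchChanBufferSize is 256 in normal builds; metamorphic test builds pick a
// random value between 1 and 256 to shake out bugs that depend on buffering.
var batchChanBufferSize = util.ConstantWithMetamorphicTestRange(
	"sink-processor-batch-chan-size", 256 /* defaultValue */, 1 /* min */, 256 /* max */)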
pkg/ccl/changefeedccl/sink_processor.go
line 268 at r1 (raw file):
// Send the encoded batch to a separate worker so that flushes do not block
// further message aggregation
batchCh <- batchWorkerMessage{
What are the guarantees that this will not block?
pkg/ccl/changefeedccl/sink_processor.go
line 268 at r1 (raw file):
// Send the encoded batch to a separate worker so that flushes do not block
// further message aggregation
batchCh <- batchWorkerMessage{
Same comment as for the rowPayloadMessage allocations above -- this one, of course, is not
that bad because of batching; but I think this matters when batch size is 1...
Anyway, bench impact on go runtime GC + add a sync.Pool as needed.
pkg/ccl/changefeedccl/sink_processor.go
line 289 at r1 (raw file):
case msg, ok := <-sp.eventChans[workerIndex]:
	if !ok {
		flushBatch() // Ensure all batched messages are handled prior to exit.
Do we need to do this? Shouldn't the sink be flushed/closed explicitly by the caller?
pkg/ccl/changefeedccl/sink_processor.go
line 297 at r1 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
I deliberately made it so that workers always drain every message out rather than exiting in the middle, since it makes it much easier to use the WaitGroup for tracking without risking it being incorrectly non-zero. It's a tradeoff between "you have to make sure you never exit without reducing inFlight appropriately" and either using locks on every message or making things a lot more racy (which is what I dealt with in my event_processor PR; if folks are ok with this draining method I'll likely move that PR to this approach as well).
I like this "single flag for closing, just rely on the input channel closing to know when you're done" method a lot more; there isn't much to drain and the code feels a lot simpler to me.
Also I just realized I never added a buffer size to the input channels at all lol, so they'd drain immediately. Need to look at what perf is like when they're buffered.
Please see my comments about WaitGroup elsewhere. I agree with Jay re channels; there are other reasons for this too as described elsewhere.
pkg/ccl/changefeedccl/sink_processor.go
line 364 at r1 (raw file):
emitTime, batch.numMessages, batch.mvcc, batch.kvBytes, sinkDoesNotCompress)
batch.alloc.Release(sp.ctx)
this can be deferred above, right?
pkg/ccl/changefeedccl/sink_processor.go
line 385 at r1 (raw file):
func (sp *sinkProcessor) Flush(ctx context.Context) error {
	sp.inFlight.Wait()
The use of inFlight is probably not great. Every time I use it, I always come back to the unfortunate reality that it does not support cancellation. So a Flush call on a sink that seems to be blocked for a long time (maybe the downstream server is super slow) will block here for a very long time even if the changefeed is cancelled.
I think you are attempting to do too many micro-optimizations at once. It's okay to use a mutex to keep a counter, it's okay to use channels. Remove/fix them if you detect problems.
I suspect that even though using a mutex is slower than not using one, the reality is that even if you see 200k messages instead of 220k, that's still a 500x improvement over the current state.
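One common workaround, sketched below, is to push the Wait into a goroutine so the flush itself can still respect context cancellation (with the caveat that the waiting goroutine lingers until the in-flight work actually drains). Names are placeholders.
package sinksketch

import (
	"context"
	"sync"
)

// waitOrCancel waits for the WaitGroup but returns early (with ctx.Err()) if
// the context is cancelled first. The helper goroutine keeps waiting in the
// background until the group actually drains.
func waitOrCancel(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}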
pkg/ccl/changefeedccl/sink_processor.go
line 398 at r1 (raw file):
defer sp.mu.Unlock()
sp.mu.closeErr = err
atomic.StoreInt32(&sp.closing, 1)
Instead of the atomic, have a termCh (termination channel) that is closed when you encounter the first error; select on this channel every time you emit anything.
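A minimal sketch of that pattern; the terminator type and its names are hypothetical, not from the PR.
package sinksketch

import "sync"

// terminator closes termCh exactly once when the first error is observed;
// every send to a worker selects on termCh so producers can never block on a
// processor that is shutting down.
type terminator struct {
	termCh chan struct{}
	once   sync.Once
	mu     struct {
		sync.Mutex
		err error
	}
}

func newTerminator() *terminator {
	return &terminator{termCh: make(chan struct{})}
}

// terminate records the first error and unblocks all pending selects.
func (t *terminator) terminate(err error) {
	t.mu.Lock()
	if t.mu.err == nil {
		t.mu.err = err
	}
	t.mu.Unlock()
	t.once.Do(func() { close(t.termCh) })
}

// send blocks until the worker accepts the message or the processor begins
// shutting down, in which case the saved error is returned.
func (t *terminator) send(ch chan<- []byte, msg []byte) error {
	select {
	case ch <- msg:
		return nil
	case <-t.termCh:
		t.mu.Lock()
		defer t.mu.Unlock()
		return t.mu.err
	}
}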
pkg/ccl/changefeedccl/sink_processor.go
line 407 at r1 (raw file):
}
_ = sp.workerGroup.Wait()
return sp.sink.Close()
do we swallow close error?
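If the intent is to surface both the worker-group error and the Close error, here is a small sketch using cockroachdb/errors; the helper name and parameters are placeholders.
package sinksketch

import "github.com/cockroachdb/errors"

// closeBoth returns the worker-group error and the sink's Close error instead
// of silently discarding either; CombineErrors keeps the first as the main
// error and attaches the second as a secondary error.
func closeBoth(waitWorkers, closeSink func() error) error {
	return errors.CombineErrors(waitWorkers(), closeSink())
}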
pkg/ccl/changefeedccl/sink_processor.go
line 417 at r1 (raw file):
}

func newMessageBatch() messageBatch {
This method doesn't seem useful right now... you could just do &messageBatch with exactly the same result.
Of course, if you decide to use a sync.Pool for these, then yes, having a helper that gets an object from the pool and re-initializes it as appropriate would be great.
Also, having a pool means that you don't need to re-allocate batch.buffer -- just reset it, reusing the previous array allocation.
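A sketch of that pooling approach; messageBatchStub and its fields are invented stand-ins for the PR's messageBatch.
package sinksketch

import "sync"

// messageBatchStub stands in for the PR's messageBatch.
type messageBatchStub struct {
	buffer      [][]byte
	bufferBytes int
}

var messageBatchPool = sync.Pool{
	New: func() interface{} { return &messageBatchStub{} },
}

// getMessageBatch hands out a reset batch; buffer is truncated rather than
// reallocated, so the previously grown backing array is reused.
func getMessageBatch() *messageBatchStub {
	mb := messageBatchPool.Get().(*messageBatchStub)
	mb.buffer = mb.buffer[:0]
	mb.bufferBytes = 0
	return mb
}

// putMessageBatch returns a batch to the pool once its contents are no longer
// referenced (pooled slices keep their elements alive until overwritten).
func putMessageBatch(mb *messageBatchStub) {
	messageBatchPool.Put(mb)
}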
pkg/ccl/changefeedccl/sink_processor.go
line 434 at r1 (raw file):
}

func (mb *messageBatch) moveIntoBuffer(p rowWorkerPayload, keyInValue bool) {
nit: append might be a better name.
pkg/ccl/changefeedccl/sink_processor.go
line 438 at r1 (raw file):
mb.bufferBytes += len(p.msg.val)
// Don't double-count the key bytes if the key is included in the value
I don't know if it's correct. keyInValue simply means that you want to include the key inside the value. Why? It actually doesn't matter -- maybe you just like seeing a JSON array of key datums. But you still have the key itself -- and whether or not the key length is included depends on the sink (e.g. kafka vs cloudstorage).
You can also view bufferBytes as something that refers to how many bytes you have in memory...
Basically, I think this is not double counting -- incrementing by the key length is the correct behavior.
pkg/ccl/changefeedccl/sink_webhook.go
line 76 at r1 (raw file):
}
sp, err := makeSinkProcessor(context.Background(), thinSink, cfg, mb)
What's the purpose of the ctx parameter on the sink processor if we are passing a background ctx here?
pkg/ccl/changefeedccl/sink_webhook.go
line 143 at r1 (raw file):
Transport: &http.Transport{
	DialContext:     (&net.Dialer{Timeout: timeout}).DialContext,
	IdleConnTimeout: 90 * time.Second, // taken from DefaultTransport
Instead of copying, perhaps do something similar to
https://github.com/cockroachdb/cockroach/blob/master/pkg/util/log/http_sink.go#L32
Take http.DefaultTransport, clone it and change it appropriately.
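Roughly, that would look like the following sketch; the field values mirror the snippet above, and Clone has been available on http.Transport since Go 1.13.
package sinksketch

import (
	"net"
	"net/http"
	"time"
)

// newWebhookTransport clones http.DefaultTransport so future changes to the
// defaults are inherited, then overrides only the knobs the sink cares about.
func newWebhookTransport(dialTimeout time.Duration) *http.Transport {
	t := http.DefaultTransport.(*http.Transport).Clone()
	t.DialContext = (&net.Dialer{Timeout: dialTimeout}).DialContext
	t.MaxIdleConnsPerHost = 100 // see the discussion of this value below
	return t
}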
pkg/ccl/changefeedccl/sink_webhook.go
line 147 at r1 (raw file):
// Raising this value to 200 makes little difference while reducing it
// to 50 results in an ~8% reduction in throughput.
MaxIdleConnsPerHost: 100,
Well, this one is a bit scary. I don't know if 100 or 200 is small/big. The difference in performance may be because of where you were writing -- I suspect you were emitting to a single http endpoint, and not to a load-balanced sink, so you had to bump up MaxIdleConnsPerHost because... you have just 1 host.
I wonder if this limit should be a function of how many producers you have. If all producers emit to the same http.Transport, then you'd expect 1 outstanding request per producer.
pkg/ccl/changefeedccl/sink_webhook.go
line 323 at r1 (raw file):
if cfg.Flush.Messages < 0 || cfg.Flush.Bytes < 0 || cfg.Flush.Frequency < 0 ||
	cfg.Retry.Max < 0 || cfg.Retry.Backoff < 0 {
	return batchCfg, retryCfg, errors.Errorf(
		"invalid option value %s, all config values must be non-negative",
		changefeedbase.OptWebhookSinkConfig)
Should we use pgerror for this, with pgcode.InvalidParameterValue?
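That would look something like this sketch, assuming the usual pgerror/pgcode packages; the function name is a placeholder.
package sinksketch

import (
	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
)

// errNegativeWebhookConfig returns the validation failure as a pgerror with
// InvalidParameterValue instead of a plain error.
func errNegativeWebhookConfig(opt string) error {
	return pgerror.Newf(pgcode.InvalidParameterValue,
		"invalid option value %s, all config values must be non-negative", opt)
}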
pkg/ccl/changefeedccl/sink_webhook.go
line 435 at r1 (raw file):
}
// max backoff should be initial * 2 ^ maxRetries
opts.MaxBackoff = opts.InitialBackoff * time.Duration(int(math.Pow(2.0, float64(opts.MaxRetries))))
Do we need to compute it? Doesn't retry options compute it anyway? Or... do we need to be exact? Can't we just set it to 10 seconds or whatever and be done w/ it?
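A sketch of that simpler alternative, just fixing MaxBackoff; the 10-second ceiling is an arbitrary placeholder, and the retry helper already caps each individual backoff at MaxBackoff.
package sinksketch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// webhookRetryOptions fixes MaxBackoff instead of deriving it from the retry
// count; exponential growth still happens, it is just clamped at the ceiling.
func webhookRetryOptions(initialBackoff time.Duration, maxRetries int) retry.Options {
	return retry.Options{
		InitialBackoff: initialBackoff,
		MaxBackoff:     10 * time.Second, // placeholder ceiling
		MaxRetries:     maxRetries,
	}
}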
pkg/ccl/changefeedccl/changefeedbase/settings.go
line 290 at r1 (raw file):
"the number of workers to use when emitting events to the sink: "+ "0 assigns a reasonable default, >0 assigns the setting value", 0,
make default metamorphic?
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356
Resolves #84676
This PR addresses two existing issues:
1. The unbatched webhook sink's maximum throughput is incredibly low: 600-800 messages per second on an n1-standard-16 3-node cluster. Batching increases throughput, but it is still far below other sinks (batching every 500 messages can result in a 20k mps throughput, while kafka on the same cluster and data does 212k).
2. The addition of new sinks is difficult, as many would have to re-implement their own parallelism/batching/retrying logic.
The reason for 1, why the sink is so much slower than other sinks, comes down to two things:
a) While the actual sending of http requests is spread across multiple worker goroutines, taking requests and batching them together is done synchronously in a single batching worker. Once that batch is full, the messages within it are sharded out to the workers, each forming their own arbitrarily smaller batches, which the batching worker then blocks on until all requests have completed. This means that in the default batch-size-1 case the sink is essentially single threaded, forming a batch and blocking on it each time.
b) HTTP requests are all sent to the same host and are subject to the default transport MaxIdleConnsPerHost setting, which is 2. This means that at most 2 underlying TCP connections can be reused between requests, and the rest have to start entirely new ones, using up ports.
Fixing a) without b) results in a non-batched throughput of 1200-1800 with huge variation, producing tons of "cannot assign requested address" errors that propagate to job retries.
Fixing b) alone by simply setting MaxIdleConnsPerHost to 100, the default MaxIdleConns value, immediately increases the throughput of a non-batched webhook sink from 800 to 48k.
Fixing the two together as is done in this PR increases the non-batched throughput to 229k messages per second. Without batching this unfortunately does use a large amount of CPU, 70% in the test, therefore batching is still recommended (with a batch size of 500 throughput increased to 333k while cpu usage was at 25%).
For context on the throughput numbers, on the same data and cluster a default Kafka sink puts out 214k messages per second at 21% cpu while a default cloudstorage sink does 468k at 27% cpu. The old webhook with a batch size of 500 puts out 20.6k messages per second.
In the image above from left to right we see the Emitted Messages for default cloudstorage, webhook v2 (batch size of 500), webhook v2 (default batch size of 1), default kafka (using a larger batch size for kafka didn't make much difference), webhook v1 (batch size of 500), and then default webhook v1 which is too small to see.
Note that these numbers do not factor in the performance during times of many errors in sending to the sink. Much of the kafka AsyncProducer complexity is due to how it handles retries.
The reason for 2 is that the parallelism/batching/retry/metrics logic for the webhook and pubsub sinks is intertwined with their respective sink-specific logic.
This PR splits the sink-specific logic out into a ThinSink interface whose only responsibility is to encode batches of messages into payloads to be sent and to emit those payloads, and moves the parallelism/batching/retry/metrics logic into a sinkProcessor.
This should allow other simple sinks that fit a "just emit a batch of messages somewhere" pattern to be added more easily with no duplicated work.
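Roughly, the split looks like the sketch below. EncodeResolvedMessage, EmitPayload, and Close match the snippets quoted in the review above; the batch-encoding method name and the stand-in types are guesses added for illustration.
package sinksketch

// Stand-in payload types so the sketch compiles on its own.
type (
	SinkMessage            interface{}
	SinkPayload            interface{}
	ResolvedMessagePayload interface{}
)

// ThinSink only knows how to turn batches of messages into opaque payloads and
// emit them; batching, parallelism, retries, and metrics live in the
// sinkProcessor that wraps it.
type ThinSink interface {
	EncodeBatch(messages []SinkMessage) (SinkPayload, error) // hypothetical name
	EncodeResolvedMessage(ResolvedMessagePayload) (SinkPayload, error)
	EmitPayload(SinkPayload) error
	Close() error
}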
Note that this PR only implements the sinkProcessor to the extent of the needs of the Webhook sink. A followup PR will be made to move the Pubsub sink into this framework and will bring along further changes to sinkProcessor to support multiple topics.
For now, testing of this PR still relies on the existing webhook sink tests; once sinkProcessor is extended to support pubsub as well, the tests will be refactored into sinkProcessor-specific tests and thin-sink-specific tests.
Another potential improvement is to add a throttling setting, since with the new throughputs it may be easy to overwhelm downstream endpoints.
It is recommended that reviewers start by reading the sink_processor.go file, followed by the sink_webhook file.
Release note (performance improvement): The Changefeed webhook sink has been rewritten to support substantially larger throughputs, to a similar magnitude as the kafka sink.