
changefeedccl: webhook sink rewrite #99086

Merged
merged 2 commits into from
Mar 31, 2023

Conversation

samiskin
Contributor

@samiskin samiskin commented Mar 21, 2023

Resolves #84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR reimplements the webhook sink on top of a more general batchingSink framework that makes adding new sinks easier, and makes the webhook sink far more performant than it was previously.

A follow-up PR will move the pubsub client, which suffers from similar performance issues, onto the batchingSink as well.


Sink-specific code is encapsulated in a SinkClient interface

type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchWriter() BatchWriter
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchWriter interface {
        AppendKV(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}

Once the batch is ready to be flushed, the writer can be Close()'d to do any final formatting of the buffered data (ex: wrapping it in a JSON object with extra metadata) and produce a final SinkPayload that is ready to be passed to SinkClient.Flush.

The SinkClient has a separate MakeResolvedPayload since the sink may require resolved events to be formatted differently from a batch of KVs.

Flush(ctx, payload) encapsulates sending a blocking IO request to the sink endpoint, and may be called multiple times with the same payload due to retries. Any formatting work should therefore be done in the writer's Close and stored in the SinkPayload, so that retried Flush calls don't repeat that work.
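To make the division of labor concrete, here's a rough sketch of a hypothetical SinkClient/BatchWriter pair against the interfaces above (the final code renames BatchWriter/AppendKV to BatchBuffer/Append). Everything outside the interface methods (the newlineClient/newlineBuffer names, the newline-joined payload, the batch-of-100 threshold) is a made-up placeholder, not the actual webhook client:

```go
// Assumes imports of "bytes", "context", and "fmt". Placeholder implementation
// for illustration only; the real client adds HTTP config, retries, and metrics.
type newlineClient struct{}

type newlineBuffer struct {
	rows [][]byte
}

func (newlineClient) MakeBatchWriter() BatchWriter { return &newlineBuffer{} }

func (newlineClient) MakeResolvedPayload(body []byte, _ string) (SinkPayload, error) {
	return body, nil // resolved timestamps are emitted as-is in this sketch
}

func (newlineClient) Flush(ctx context.Context, p SinkPayload) error {
	// Flush may be retried with the same payload, so no formatting happens here;
	// a real client would POST the payload to the sink endpoint.
	_, err := fmt.Printf("flushing %d bytes\n", len(p.([]byte)))
	return err
}

func (newlineClient) Close() error { return nil }

func (b *newlineBuffer) AppendKV(key, value []byte, _ string) { b.rows = append(b.rows, value) }
func (b *newlineBuffer) ShouldFlush() bool                    { return len(b.rows) >= 100 }

// Close does the one-time formatting so retried Flush calls don't repeat it.
func (b *newlineBuffer) Close() (SinkPayload, error) {
	return bytes.Join(b.rows, []byte("\n")), nil
}
```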


The batchingSink handles all the logic to take a SinkClient and form a full Sink implementation.

type batchingSink struct {
        client             SinkClient
        ioWorkers          int
        minFlushFrequency  time.Duration
        retryOpts          retry.Options
        eventPool          sync.Pool
        batchPool          sync.Pool
        eventCh            chan interface{}
        pacer              *admission.Pacer
        ...
}

var _ Sink = (*batchingSink)(nil)

It involves a single goroutine which handles:

  • Creating, building up, and finalizing BatchWriters to eventually form a SinkPayload to emit
  • Flushing batches when they have persisted longer than a configured minFlushFrequency
  • Flushing deliberately and being able to block until the Flush has completed
  • Logging all the various sink metrics

EmitRow calls are thread-safe, so the safeSink wrapper is not required for users of this sink.
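For illustration, a loose sketch of what that single worker loop looks like; a ticker stands in for the real minFlushFrequency bookkeeping, and kvEvent/flushReq along with the newBatchBuffer/dispatch helpers are assumed names rather than the exact implementation:

```go
// Rough sketch only; not the actual batchingSink code.
func (s *batchingSink) runBatchingWorker(ctx context.Context) error {
	batch := s.newBatchBuffer() // assumed helper that claims a buffer from the pool
	ticker := time.NewTicker(s.minFlushFrequency)
	defer ticker.Stop()

	flush := func() {
		if batch.numMessages > 0 {
			s.dispatch(ctx, batch) // assumed: finalize the payload and hand it to the IO layer
			batch = s.newBatchBuffer()
		}
	}

	for {
		select {
		case e := <-s.eventCh:
			switch r := e.(type) {
			case *kvEvent:
				batch.Append(r) // the batch takes ownership of the event's data and alloc
				if batch.writer.ShouldFlush() {
					flush()
				}
			case flushReq:
				flush()
				close(r.waiter) // assumed field; unblocks the caller waiting on the flush
			}
		case <-ticker.C:
			flush() // the batch has been sitting longer than minFlushFrequency
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```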

Events sent through the goroutines would normally need to exist on the heap, but to avoid excessive garbage collection of hundreds of thousands of tiny structs, both the kvEvent{<data from EmitRow>} events (sent from the EmitRow caller to the batching worker) and the sinkBatchBuffer{<data about the batch>} events (sent from the batching worker to the IO routine described in the next section) are allocated from object pools.
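The pooling itself is the standard sync.Pool claim/free pattern; a minimal sketch, assuming hypothetical kvEvent fields:

```go
// Sketch of the claim/free cycle; the real kvEvent fields differ.
var eventPool = sync.Pool{
	New: func() interface{} { return new(kvEvent) },
}

func newKVEvent(key, val []byte, topic string) *kvEvent {
	e := eventPool.Get().(*kvEvent) // reuse a previously freed event when possible
	e.key, e.val, e.topic = key, val, topic
	return e
}

func freeKVEvent(e *kvEvent) {
	*e = kvEvent{} // zero it so stale byte slices don't keep memory alive
	eventPool.Put(e)
}
```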


For a sink like Cloudstorage, where batches are large, doing the above and flushing the batch payloads one at a time on a separate routine is good enough. Unfortunately, the webhook sink can be used with no batching at all by users who want the lowest latency while still having good throughput, which means we need to be able to have multiple requests in flight. The difficulty is that if a batch with keys [a1,b1] is in flight, a batch with keys [b2,c1] needs to block until [a1,b1] completes, as b2 cannot be sent and risk arriving at the destination prior to b1.

Flushing out payloads in a way that maintains key-ordering guarantees while still running in parallel is handled by a separate parallelIO struct.

type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan IORequest
  ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
	SetError(error)
}

It involves one goroutine to manage the key ordering guarantees and a configurable number of IO Worker goroutines that simply call ioHandler on an IORequest.

IORequests declare the keys they shouldn't conflict on by providing an intsets.Fast struct, which supports the efficient Union/Intersects/Difference operations that parallelIO needs to maintain its ordering guarantees.

Requests are received as IORequests and responses are also returned as IORequests. This way the parallelIO struct does not have to do any heap allocations to communicate; its user can manage creating and freeing these objects in pools. The only heap allocations that occur are part of the intset operations, as they use a linked list internally.
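The core admission check boils down to a couple of intset operations; a simplified sketch (tryStartIO and the pending slice are names made up for this example, but Intersects/UnionWith/DifferenceWith are the actual intsets.Fast operations used):

```go
// Simplified sketch of the ordering logic, not the real parallelIO code.
// A request may start IO only if none of its keys are already in flight.
func tryStartIO(inflight *intsets.Fast, pending *[]IORequest, req IORequest) bool {
	if inflight.Intersects(req.Keys()) {
		// One of these keys is already being flushed; the request must wait so it
		// cannot overtake the earlier batch at the destination.
		*pending = append(*pending, req)
		return false
	}
	inflight.UnionWith(req.Keys()) // reserve the keys, then hand off to an IO worker
	return true
}

// When a request completes, its keys are released and the pending requests are
// re-checked in order:
//
//	inflight.DifferenceWith(done.Keys())
//	// ...retry tryStartIO on each still-pending request...
```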


The webhook sink is therefore formed by:

  1. EmitRow is called, creating kvEvents that are sent to a batching worker
  2. The batching worker takes events and appends them to a batch
  3. Once the batch is full, it's encoded into an HTTP request
  4. The request object is then sharded across a set of IO workers to be fully sent out in parallel with other non-key-conflicting requests.

With this setup, at high throughputs most of the batchingSink/parallelIO work barely shows up in the CPU flamegraph; the cost was largely in step 3, where taking a list of messages and calling json.Marshal on it took almost 10% of the time, specifically in a call to json.Compress.

Since this isn't needed, and all we're doing is putting a list of already-formatted JSON messages into a surrounding JSON array and a small wrapper object, I also swapped json.Marshal for manually stitching the bytes together into a buffer.
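Roughly, the encoding step becomes byte-level concatenation of the already-encoded rows; a sketch of the idea (the envelope shape shown here is illustrative, not necessarily the exact wire format):

```go
// Assumes imports of "bytes" and "strconv". Stitches pre-encoded JSON rows into
// a wrapper object without re-marshaling them.
func encodeBatch(rows [][]byte) []byte {
	var buf bytes.Buffer
	buf.WriteString(`{"payload":[`)
	for i, row := range rows {
		if i > 0 {
			buf.WriteByte(',')
		}
		buf.Write(row) // each row is already a valid JSON object from the encoder
	}
	buf.WriteString(`],"length":`)
	buf.WriteString(strconv.Itoa(len(rows)))
	buf.WriteByte('}')
	return buf.Bytes()
}
```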


In the following flamegraph of a node at around 35% CPU usage, only 5.56% of the total CPU time in the graph (three small chunks between the parallelEventConsumer chunk and the kvevent chunk) is taken up by the parallelIO and batchingSink workers. This is with a batch size of 100.

Screenshot 2023-03-20 at 10 48 33 PM

The max CPU usage here was around 37% with a max throughput of 135k for a single node (the other nodes had run out of data at this point). Since the majority of the flamegraph shows time spent in the event-processing code, I'm going to assume this will be handled by the pacer and won't be much of an issue.

In the above flamegraph, runtime.gcDrain does show up using 10.55% CPU, but the cloudstorage sink showed around the same value when I tried it, so I'm guessing there isn't an extra GC thrashing issue. I believe the only non-pool allocations that occur are the intsets.

The following graph demonstrates the webhook sink first with batches of 100 messages, followed by no batching, on TPCC with 500 warehouses, on a 3-node, 16-CPU roachtest cluster. At peak the batched throughput is 350k messages per second, and at peak the unbatched throughput is 61k.

Screenshot 2023-05-12 at 2 56 23 PM

This is a similar graph for the old webhook sink: 18k and 3.75k messages per second for batches of 100 and no batching respectively.
Screenshot 2023-05-12 at 3 40 28 PM


Since Matt's talked about a new significance being placed on feature-flagging new work to avoid the need for technical advisories, I placed this new implementation behind the changefeed.new_webhook_sink_enabled setting, which defaults to disabled.

Right now it's sink_webhook_v2 just to keep sink_webhook.go unchanged so that this review is easier to do. I may move sink_webhook to deprecated_sink_webhook and rename sink_webhook_v2 to sink_webhook prior to merging.
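For reference, the gate is a boolean cluster setting with a metamorphic test default; a sketch assembled from the snippets quoted in the review below (the exact placement and wording in the final code may differ):

```go
// Sketch based on the review snippets; the metamorphic default lets tests
// randomly exercise the new sink while production clusters default to the old one.
var newWebhookSinkEnabled = settings.RegisterBoolSetting(
	settings.TenantWritable,
	"changefeed.new_webhook_sink_enabled",
	"if enabled, this setting enables a new implementation of the webhook sink"+
		" that allows for a much higher throughput",
	util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
)
```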


Release note (performance improvement): the webhook sink is now able to handle a drastically higher maximum throughput by enabling the "changefeed.new_webhook_sink_enabled" cluster setting.

@cockroach-teamcity
Member

This change is Reviewable

@samiskin samiskin force-pushed the sink-refactor-v2 branch 3 times, most recently from 15ddb52 to 7e7c5ff on March 21, 2023 03:42
Contributor

@miretskiy miretskiy left a comment


Reviewed 1 of 12 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @samiskin)


pkg/ccl/changefeedccl/changefeedbase/settings.go line 284 at r1 (raw file):

	false,
)

nit: not sure if this bi-furcation of the code is needed; but okay.
Please do move this low level setting right next to where we create the sink and keep it un-exported.
I don't think temporary setting raises up to the changefeed wide level.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 292 at r1 (raw file):

	util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),
)

comments on exported settings.

Contributor

@miretskiy miretskiy left a comment


Reviewable is having a hard time w/ so many commits; can you squash them?

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @samiskin)

@samiskin samiskin force-pushed the sink-refactor-v2 branch 2 times, most recently from 11004aa to f3c1faf on March 21, 2023 16:21
Contributor

@miretskiy miretskiy left a comment


Reviewed 6 of 12 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @samiskin)


pkg/ccl/changefeedccl/batching_sink.go line 39 at r1 (raw file):

// to the sink.
type BatchWriter interface {
	AppendKV(key []byte, value []byte, topic string)

nit: perhaps just append? Cause it's not KV we are appending?
Also, a short comment on this would be nice. I kinda like the previous name (buffer) -- where append has a good
meaning. With batchWriter things are less clear.


pkg/ccl/changefeedccl/batching_sink.go line 41 at r1 (raw file):

	AppendKV(key []byte, value []byte, topic string)
	ShouldFlush() bool
	Close() (SinkPayload, error)

This is a strange close signature -- but i'll keep reading.


pkg/ccl/changefeedccl/batching_sink.go line 69 at r1 (raw file):

	// claimed and freed from object pools.
	eventPool sync.Pool
	batchPool sync.Pool

you want pools to be global.
make it so, plus add helpers to get objects of appropriate type.


pkg/ccl/changefeedccl/batching_sink.go line 90 at r1 (raw file):

}

type kvEvent struct {

nit: i know it's "key value" ... but I just have hard time not reading "kvEvent" as something KV layer specific.
can we have a better name? encodedEvent? simply event? emitEvent? anything other than kv, really.


pkg/ccl/changefeedccl/batching_sink.go line 191 at r1 (raw file):

// sinkBatchBuffer stores an in-progress/complete batch of messages, along with
// metadata related to the batch.

nit: (feel free to ignore): we are already inside sink specific code. I think batchBuffer might be sufficient as a name.


pkg/ccl/changefeedccl/batching_sink.go line 194 at r1 (raw file):

type sinkBatchBuffer struct {
	writer  BatchWriter
	payload SinkPayload // payload is nil until FinalizePayload has been called

curious why we need to store it then, instead of returning it from FinalizePayload? Not sure, but, I'll keep reading.


pkg/ccl/changefeedccl/batching_sink.go line 198 at r1 (raw file):

	numMessages int
	numKVBytes  int // the total amount of uncompressed kv data in the batch
	keys        intsets.Fast

keys deserves a comment.


pkg/ccl/changefeedccl/batching_sink.go line 239 at r1 (raw file):

}

// Append adds the contents of a kvEvent to the batch, merging its alloc pool

nit: missing period.


pkg/ccl/changefeedccl/batching_sink.go line 264 at r1 (raw file):

}

func (bs *batchingSink) newBatchBuffer() *sinkBatchBuffer {

nit: (or a joke) have hard time reading code that starts with "bs" -- if you know what I mean.


pkg/ccl/changefeedccl/batching_sink.go line 294 at r1 (raw file):

		defer func() {
			batchBuffer = bs.newBatchBuffer()
		}()

perhaps we should be a bit more explicit (and 1 line shorter):

toFlush := batchBuffer
batchBuffer = bs.newBatchBuffer()

pkg/ccl/changefeedccl/batching_sink.go line 305 at r1 (raw file):

		for {
			select {
			case <-ctx.Done():

I find it hard to reason about functions that seem to swallow error (ctx.Err() in this case).
Yes, I understand that this checking happens somewhere else, but -- that's what I mean by "hard to reason" -- I have
to look elsewhere to convince myself that this is correct. Wouldn't it be better to just have this function return an error?


pkg/ccl/changefeedccl/parallel_io.go line 19 at r1 (raw file):

)

// parallelIO allows submitting requests to do blocking "IOHandler" calls on

nit: perhaps "parallelIO allows to perform blocking "IOHandler" calls in parallel?


pkg/ccl/changefeedccl/parallel_io.go line 26 at r1 (raw file):

// until [a,b] completes, then [c,d] will block until [b,c] completes. If [c,d]
// errored, [b,c] would never be sent, and SetError would be called on [c,d]
// prior to it being returned on resultCh.

what about SetError on batches that were already in flight?


pkg/ccl/changefeedccl/parallel_io.go line 77 at r1 (raw file):

// Close stops all workers immediately and returns once they shut down. Inflight
// requests sent to requestCh may never result in being sent to resultCh.
func (pe *parallelIO) Close() {

what does e stand for in pe?


pkg/ccl/changefeedccl/parallel_io.go line 100 at r1 (raw file):

	for i := 0; i < numEmitWorkers; i++ {
		pe.wg.GoCtx(func(ctx context.Context) error {

I wonder... one of the reasons why e.g. event processing used fixed size parallelism is because it had to do it to ensure correct ordering...
Do we need to do it here? We keep track of inflight set of keys. Is that sufficient to ensure ordering? Do we still need to have
fixed size worker pool? Could we use something else (perhaps variable pool?)


pkg/ccl/changefeedccl/parallel_io.go line 104 at r1 (raw file):

				err := emitWithRetries(ctx, req)
				if err != nil {
					req.SetError(err)

It's not clear to me (yet) why we need SetError at all? We have tried to do IO, we have tried to do
it with retries. What else can we do for this request? Wouldn't it be correct to just bail out here?
Return this error, have the whole thing torn down?


pkg/ccl/changefeedccl/parallel_io.go line 120 at r1 (raw file):

					case emitSuccessCh <- req:
					}
				}

Would be nice to avoid almost duplication:

var resultCh chan<- IORequest
if err == nil {
   resultCh = emitSuccessCh
} else {
   req.SetError(err)
   resultCh = pe.resultCh
}
select {
  ...
  case resultCh <- req:
}

pkg/ccl/changefeedccl/parallel_io.go line 126 at r1 (raw file):

	}

	var handleSuccess func(IORequest)

no need to declare this here?


pkg/ccl/changefeedccl/parallel_io.go line 129 at r1 (raw file):

	var pendingResults []IORequest

	sendToWorker := func(ctx context.Context, req IORequest) {

nit: would something like submitIO be a better name? maybe startIO?


pkg/ccl/changefeedccl/parallel_io.go line 159 at r1 (raw file):

		var stillPending = pending[:0] // Reuse underlying space
		for _, pendingReq := range pending {
			// If no intersection, nothing changed for this request's validity

nit: . at the end of a comment? (below as well?)


pkg/ccl/changefeedccl/parallel_io.go line 160 at r1 (raw file):

		for _, pendingReq := range pending {
			// If no intersection, nothing changed for this request's validity
			if !req.Keys().Intersects(pendingReq.Keys()) {

Do we need this check? We've cleared req.Keys from inflight; wouldn't the inflight check below be sufficient?


pkg/ccl/changefeedccl/parallel_io.go line 166 at r1 (raw file):

			// If it is now free to send, send it
			if !inflight.Intersects(pendingReq.Keys()) {

nit: perhaps swap conditions to make it positive?


pkg/ccl/changefeedccl/parallel_io.go line 193 at r1 (raw file):

		pendingResults = nil
		for _, res := range unhandled {
			handleSuccess(res)

same comment as sendWorker below: I think this function should return error.


pkg/ccl/changefeedccl/parallel_io.go line 198 at r1 (raw file):

		select {
		case req := <-pe.requestCh:
			if !inflight.Intersects(req.Keys()) {

nit: do you think flipping if conditions (if inflightIntersects {} else {}) would improve readability?
I personally find it easier to read if I don't have to negate things.


pkg/ccl/changefeedccl/parallel_io.go line 200 at r1 (raw file):

			if !inflight.Intersects(req.Keys()) {
				inflight.UnionWith(req.Keys())
				sendToWorker(ctx, req)

looking at sendToWorker function -- would it be better to have it return error when ctx.Done()?
I know it's a bit more typing, but relying on this loop to terminate might not be ideal.
We will be processing previous unhandled results, right? And those submit more stuff to other channels?
Wouldn't it be better to just return an error right away?


pkg/ccl/changefeedccl/changefeedbase/settings.go line 290 at r1 (raw file):

	"if enabled, this setting enables a new implementation of the webhook sink"+
		" that allows for a much higher throughput",
	util.ConstantWithMetamorphicTestBool("changefeed.new_webhook_sink_enabled", false),

should this be an env variable instead? Every setting we add, will have to be retired as well...


pkg/ccl/changefeedccl/changefeedbase/settings.go line 295 at r1 (raw file):

var SinkParallelism = settings.RegisterIntSetting(
	settings.TenantWritable,
	"changefeed.sink_parallelism",

would changefeed.sink.io_workers be a better name?


pkg/cmd/roachtest/tests/cdc.go line 1228 at r1 (raw file):

			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})

			if _, err := ct.DB().Exec("SET CLUSTER SETTING changefeed.new_webhook_sink_enabled = true;"); err != nil {

Why not keep it metamorphic?


pkg/ccl/changefeedccl/helpers_test.go line 990 at r1 (raw file):

	sinkType := randomSinkTypeWithOptions(options)
	if sinkType == "" {
		return

that means what? sinkless?


pkg/ccl/changefeedccl/sink_webhook_test.go line 601 at r1 (raw file):

		appendCount := 0
		batchingSink.knobs.OnAppend = func(event *kvEvent) {
			appendCount += 1

is this thread safe?

Contributor

@miretskiy miretskiy left a comment


Sending first set of comments... There is a lot of code here, so I'll definitely need to spend more time on this.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @samiskin)

@samiskin samiskin force-pushed the sink-refactor-v2 branch 5 times, most recently from 288c6d0 to 82fbf7b on March 22, 2023 18:12
Contributor Author

@samiskin samiskin left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)


pkg/ccl/changefeedccl/batching_sink.go line 69 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

you want pools to be global.
make it so, plus add helpers to get objects of appropriate type.

Done.


pkg/ccl/changefeedccl/batching_sink.go line 194 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

curious why we need to store it then, instead of returning it from FinalizePayload? Not sure, but, I'll keep reading.

Done.


pkg/ccl/changefeedccl/batching_sink.go line 198 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

keys deserves a comment.

Done.


pkg/ccl/changefeedccl/batching_sink.go line 294 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

perhaps we should be a bit more explicit (and 1 line shorter):

toFlush := batchBuffer
batchBuffer = bs.newBatchBuffer()

Done.


pkg/ccl/changefeedccl/batching_sink.go line 305 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I find it hard to reason about functions that seem to swallow error (ctx.Err() in this case).
Yes, I understand that this checking happens somewhere else, but -- that's what I mean by "hard to reason" -- I have
to look elsewhere to convince myself that this is correct. Wouldn't it be better to just have this function return an error?

Done.


pkg/ccl/changefeedccl/parallel_io.go line 26 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

what about SetError on batches that were already in flight?

Moved back to just ioResult


pkg/ccl/changefeedccl/parallel_io.go line 77 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

what does e stand for in pe?

leftover from an old name, moved to p


pkg/ccl/changefeedccl/parallel_io.go line 100 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I wonder... one of the reasons why e.g. event processing used fixed size parallelism is because it had to do it to ensure correct ordering...
Do we need to do it here? We keep track of inflight set of keys. Is that sufficient to ensure ordering? Do we still need to have
fixed size worker pool? Could we use something else (perhaps variable pool?)

We could definitely have a variable pool here but for now I'm going to leave it as a fixed size.


pkg/ccl/changefeedccl/parallel_io.go line 104 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

It's not clear to me (yet) why we need SetError at all? We have tried to do IO, we have tried to do
it with retries. What else can we do for this request? Wouldn't it be correct to just bail out here?
Return this error, have the whole thing torn down?

Moved to tearing it all down. I was concerned about new emits blocking on a torn-down IO, but just realized that can't happen since emitters are expected to handle results at the same time.


pkg/ccl/changefeedccl/parallel_io.go line 120 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Would be nice to avoid almost duplication:

var resultCh chan<- IORequest
if err == nil {
   resultCh = emitSuccessCh
} else {
   req.SetError(err)
   resultCh = pe.resultCh
}
select {
  ...
  case resultCh <- req:
}

Became unnecessary after moving back to ioResult


pkg/ccl/changefeedccl/parallel_io.go line 126 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

no need to declare this here?

whoops, used to be required but no longer


pkg/ccl/changefeedccl/parallel_io.go line 129 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nit: would something like submitIO be a better name? maybe startIO?

moved to submitIO


pkg/ccl/changefeedccl/parallel_io.go line 193 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

same comment as sendWorker below: I think this function should return error.

Done.


pkg/ccl/changefeedccl/parallel_io.go line 200 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

looking at sendToWorker function -- would it be better to have it return error when ctx.Done()?
I know it's a bit more typing, but relying on this loop to terminate might not be ideal.
We will be processing previous unhandled results, right? And those submit more stuff to other channels?
Wouldn't it be better to just return an error right away?

Done.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 290 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

should this be an env variable instead? Every setting we add, will have to be retired as well...

I think its good to at least have the setting be a little discoverable for customers that do want a faster webhook sink and have it be easy for them to try out.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 292 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

comments on exported settings.

Done.


pkg/ccl/changefeedccl/changefeedbase/settings.go line 295 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

would changefeed.sink.io_workers be a better name?

Done.


pkg/cmd/roachtest/tests/cdc.go line 1228 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Why not keep it metamorphic?

The old sink ends up being too slow to run in time for 100 warehouses


pkg/ccl/changefeedccl/helpers_test.go line 990 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

that means what? sinkless?

It just means "skip this test because there are no sinks that match what's requested". It doesn't occur at all normally, but when I want to run every test on a single sink and, for example, set the webhook weight to 1 and the rest to 0, then for tests that explicitly disable webhook all sinks would have a weight of 0, so any sink could be selected.


pkg/ccl/changefeedccl/sink_webhook_test.go line 601 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

is this thread safe?

Done.

@samiskin samiskin marked this pull request as ready for review March 22, 2023 18:12
@samiskin samiskin requested review from a team as code owners March 22, 2023 18:12
@samiskin samiskin requested review from smg260, renatolabs, jayshrivastava, miretskiy and a team and removed request for a team March 22, 2023 18:12
@samiskin samiskin force-pushed the sink-refactor-v2 branch 2 times, most recently from f1610df to 80e8a47 on March 23, 2023 17:02
Contributor

@miretskiy miretskiy left a comment


Reviewed 1 of 4 files at r2, 9 of 14 files at r4, 1 of 3 files at r5, 1 of 5 files at r6.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @jayshrivastava, @renatolabs, @samiskin, and @smg260)


-- commits line 35 at r5:
Thanks for renaming this to buffer in the code. I think your commit example needs to be updated to reflect that.


pkg/ccl/changefeedccl/batching_sink.go line 37 at r5 (raw file):

// BatchBuffer is an interface to aggregate KVs into a payload that can be sent
// to the sink.

nit: let's expand the comment just a bit -- just like you have in your PR description.
In particular, I'm looking for explanation why we need close() method. Perhaps few examples on how close method might be usable.


pkg/ccl/changefeedccl/batching_sink.go line 97 at r5 (raw file):

	flushWaiter := make(chan struct{})
	select {
	case <-ctx.Done():

why not return ctx.Err()?
Again, I see that we are doing it below; but that's a) more work for the reader, and b) you're checking things twice.
It's a lot more idiomatic to just see return Err.


pkg/ccl/changefeedccl/batching_sink.go line 117 at r5 (raw file):

// therefore escape to the heap) can both be incredibly frequent (every event
// may be its own batch) and temporary, so to avoid GC thrashing they are both
// claimed and freed from object pools.

❤️ this comment.


pkg/ccl/changefeedccl/batching_sink.go line 199 at r5 (raw file):

	close(s.doneCh)
	_ = s.wg.Wait()
	if s.pacer != nil {

safe to call close on nil pacer.


pkg/ccl/changefeedccl/batching_sink.go line 322 at r5 (raw file):

		for {
			select {
			case <-ctx.Done():

please return ctx.Err here.


pkg/ccl/changefeedccl/batching_sink.go line 365 at r5 (raw file):

	for {
		if s.pacer != nil {

Pretty sure it's safe to call Pace on nil pacer.


pkg/ccl/changefeedccl/batching_sink.go line 382 at r5 (raw file):

					tryFlushBatch()
				}
			} else if event, isKV := req.(*rowEvent); isKV {

do you think this if/else is better with:

switch r := req.(type) {
case flushReq:
   ...
case *rowEvent:
   ...
default:
   return errors.AssertionFailedf("unexpected request type %T", r)
}

pkg/ccl/changefeedccl/parallel_io.go line 106 at r5 (raw file):

}

func (p *parallelIO) runWorkers(ctx context.Context, numEmitWorkers int) error {

super tiny nit: I don't think this function should be named run workers.
It does a lot more than that -- and running numEmitWorkers is only 1 part of what it does.
Perhaps processIO or handleIO might be better name.

Regardless of the chosen name, I think this function needs a comment. I'm looking at
a high level outline -- perhaps some ascii art type diagram as to what this function does.
emit chan, N workers, read from request channel, handling of the key collisions.
Etc.


pkg/ccl/changefeedccl/parallel_io.go line 140 at r5 (raw file):

	var pendingResults []*ioResult

I think the name is descriptive; but a small comment would be nice.


pkg/ccl/changefeedccl/parallel_io.go line 163 at r5 (raw file):

	// in a Queue to be sent to IO workers once the conflicting requests complete.
	var inflight intsets.Fast
	var pending []IORequest

Let's make things a bit more observable. I think we should add a timestamp when an IORequest was added to this queue. When we finally pop the queue, we should record a metric, or at the very least log something periodically if some batch was queued for too long (or both).

We should also have a metric Gauge on the number of inflight keys.


pkg/ccl/changefeedccl/parallel_io.go line 199 at r5 (raw file):

	// A set of keys can be sent immediately if no yet-to-be-handled request
	// observed so far shares any of those keys.

sounds like... this function is exactly the opposite of its name?
true means you cannot send ?


pkg/ccl/changefeedccl/sink.go line 726 at r5 (raw file):

	Max     jsonMaxRetries `json:",omitempty"`
	Backoff jsonDuration   `json:",omitempty"`
}

No good deed goes unpunished.... Those structs and in particular their parsing are completely under-tested (if at all). I would appreciate a small unit test parsing few valid/invalid configs.
(jsonMaxRetries as well)


pkg/ccl/changefeedccl/sink.go line 811 at r5 (raw file):

	}
	if idealNumber > 32 {
		return 32

have you tried with 16? is the performance worse?


pkg/ccl/changefeedccl/sink.go line 816 at r5 (raw file):

}

func sinkPacer(ctx context.Context, cfg *execinfra.ServerConfig) *admission.Pacer {

nit: better name? newPacer, newCPUPacer?


pkg/ccl/changefeedccl/sink.go line 818 at r5 (raw file):

func sinkPacer(ctx context.Context, cfg *execinfra.ServerConfig) *admission.Pacer {
	pacerRequestUnit := changefeedbase.SinkPacerRequestSize.Get(&cfg.Settings.SV)
	enablePacer := changefeedbase.PerEventElasticCPUControlEnabled.Get(&cfg.Settings.SV)

do we want to react to settings changes?
I think we do, and I think it might be pretty easy to do so.
Instead of passing pacer to webhook, pass admission pacer factory.
You already have a flushChannel in the batch worker; reset pacer whenever settings change.


pkg/cmd/roachtest/tests/cdc.go line 1228 at r1 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

The old sink ends up being too slow to run in time for 100 warehouses

Ack.


pkg/cmd/roachtest/tests/cdc.go line 192 at r6 (raw file):

		params := sinkDestHost.Query()
		params.Set("insecure_tls_skip_verify", "true")

Why are we no longer using certs?

Resolves cockroachdb#84676
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-11356

This PR implements the Webhook sink as part of a more general
`batchingSink` framework that can be used to make adding new sinks an
easier process, making it far more performant than it was previously.

A followup PR will be made to use the `batchingSink` for the pubsub
client which also suffers performance issues.

---

Sink-specific code is encapsulated in a SinkClient interface

```go
type SinkClient interface {
        MakeResolvedPayload(body []byte, topic string) (SinkPayload, error)
        MakeBatchBuffer() BatchBuffer
        Flush(context.Context, SinkPayload) error
        Close() error
}

type BatchBuffer interface {
        Append(key []byte, value []byte, topic string)
        ShouldFlush() bool
        Close() (SinkPayload, error)
}

type SinkPayload interface{}
```

Once the Batch is ready to be Flushed, the buffer can be `Close()`'d to
do any final formatting (ex: wrap in a json object with extra metadata)
of the buffer-able data and obtain a final `SinkPayload` that is ready
to be passed to `SinkClient.Flush`.

The `SinkClient` has a separate `MakeResolvedPayload` since the sink may
require resolved events be formatted differently to a batch of kvs.

`Flush(ctx, payload)` encapsulates sending a blocking IO request to the
sink endpoint, and may be called multiple times with the same payload
due to retries.  Any kind of formatting work should be served to run in
the buffer's `Close` and stored as a `SinkPayload` to avoid multiple
calls to `Flush` repeating work upon retries.

---

The `batchingSink` handles all the logic to take a SinkClient and form a
full Sink implementation.

```go
type batchingSink struct {
        client             SinkClient
        ioWorkers          int
        minFlushFrequency  time.Duration
        retryOpts          retry.Options
        eventCh            chan interface{}
        pacer              *admission.Pacer
        ...
}

var _ Sink = (*batchingSink)(nil)
```

It involves a single goroutine which handles:
- Creating, building up, and finalizing `BatchBuffer`s to eventually
form a `SinkPayload` to emit
- Flushing batches when they have persisted longer than a configured
`minFlushFrequency`
- Flushing deliberately and being able to block until the Flush has completed
- Logging all the various sink metrics

`EmitRow` calls are thread-safe therefore the use of the `safeSink` wrapper is
not required for users of this sink.

Events sent through the goroutines would normally need to exist on the
heap, but to avoid excessive garbage collection of hundreds of thousands
of tiny structs, both the `kvEvents{<data from EmitRow>}` events (sent from the
EmitRow caller to the batching worker) and the `sinkBatchBuffer{<data
about the batch>}` events (sent from the batching worker to the IO
routine in the next section) are allocated on object pools.

---

For a sink like Cloudstorage where there are large batches, doing the
above and just one-by-one flushing the batch payloads on a separate
routine is plenty good enough.  Unfortunately the Webhook sink can be
used with no batching at all with users wanting the lowest latency while
still having good throughput.  This means we need to be able to have
multiple requests in flight.  The difficulty here is if a batch with
keys [a1,b1] is in flight, a batch with keys [b2,c1] needs to block
until [a1,b1] completes as b2 cannot be sent and risk arriving at the
destination prior to b1.

Flushing out Payloads in a way that is both able to maintain
key-ordering guarantees but is able to run in parallel is done by a
separate `parallel_io` struct.

```go
type parallelIO struct {
	retryOpts retry.Options
	ioHandler IOHandler
	requestCh chan IORequest
	resultCh  chan *ioResult
  ...
}

type IOHandler func(context.Context, IORequest) error

type IORequest interface {
	Keys() intsets.Fast
}

type ioResult struct {
	request IORequest
	err     error
}
```

It involves one goroutine to manage the key ordering guarantees and a
configurable number of IO Worker goroutines that simply call `ioHandler`
on an `IORequest`.

IORequests represent the keys they shouldn't conflict on by providing a
`intsets.Fast` struct, which allows for efficient
Union/Intersects/Difference operations on them that `parallelIO` needs
to maintain ordering guarantees.

The request and its error (if one occurred despite the retries) are returned on
resultCh.

---

The webhook sink is therefore formed by:
1. EmitRow is called, creating kvEvents that are sent to a Batching worker
2. The batching worker takes events and appends them to a batch
3. Once the batch is full, it's encoded into an HTTP request
4. The request object is then sharded across a set of IO workers to be
   fully sent out in parallel with other non-key-conflicting requests.

With this setup, looking at the CPU flamegraph, at high throughputs most
of the `batchingSink`/`parallelIO` work didn't really show up much, the
work was largely just step 3, where taking a list of messages and
calling `json.Marshal` on it took almost 10% of the time, specifically a
call to `json.Compress`.

Since this isn't needed, and all we're doing is simply putting a list of
already-formatted JSON messages into a surrounding JSON array and small
object, I also swapped `json.Marshal` to just stitch together characters
manually into a buffer.

---

Since Matt's talked about a new significance being placed on Feature
flagging new work to avoid need for technical advisories, I placed this
new implementation under the changefeed.new_webhook_sink_enabled setting
and defaulted it to be disabled.

---

Release note (performance improvement): the webhook sink is now able to
handle a drastically higher maximum throughput by enabling the
"changefeed.new_webhook_sink_enabled" cluster setting.
@samiskin
Contributor Author

bors r+

@craig
Contributor

craig bot commented Mar 31, 2023

Build succeeded:

@craig craig bot merged commit 7b0fffc into cockroachdb:master Mar 31, 2023
@blathers-crl

blathers-crl bot commented Mar 31, 2023

Encountered an error creating backports. Some common things that can go wrong:

  1. The backport branch might have already existed.
  2. There was a merge conflict.
  3. The backport branch contained merge commits.

You might need to create your backport manually using the backport tool.


error creating merge commit from 8cb7444 to blathers/backport-release-23.1-99086: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 23.1.x failed. See errors above.


🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

samiskin added a commit to samiskin/cockroach that referenced this pull request Apr 4, 2023
Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to
cockroachdb#99086 which moves the Pubsub sink
to the batching sink framework.

The changes involve:
1. Moves the Pubsub code to match the `SinkClient` interface, moving to using
the lower level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to json.Marshal
3. Moving to using the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created
per-topic
5. Added a pubsub_sink_config since it can now handle Retry and Flush config
settings

Release note (performance improvement): pubsub sink changefeeds can now support
higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster
setting.
@samiskin
Contributor Author

samiskin commented Apr 4, 2023

blathers backport 23.1

samiskin added a commit to samiskin/cockroach that referenced this pull request Apr 5, 2023
samiskin added a commit to samiskin/cockroach that referenced this pull request Apr 6, 2023
craig bot pushed a commit that referenced this pull request Apr 7, 2023
99663: sql: update connExecutor logic for pausable portals r=ZhouXing19 a=ZhouXing19

This PR replaces #96358 and is part of the initial implementation of multiple active portals.

----

This PR is to add limited support for multiple active portals. Now portals satisfying all following restrictions can be paused and resumed (i.e., with other queries interleaving it):

1. Not an internal query;
2. Read-only query;
3. No sub-queries or post-queries.

And such a portal will only have the statement executed with a _non-distributed_ plan. 

This feature is gated by a session variable `multiple_active_portals_enabled`. When it's set `true`, all portals that satisfy the restrictions above will automatically become "pausable" when being created via the pgwire `Bind` stmt. 

The core idea of this implementation is 
1. Add a `switchToAnotherPortal` status to the result-consumption state machine. When we receive an `ExecPortal` message for a different portal, we simply return the control to the connExecutor. (#99052)
2. Persist `flow` `queryID` `span` and `instrumentationHelper` for the portal, and reuse it when we re-execute a portal. This is to ensure we _continue_ the fetching rather than starting all over. (#99173)
3. To enable 2, we need to delay the clean-up of resources till we close the portal. For this we introduced the stacks of cleanup functions. (This PR)

Note that we kept the implementation of the original "un-pausable" portal, as we'd like to limit this new functionality only to a small set of statements. Eventually some of them should be replaced (e.g. the limitedCommandResult's lifecycle) with the new code. 

Also, we don't support distributed plan yet, as it involves much more complicated changes. See `Start with an entirely local plan` section in the [design doc](https://docs.google.com/document/d/1SpKTrTqc4AlGWBqBNgmyXfTweUUsrlqIaSkmaXpznA8/edit). Support for this will come as a follow-up.

Epic: CRDB-17622

Release note (sql change): initial support for multiple active portals. Now with session variable `multiple_active_portals_enabled` set to true,  portals satisfying all following restrictions can be executed in an interleaving manner:  1. Not an internal query; 2. Read-only query; 3. No sub-queries or post-queries. And such a portal will only have the statement executed with an entirely local plan. 





99947: ui: small fixes to DB Console charts shown for secondary tenants r=dhartunian a=abarganier

#97995 updated the
DB Console to filter out KV-specific charts from the metrics page
when viewing DB Console as a secondary application tenant.

The PR missed a couple small details. This patch cleans those
up with the following:

- Removes KV latency charts for app tenants
- Adds a single storage graph for app tenants showing livebytes
- Removes the "Capacity" chart on the Overview dashboard for app
  tenants

Release note: none

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-12100

NB: Please only review the final commit. 1st commit is being reviewed separately @ #99860

100188: changefeedccl: pubsub sink refactor to batching sink r=rickystewart a=samiskin

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-13237

This change is a followup to #99086 which moves the Pubsub sink to the batching sink framework.

The changes involve:
1. Moves the Pubsub code to match the `SinkClient` interface, moving to using the lower level v1 pubsub API that lets us publish batches manually
2. Removing the extra call to json.Marshal
3. Moving to using the `pstest` package for validating results in unit tests
4. Adding topic handling to the batching sink, where batches are created per-topic
5. Added a pubsub_sink_config since it can now handle Retry and Flush config settings
6. Added metrics even to the old pubsub for the purpose of comparing the two versions

At default settings, this resulted in a peak of 90k messages per second on a single node with throughput at 27.6% cpu usage, putting it at a similar level to kafka.

Running pubsub v2 across all of TPCC (nodes ran out of ranges at different speeds):
<img width="637" alt="Screenshot 2023-03-30 at 3 38 25 PM" src="https://user-images.githubusercontent.com/6236424/229863386-edaee27d-9762-4806-bab6-e18b8a6169d6.png">

Running pubsub v1 (barely visible, 2k messages per second) followed by v2 on tpcc.order_line (in v2 only 2 nodes ended up having ranges assigned to them):
<img width="642" alt="Screenshot 2023-04-04 at 12 53 45 PM" src="https://user-images.githubusercontent.com/6236424/229863507-1883ea45-d8ce-437b-9b9c-550afec68752.png">

In the following graphs from the cloud console, where v1 was ran followed by v2, you can see how the main reason v1 was slow was that it wasn't able to batch different keys together.
<img width="574" alt="Screenshot 2023-04-04 at 12 59 51 PM" src="https://user-images.githubusercontent.com/6236424/229864083-758c0814-d53c-447e-84c3-471cf5d56c44.png">

Publish requests remained the same despite way more messages in v2
<img width="1150" alt="Screenshot 2023-04-04 at 1 46 51 PM" src="https://user-images.githubusercontent.com/6236424/229875314-6e07177e-62c4-4c15-b13f-f75e8143e011.png">



Release note (performance improvement): pubsub sink changefeeds can now support higher throughputs by enabling the changefeed.new_pubsub_sink_enabled cluster setting.

100620: pkg/server: move DataDistribution to systemAdminServer r=dhartunian a=abarganier

The DataDistribution endpoint reports replica counts by database and table. When it was built, it operated off the assumption that a range would only ever contain a single table's data within.

Now that we have coalesced ranges, a single range can span multiple tables. Unfortunately, the DataDistribution endpoint does not take this fact into account, meaning it reports garbled and inaccurate data, unless the `spanconfig.storage_coalesce_adjacent.enabled` setting is set to false (see #98820).

For secondary tenants, ranges are *always* coalesced, so this endpoint in its current state could never report meaningful data for a tenant.

Given all of this, we have decided to make this endpoint only available for the system tenant. This patch
accomplishes this by moving the endpoint away from the adminServer and into the systemAdminServer, making it effectively unimplemented for secondary tenants.

Release note: none

Informs: #97942

Co-authored-by: Jane Xing <zhouxing@uchicago.edu>
Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>
Co-authored-by: Shiranka Miskin <shiranka.miskin@gmail.com>
blathers-crl bot pushed a commit that referenced this pull request Apr 7, 2023
@shermanCRL
Contributor

@samiskin do we have benchmarks for before & after? I’d be interested in both throughput and CPU.

Labels
backport-23.1.x Flags PRs that need to be backported to 23.1
Successfully merging this pull request may close these issues.

changefeedccl: Webhook sink is slow